在RxJs 5中分享Angular 2 Httpnetworking调用的结果的正确方法是什么?

通过使用Http,我们调用一个方法来进行networking调用并返回一个http observable:

getCustomer() { return this.http.get('/someUrl').map(res => res.json()); } 

如果我们把这个可观察的,并添加多个用户:

 let network$ = getCustomer(); let subscriber1 = network$.subscribe(...); let subscriber2 = network$.subscribe(...); 

我们想要做的是确保这不会导致多个networking请求。

这可能看起来像一个不寻常的场景,但其实很常见:例如,如果调用者订阅observable以显示错误消息,并使用asynchronouspipe道将其传递到模板,则我们已经有两个订阅者。

在RxJs 5中做这件事的正确方法是什么?

也就是说,这似乎工作正常:

 getCustomer() { return this.http.get('/someUrl').map(res => res.json()).share(); } 

但这是RxJs 5中这种做法的惯用方式,还是我们应该做一些其他的事情?

caching数据,如果caching可用,则返回此请求,否则发出HTTP请求。

 import {Injectable} from '@angular/core'; import {Http, Headers} from '@angular/http'; import {Observable} from 'rxjs/Observable'; import 'rxjs/add/observable/of'; //proper way to import the 'of' operator import 'rxjs/add/operator/share'; import 'rxjs/add/operator/map'; import {Data} from './data'; @Injectable() export class DataService { private url:string = 'https://cors-test.appspot.com/test'; private data: Data; private observable: Observable<any>; constructor(private http:Http) {} getData() { if(this.data) { // if `data` is available just return it as `Observable` return Observable.of(this.data); } else if(this.observable) { // if `this.observable` is set then the request is in progress // return the `Observable` for the ongoing request return this.observable; } else { // example header (not necessary) let headers = new Headers(); headers.append('Content-Type', 'application/json'); // create the request, store the `Observable` for subsequent subscribers this.observable = this.http.get(this.url, { headers: headers }) .map(response => { // when the cached data is available we don't need the `Observable` reference anymore this.observable = null; if(response.status == 400) { return "FAILURE"; } else if(response.status == 200) { this.data = new Data(response.json()); return this.data; } // make it shared so more than one subscriber can get the result }) .share(); return this.observable; } } } 

Plunker例子

根据@Cristian的build议,这是一种适用于HTTP可观察性的方法,它只发出一次,然后完成:

 getCustomer() { return this.http.get('/someUrl') .map(res => res.json()).publishLast().refCount(); } 

根据这篇文章

事实certificate,我们可以通过添加publishReplay(1)和refCount轻松地将caching添加到observable。

所以里面如果语句只是追加

 .publishReplay(1) .refCount(); 

.map(...)

更新:Ben Lesh说5.2.0之后的下一个小版本,你可以调用shareReplay()来真正caching。

先前…..

首先,不要使用share()或publishReplay(1).refCount(),它们是相同的,并且它的问题在于,只有在observable处于活动状态时才进行连接,如果在连接完成后连接,它再次创build一个新的可观察的,翻译,而不是真的caching。

Birowski上面给出了正确的解决scheme,即使用ReplaySubject。 ReplaySubject会在我们的例子1中caching你给它的值(bufferSize)。一旦refCount达到零,你就不会创build一个像share()这样的新的observable,并且你build立一个新的连接,这是正确的caching行为。

这是一个可重用的function

 export function cacheable<T>(o: Observable<T>): Observable<T> { let replay = new ReplaySubject<T>(1); o.subscribe( x => replay.next(x), x => replay.error(x), () => replay.complete() ); return replay.asObservable(); } 

以下是如何使用它

 import { Injectable } from '@angular/core'; import { Http } from '@angular/http'; import { Observable } from 'rxjs/Observable'; import { cacheable } from '../utils/rxjs-functions'; @Injectable() export class SettingsService { _cache: Observable<any>; constructor(private _http: Http, ) { } refresh = () => { if (this._cache) { return this._cache; } return this._cache = cacheable<any>(this._http.get('YOUR URL')); } } 

下面是可caching函数的更高级版本这个允许有自己的查找表+提供自定义查找表的能力。 这样,你不必像上面的例子那样检查this._cache。 还要注意,不是传递observable作为第一个参数,而是传递一个返回observables的函数,这是因为Angular的Http立即执行,所以通过返回一个惰性执行函数,我们可以决定不调用它,如果它已经在我们的caching。

 let cacheableCache: { [key: string]: Observable<any> } = {}; export function cacheable<T>(returnObservable: () => Observable<T>, key?: string, customCache?: { [key: string]: Observable<T> }): Observable<T> { if (!!key && (customCache || cacheableCache)[key]) { return (customCache || cacheableCache)[key] as Observable<T>; } let replay = new ReplaySubject<T>(1); returnObservable().subscribe( x => replay.next(x), x => replay.error(x), () => replay.complete() ); let observable = replay.asObservable(); if (!!key) { if (!!customCache) { customCache[key] = observable; } else { cacheableCache[key] = observable; } } return observable; } 

用法:

 getData() => cacheable(this._http.get("YOUR URL"), "this is key for my cache") 

rxjs 5.4.0有一个新的shareReplay方法。

  • rx-book shareReplay()
  • 没有文档在reactivex.io/rxjs

作者明确表示“处理cachingAJAX结果的理想select”

rxjs PR#2443 feat(shareReplay):增加了shareReplaypublishReplay

shareReplay返回一个观察值,它是通过ReplaySubject进行多播的源。 该重播主题在来源上被错误地回收,而不是源文件的完成。 这使得shareReplay非常适合处理像cachingAJAX结果这样的事情,因为它是可重试的。 这是重复的行为,然而,不同的是,它不会重复源观察,而是重复源观察值。

你select的实现将取决于你是否想要取消订阅()来取消你的HTTP请求。

无论如何, TypeScript装饰器是标准化行为的一个好方法。 这是我写的一个:

  @CacheObservableArgsKey getMyThing(id: string): Observable<any> { return this.http.get('things/'+id); } 

装饰者定义:

 /** * Decorator that replays and connects to the Observable returned from the function. * Caches the result using all arguments to form a key. * @param target * @param name * @param descriptor * @returns {PropertyDescriptor} */ export function CacheObservableArgsKey(target: Object, name: string, descriptor: PropertyDescriptor) { const originalFunc = descriptor.value; const cacheMap = new Map<string, any>(); descriptor.value = function(this: any, ...args: any[]): any { const key = args.join('::'); let returnValue = cacheMap.get(key); if (returnValue !== undefined) { console.log(`${name} cache-hit ${key}`, returnValue); return returnValue; } returnValue = originalFunc.apply(this, args); console.log(`${name} cache-miss ${key} new`, returnValue); if (returnValue instanceof Observable) { returnValue = returnValue.publishReplay(1); returnValue.connect(); } else { console.warn('CacheHttpArgsKey: value not an Observable cannot publishReplay and connect', returnValue); } cacheMap.set(key, returnValue); return returnValue; }; return descriptor; } 

我主演了这个问题,但是我会尽力去解决这个问题。

 //this will be the shared observable that //anyone can subscribe to, get the value, //but not cause an api request let customer$ = new Rx.ReplaySubject(1); getCustomer().subscribe(customer$); //here's the first subscriber customer$.subscribe(val => console.log('subscriber 1: ' + val)); //here's the second subscriber setTimeout(() => { customer$.subscribe(val => console.log('subscriber 2: ' + val)); }, 1000); function getCustomer() { return new Rx.Observable(observer => { console.log('api request'); setTimeout(() => { console.log('api response'); observer.next('customer object'); observer.complete(); }, 500); }); } 

这是certificate 🙂

只有一个外卖: getCustomer().subscribe(customer$)

我们没有订阅getCustomer()的api响应,我们正在订阅一个ReplaySubject,它是可观察的,也可以订阅不同的Observable,并且(这很重要)保存它的最后发射的值,并重新发布到任何它是(ReplaySubject的)订户。

我find了一种方法将http获取结果存储到sessionStorage中,并将其用于会话,这样就不会再次调用服务器。

我用它来调用github API来避免使用限制。

 @Injectable() export class HttpCache { constructor(private http: Http) {} get(url: string): Observable<any> { let cached: any; if (cached === sessionStorage.getItem(url)) { return Observable.of(JSON.parse(cached)); } else { return this.http.get(url) .map(resp => { sessionStorage.setItem(url, resp.text()); return resp.json(); }); } } } 

供参考,sessionStorage限制是5M(或4.75M)。 所以,不应该像这样使用大量的数据。

我认为@ ngx-cache / core对于维护http调用的cachingfunction很有用,特别是在浏览器服务器平台上进行HTTP调用时。

假设我们有以下方法:

 getCustomer() { return this.http.get('/someUrl').map(res => res.json()); } 

您可以使用@ ngx-cache / core的Cached装饰器将来自HTTP调用方法的返回值cache storagecache storagestorage可以configuration,请检查ng-seed / universal上的实现 )第一次执行。 下一次该方法被调用(不pipe是在浏览器还是服务器平台上),该值都从cache storage检索。

 import { Cached } from '@ngx-cache/core'; ... @Cached('get-customer') // the cache key/identifier getCustomer() { return this.http.get('/someUrl').map(res => res.json()); } 

还有可能使用cachingAPI使用caching方法( hasgetset )。

anyclass.ts

 ... import { CacheService } from '@ngx-cache/core'; @Injectable() export class AnyClass { constructor(private readonly cache: CacheService) { // note that CacheService is injected into a private property of AnyClass } // will retrieve 'some string value' getSomeStringValue(): string { if (this.cache.has('some-string')) return this.cache.get('some-string'); this.cache.set('some-string', 'some string value'); return 'some string value'; } } 

以下是客户端和服务器端caching的软件包列表:

  • @ ngx-cache / core :caching工具
  • @ ngx-cache / platform-b​​rowser :SPA /浏览器平台的实现
  • @ ngx-cache / platform-server :服务器平台的实现
  • @ ngx-cache / fs-storage :存储实用程序(服务器平台所需)

rxjs 5.3.0

我一直不满意.map(myFunction).publishReplay(1).refCount()

有了多个订户, .map()在某些情况下执行myFunction两次(我希望它只执行一次)。 一个修复似乎是publishReplay(1).refCount().take(1)

你可以做的另一件事就是不使用refCount()refCount()即使Observable变热:

 let obs = this.http.get('my/data.json').publishReplay(1); obs.connect(); return obs; 

这将启动HTTP请求,无论用户。 我不确定在HTTP GET完成之前是否取消订阅将取消订阅。

可caching的HTTP响应数据使用Rxjs Observer / Observable +caching+订阅

见下面的代码

*免责声明:我是rxjs的新手,所以请记住,我可能会滥用可观察/观察者的方法。 我的解决scheme纯粹是我find的其他解决scheme的集合,而且是找不到简单的有据可查的解决scheme的结果。 因此,我正在提供完整的代码解决scheme(正如我希望find的那样),希望能帮助其他人。

*请注意,这种方法松散地基于GoogleFirebaseObservables。 不幸的是,我缺乏适当的经验/时间来复制他们所做的事情。 但是,以下是提供对某些可caching数据的asynchronous访问的简单方法。

情况 :“产品清单”组件的任务是显示产品清单。 该网站是一个单页的networking应用程序与一些菜单button,将“过滤”显示在页面上的产品。

解决scheme :组件“订阅”服务方法。 service方法返回一个产品对象数组,组件通过订阅callback访问该数组对象。 服务方法将其活动包装在新创build的Observer中,并返回观察者。 在这个观察者内部,它searchcaching的数据并将其传回给订户(组件)并返回。 否则,它发出一个http调用来检索数据,订阅响应,在那里你可以处理这些数据(例如将数据映射到你自己的模型),然后把数据传回给订阅者。

代码

产品list.component.ts

 import { Component, OnInit, Input } from '@angular/core'; import { ProductService } from '../../../services/product.service'; import { Product, ProductResponse } from '../../../models/Product'; @Component({ selector: 'app-product-list', templateUrl: './product-list.component.html', styleUrls: ['./product-list.component.scss'] }) export class ProductListComponent implements OnInit { products: Product[]; constructor( private productService: ProductService ) { } ngOnInit() { console.log('product-list init...'); this.productService.getProducts().subscribe(products => { console.log('product-list received updated products'); this.products = products; }); } } 

product.service.ts

 import { Injectable } from '@angular/core'; import { Http, Headers } from '@angular/http'; import { Observable, Observer } from 'rxjs'; import 'rxjs/add/operator/map'; import { Product, ProductResponse } from '../models/Product'; @Injectable() export class ProductService { products: Product[]; constructor( private http:Http ) { console.log('product service init. calling http to get products...'); } getProducts():Observable<Product[]>{ //wrap getProducts around an Observable to make it async. let productsObservable$ = Observable.create((observer: Observer<Product[]>) => { //return products if it was previously fetched if(this.products){ console.log('## returning existing products'); observer.next(this.products); return observer.complete(); } //Fetch products from REST API console.log('** products do not yet exist; fetching from rest api...'); let headers = new Headers(); this.http.get('http://localhost:3000/products/', {headers: headers}) .map(res => res.json()).subscribe((response:ProductResponse) => { console.log('productResponse: ', response); let productlist = Product.fromJsonList(response.products); //convert service observable to product[] this.products = productlist; observer.next(productlist); }); }); return productsObservable$; } } 

product.ts(模型)

 export interface ProductResponse { success: boolean; msg: string; products: Product[]; } export class Product { product_id: number; sku: string; product_title: string; ..etc... constructor(product_id: number, sku: string, product_title: string, ...etc... ){ //typescript will not autoassign the formal parameters to related properties for exported classes. this.product_id = product_id; this.sku = sku; this.product_title = product_title; ...etc... } //Class method to convert products within http response to pure array of Product objects. //Caller: product.service:getProducts() static fromJsonList(products:any): Product[] { let mappedArray = products.map(Product.fromJson); return mappedArray; } //add more parameters depending on your database entries and constructor static fromJson({ product_id, sku, product_title, ...etc... }): Product { return new Product( product_id, sku, product_title, ...etc... ); } } 

以下是我在Chrome中加载页面时看到的输出示例。 请注意,在初始加载时,将从http(调用我的节点rest服务,在端口3000上本地运行)获取产品。 当我点击导航到产品的“过滤”视图时,产品在caching中find。

我的Chrome日志(控制台):

 core.es5.js:2925 Angular is running in the development mode. Call enableProdMode() to enable the production mode. app.component.ts:19 app.component url: /products product.service.ts:15 product service init. calling http to get products... product-list.component.ts:18 product-list init... product.service.ts:29 ** products do not yet exist; fetching from rest api... product.service.ts:33 productResponse: {success: true, msg: "Products found", products: Array(23)} product-list.component.ts:20 product-list received updated products 

… [点击菜单button来过滤产品] …

 app.component.ts:19 app.component url: /products/chocolatechip product-list.component.ts:18 product-list init... product.service.ts:24 ## returning existing products product-list.component.ts:20 product-list received updated products 

结论:这是迄今为止我发现的最简单的方法来实现可caching的http响应数据。 在我的angular度应用程序中,每次导航到产品的不同视图时,产品列表组件都会重新加载。 ProductService似乎是一个共享实例,因此ProductService中的“products:Product []”的本地caching在导航期间保留,随后对“GetProducts()”的调用返回caching的值。 最后一点,我已经阅读了关于在完成预防“内存泄漏”时如何closures可观察/预订的评论。 我没有把这个包括在内,但是要记住。

只需在地图之后和任何订阅之前调用share()

在我的情况下,我有一个通用的服务(RestClientService.ts),他正在做其余的调用,提取数据,检查错误,并返回observable给具体的实现服务(f.ex .: ContractClientService.ts),最后这个具体的实现返回观察到ContractComponent.ts,这一个订阅更新视图。

RestClientService.ts:

 export abstract class RestClientService<T extends BaseModel> { public GetAll = (path: string, property: string): Observable<T[]> => { let fullPath = this.actionUrl + path; let observable = this._http.get(fullPath).map(res => this.extractData(res, property)); observable = observable.share(); //allows multiple subscribers without making again the http request observable.subscribe( (res) => {}, error => this.handleError2(error, "GetAll", fullPath), () => {} ); return observable; } private extractData(res: Response, property: string) { ... } private handleError2(error: any, method: string, path: string) { ... } } 

ContractService.ts:

 export class ContractService extends RestClientService<Contract> { private GET_ALL_ITEMS_REST_URI_PATH = "search"; private GET_ALL_ITEMS_PROPERTY_PATH = "contract"; public getAllItems(): Observable<Contract[]> { return this.GetAll(this.GET_ALL_ITEMS_REST_URI_PATH, this.GET_ALL_ITEMS_PROPERTY_PATH); } } 

ContractComponent.ts:

 export class ContractComponent implements OnInit { getAllItems() { this.rcService.getAllItems().subscribe((data) => { this.items = data; }); } } 

我写了一个caching类,

 /** * Caches results returned from given fetcher callback for given key, * up to maxItems results, deletes the oldest results when full (FIFO). */ export class StaticCache { static cachedData: Map<string, any> = new Map<string, any>(); static maxItems: number = 400; static get(key: string){ return this.cachedData.get(key); } static getOrFetch(key: string, fetcher: (string) => any): any { let value = this.cachedData.get(key); if (value != null){ console.log("Cache HIT! (fetcher)"); return value; } console.log("Cache MISS... (fetcher)"); value = fetcher(key); this.add(key, value); return value; } static add(key, value){ this.cachedData.set(key, value); this.deleteOverflowing(); } static deleteOverflowing(): void { if (this.cachedData.size > this.maxItems) { this.deleteOldest(this.cachedData.size - this.maxItems); } } /// A Map object iterates its elements in insertion order — a for...of loop returns an array of [key, value] for each iteration. /// However that seems not to work. Trying with forEach. static deleteOldest(howMany: number): void { //console.debug("Deleting oldest " + howMany + " of " + this.cachedData.size); let iterKeys = this.cachedData.keys(); let item: IteratorResult<string>; while (howMany-- > 0 && (item = iterKeys.next(), !item.done)){ //console.debug(" Deleting: " + item.value); this.cachedData.delete(item.value); // Deleting while iterating should be ok in JS. } } static clear(): void { this.cachedData = new Map<string, any>(); } } 

这完全是静态的,因为我们如何使用它,但随意使它成为一个普通的class级和服务。 我不确定angular度是否一直保持单个实例(Angular2是新的)。

这就是我如何使用它:

  let httpService: Http = this.http; function fetcher(url: string): Observable<any> { console.log(" Fetching URL: " + url); return httpService.get(url).map((response: Response) => { if (!response) return null; if (typeof response.json() !== "array") throw new Error("Graph REST should return an array of vertices."); let items: any[] = graphService.fromJSONarray(response.json(), httpService); return array ? items : items[0]; }); } // If data is a link, return a result of a service call. if (this.data[verticesLabel][name]["link"] || this.data[verticesLabel][name]["_type"] == "link") { // Make an HTTP call. let url = this.data[verticesLabel][name]["link"]; let cachedObservable: Observable<any> = StaticCache.getOrFetch(url, fetcher); if (!cachedObservable) throw new Error("Failed loading link: " + url); return cachedObservable; } 

我认为可以有一个更聪明的方法,这将使用一些Observable技巧,但这对我的目的是很好的。

只要使用这个caching层,它可以满足您的所有需求,甚至可以pipe理caching以查找Ajax请求。

http://www.ravinderpayal.com/blogs/12Jan2017-Ajax-Cache-Mangement-Angular2-Service.html

这很容易使用

 @Component({ selector: 'home', templateUrl: './html/home.component.html', styleUrls: ['./css/home.component.css'], }) export class HomeComponent { constructor(AjaxService:AjaxService){ AjaxService.postCache("/api/home/articles").subscribe(values=>{console.log(values);this.articles=values;}); } articles={1:[{data:[{title:"first",sort_text:"description"},{title:"second",sort_text:"description"}],type:"Open Source Works"}]}; } 

该层(作为可注入的angular度服务)是

 import { Injectable } from '@angular/core'; import { Http, Response} from '@angular/http'; import { Observable } from 'rxjs/Observable'; import './../rxjs/operator' @Injectable() export class AjaxService { public data:Object={}; /* private dataObservable:Observable<boolean>; */ private dataObserver:Array<any>=[]; private loading:Object={}; private links:Object={}; counter:number=-1; constructor (private http: Http) { } private loadPostCache(link:string){ if(!this.loading[link]){ this.loading[link]=true; this.links[link].forEach(a=>this.dataObserver[a].next(false)); this.http.get(link) .map(this.setValue) .catch(this.handleError).subscribe( values => { this.data[link] = values; delete this.loading[link]; this.links[link].forEach(a=>this.dataObserver[a].next(false)); }, error => { delete this.loading[link]; } ); } } private setValue(res: Response) { return res.json() || { }; } private handleError (error: Response | any) { // In a real world app, we might use a remote logging infrastructure let errMsg: string; if (error instanceof Response) { const body = error.json() || ''; const err = body.error || JSON.stringify(body); errMsg = `${error.status} - ${error.statusText || ''} ${err}`; } else { errMsg = error.message ? error.message : error.toString(); } console.error(errMsg); return Observable.throw(errMsg); } postCache(link:string): Observable<Object>{ return Observable.create(observer=> { if(this.data.hasOwnProperty(link)){ observer.next(this.data[link]); } else{ let _observable=Observable.create(_observer=>{ this.counter=this.counter+1; this.dataObserver[this.counter]=_observer; this.links.hasOwnProperty(link)?this.links[link].push(this.counter):(this.links[link]=[this.counter]); _observer.next(false); }); this.loadPostCache(link); _observable.subscribe(status=>{ if(status){ observer.next(this.data[link]); } } ); } }); } } 

你有没有试过运行你已经有的代码?

因为你是从getJSON()得到的promise来构造Observable的,所以networking请求是在任何人订阅之前完成的。 由此产生的承诺是由所有用户共享。

 var promise = jQuery.getJSON(requestUrl); // network call is executed now var o = Rx.Observable.fromPromise(promise); // just wraps it in an observable o.subscribe(...); // does not trigger network call o.subscribe(...); // does not trigger network call // ...