Rx.Observable subscribe和forEach有什么区别?

在像这样创build一个Observable之后

var source = Rx.Observable.create(function(observer) {...}); 

订阅有什么区别

 source.subscribe(function(x) {}); 

forEach

 source.forEach(function(x) {}); 

在RxJS 5.0遵循的ES7规范中 (但是RxJS 4.0没有),两者不一样。

订阅

 public subscribe(observerOrNext: Observer | Function, error: Function, complete: Function): Subscription 

Observable.subscribe是你可以完成大部分真正的Observable处理的地方。 它返回一个订阅令牌,您可以使用它来取消订阅。 当您不知道您订阅的事件/序列的持续时间时,或者您需要在已知的持续时间之前停止收听时,这一点非常重要。

的forEach

 public forEach(next: Function, PromiseCtor?: PromiseConstructor): Promise 

Observable.forEach返回一个承诺,当Observable完成或错误时,它将parsing或拒绝。 它的目的是澄清你正在处理一个有限的/有限的持续时间观察序列的情况下,以更“同步”的方式,如整理所有的传入值,然后呈现一次,通过处理承诺。

实际上,您可以采取任何方式处理每个值以及错误和完成事件。 所以最重要的function差异是无法取消承诺。

我只是回顾一下最新的代码,从技术上说,foreach的代码实际上是调用RxScala,RxJS和RxJava中的订阅。 这似乎不是一个很大的不同。 他们现在有一个返回types允许用户有停止订阅或类似的方法。

当我在RxJava早期版本上工作时,订阅订阅返回,forEach只是一个无效。 你可能会看到一些不同的答案,由于变化。

 /** * Subscribes to the [[Observable]] and receives notifications for each element. * * Alias to `subscribe(T => Unit)`. * * $noDefaultScheduler * * @param onNext function to execute for each item. * @throws java.lang.IllegalArgumentException if `onNext` is null * @throws rx.exceptions.OnErrorNotImplementedException if the [[Observable]] tries to call `onError` * @since 0.19 * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a> */ def foreach(onNext: T => Unit): Unit = { asJavaObservable.subscribe(onNext) } def subscribe(onNext: T => Unit): Subscription = { asJavaObservable.subscribe(scalaFunction1ProducingUnitToAction1(onNext)) } /** * Subscribes an o to the observable sequence. * @param {Mixed} [oOrOnNext] The object that is to receive notifications or an action to invoke for each element in the observable sequence. * @param {Function} [onError] Action to invoke upon exceptional termination of the observable sequence. * @param {Function} [onCompleted] Action to invoke upon graceful termination of the observable sequence. * @returns {Disposable} A disposable handling the subscriptions and unsubscriptions. */ observableProto.subscribe = observableProto.forEach = function (oOrOnNext, onError, onCompleted) { return this._subscribe(typeof oOrOnNext === 'object' ? oOrOnNext : observerCreate(oOrOnNext, onError, onCompleted)); }; /** * Subscribes to the {@link Observable} and receives notifications for each element. * <p> * Alias to {@link #subscribe(Action1)} * <dl> * <dt><b>Scheduler:</b></dt> * <dd>{@code forEach} does not operate by default on a particular {@link Scheduler}.</dd> * </dl> * * @param onNext * {@link Action1} to execute for each item. * @throws IllegalArgumentException * if {@code onNext} is null * @throws OnErrorNotImplementedException * if the Observable calls {@code onError} * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a> */ public final void forEach(final Action1<? super T> onNext) { subscribe(onNext); } public final Disposable forEach(Consumer<? super T> onNext) { return subscribe(onNext); } 

observable.subscribe(...)observable.forEach(...)完全一样。
正如user3743222在他的回答中提到的那样

从Rx文档:

订阅观察者观察序列。

Rx.Observable.prototype.subscribe([observer] | [onNext], [onError], [onCompleted])

Rx.Observable.prototype.forEach([observer] | [onNext], [onError], [onCompleted])

详细信息请参阅完整文档