当前位置:首页 > 资讯 > info5 > 正文

RxJava入门之生命周期管理

发表于: 2016-07-05   作者:jdsjlzx   来源:转载   浏览:
摘要: Rx背后的理念是:无法知道事件流何时发射数据、也不知何时结束发射,但是你需要控制何时开始和结束接受事件。订阅者可能使用了一些资源,这些资源需要在停止接收事件的时候释放。通过subscription可以实现生命周期管理。SubscribingObservable.subscribe有好几个重载函数,每个函数都是某种情况的简化形式。Subscriptionsubscribe() Subscriptio

Rx 背后的理念是:无法知道事件流何时发射数据、也不知何时结束发射,但是你需要控制何时开始和结束接受事件。订阅者可能使用了一些资源,这些资源需要在停止接收事件的时候释放。 通过 subscription 可以实现生命周期管理。

Subscribing

Observable.subscribe 有好几个重载函数,每个函数都是某种情况的简化形式。

Subscription    subscribe()
Subscription    subscribe(Action1<? super T> onNext)
Subscription    subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError)
Subscription    subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError, Action0 onComplete)
Subscription    subscribe(Observer<? super T> observer)
Subscription    subscribe(Subscriber<? super T> subscriber)

第一个 subscribe() 函数只是订阅事件,但是不去处理这些事件。带有一个或者多个 Action 参数的,使用这些参数来构造一个 Subscriber 对象。这些参数分别对应 onNext、onError和 onComplete 这三种类型的事件,如果没有提供则代表不处理这个类型的事件。
下面的示例演示处理 error 的情况:

Subject<Integer, Integer> s = ReplaySubject.create();
s.subscribe(
    v -> System.out.println(v),
    e -> System.err.println(e));
s.onNext(0);
s.onError(new Exception("Oops"));

输出结果:

0
java.lang.Exception: Oops

如果你不处理 error,则 在发生错误的时候,会抛出 OnErrorNotImplementedException 异常。该异常发生在生产者这边,上面的示例 生产者和消费者位于同一线程,所以你可以直接 try- catch 住,但是在实际应用中,生产者和消费者通常不再同一个线程,所以最好还是提供一个 错误处理函数,否则你不知道错误发生了并导致事件流结束了。

Unsubscribing

在事件流结束发射之前,你可以主动停止接收事件。每个 subscribe 函数都会返回一个 Subscription 示例,该示例有两个函数:

boolean isUnsubscribed()
void unsubscribe()

只需要调用 unsubscribe 函数就可以停止接收数据了。

Subject<Integer, Integer>  values = ReplaySubject.create();
Subscription subscription = values.subscribe(
    v -> System.out.println(v),
    e -> System.err.println(e),
    () -> System.out.println("Done")
);
values.onNext(0);
values.onNext(1);
subscription.unsubscribe();
values.onNext(2);

输出结果:

0
1

一个 observer 调用 unsubscribe 取消监听并不妨碍同一个 observable 上的其他 Observer 对象。

Subject<Integer, Integer>  values = ReplaySubject.create();
Subscription subscription1 = values.subscribe(
    v -> System.out.println("First: " + v)
);
Subscription subscription2 = values.subscribe(
    v -> System.out.println("Second: " + v)
);
values.onNext(0);
values.onNext(1);
subscription1.unsubscribe();
System.out.println("Unsubscribed first");
values.onNext(2);

输出结果:

First: 0
Second: 0
First: 1
Second: 1
Unsubscribed first
Second: 2

onError 和 onCompleted

onError 和 onCompleted 意味着结束事件流。observable 需要遵守该规范,在 onError 或者 onCompleted 发生后就不应该再发射事件了。

Subject<Integer, Integer>  values = ReplaySubject.create();
Subscription subscription1 = values.subscribe(
    v -> System.out.println("First: " + v),
    e -> System.out.println("First: " + e),
    () -> System.out.println("Completed")
);
values.onNext(0);
values.onNext(1);
values.onCompleted();
values.onNext(2);

结果:

First: 0
First: 1
Completed

释放资源

Subscription 和其使用的资源绑定在一起。所以你应该记得释放 Subscription 来释放资源。使用 Subscriptions 的工厂函数可以把 Subscription 和需要的资源绑定在一起,然后可以使用 unsubscribe 来释放绑定的资源。

Subscription s = Subscriptions.create(() -> System.out.println("Clean")); s.unsubscribe();

输出结果:

Clean

Subscriptions.create 函数需要一个 Action 接口类型参数,在 unsubscribe 调用的时候会执行该接口来释放资源。 也有其他一些函数可以简化开发:

  • Subscriptions.empty() 返回一个当 unsubscribe 的时候 啥也不做的Subscription 。当要求你返回一个 Subscription ,但是你确没有资源需要释放,则可以返回这个空的 Subscription。
  • Subscriptions.from(Subscription… subscriptions),返回的 Subscription 释放的时候,会调用所有参数 Subscription 的 unsubscribe 函数。
  • Subscriptions.unsubscribed() 返回一个已经释放过的 Subscription。

Subscription 也有一些标准的实现:

  • BooleanSubscription
  • CompositeSubscription
  • MultipleAssignmentSubscription
  • RefCountSubscription
  • SafeSubscriber
  • Scheduler.Worker
  • SerializedSubscriber
  • SerialSubscription
  • Subscriber
  • TestSubscriber

在后面将会看到他们的使用方式。这里注意 Subscriber 同时也实现了 Subscription。所以我们也可以直接用 Subscriber 来取消监听。

本文出自 云在千峰 http://blog.chengyunfeng.com/?p=954#ixzz4DV5maKAJ

RxJava入门之生命周期管理

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

编辑推荐
要理解RxJava,首先得理解什么是(异步)数据流。一些典型的点击事件本质上就是一个异步数据流,这
给对 RxJava 感兴趣的人一些入门的指引 给正在使用 RxJava 但仍然心存疑惑的人一些更深入的解析 RxJ
纸上得来终觉浅,绝知此事要躬行 --陆游 问渠那得清如许,为有源头活水来 --朱熹 一、任务栈Task An
认识 rxjava RxJava是 ReactiveX 在JVM上的一个实现,ReactiveX使用Observable序列 组合异步和基于
1 JPA状态转换图 要理解JPA的核心接口,这个图必须牢记。 分为4种状态: 1)New状态:未有id,未与Pe
1 JPA状态转换图 要理解JPA的核心接口,这个图必须牢记。 分为4种状态: 1)New状态:未有id,未与Pe
1 JPA状态转换图 要理解JPA的核心接口,这个图必须牢记。 分为4种状态: 1)New状态:未有id,未与Pe
1 JPA状态转换图 要理解JPA的核心接口,这个图必须牢记。 分为4种状态: 1)New状态:未有id,未与Pe
1 JPA状态转换图 要理解JPA的核心接口,这个图必须牢记。 分为4种状态: 1)New状态:未有id,未与Pe
采用Context.startService()方法启动服务有关的生命周期方法 onCreate()->onStart() ->onDest
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号