当前位置:首页 > 资讯 > info5 > 正文

RxJava

发表于: 2016-04-07   作者:qq_20198405   来源:转载   浏览:
摘要: 写在前面什么是ReactiveX?ReactiveX是一个专注于异步编程与控制可观察数据(或者事件)流的API。它组合了观察者模式,迭代器模式和函数式编程的优秀思想。实时数据处理是一件普通的现象,有一个高效、干净和可扩展的方式来处理这些情景是重要的。使用Observables和Operators来熟练操作它们。ReactiveX提供一个可组合又灵活的API来创建和处理数据流,同时简化了异步编程带来

写在前面

什么是 ReactiveX?

ReactiveX 是一个专注于异步编程与控制可观察数据(或者事件)流的API。
它组合了观察者模式,迭代器模式和函数式编程的优秀思想。
实时数据处理是一件普通的现象,有一个高效、干净和可扩展的方式来处理这些情景是重要的。
使用 ObservablesOperators 来熟练操作它们。
ReactiveX 提供一个可组合又灵活的 API 来创建和处理数据流,同时简化了异步编程带来的一些担忧,如:线程创建和并发问题。

RxJava 简介

RxJava 是 ReactiveX 在 Java 上的开源的实现。
Observable(可观察者,即被观察者) 和 Subscriber / Observaber(订阅者 / 观察者)是两个主要的类。
Subscriber是一个实现了 Observer 的抽象类。 SubscriberObserver 接口进行了一些扩展,但他们的基本使用方式是完全一样的
在RxJava 上,一个 Observable 是一个发出数据流或者事件的类,Subscriber 是一个对这些发出的 items(数据流或者事件)进行处理(采取行动)的类。
一个 Observable 的标准流发出一个或多个 item,然后成功完成或者出错。
一个Observable 可以有多个 Subscribers,并且通过 Observable 发出的每一个 item,该 item 将会被发送到Subscriber.onNext() 方法来进行处理。
一旦 Observable 不再发出 items,它将会调用 Subscriber.onCompleted() 方法,或如果有一个出错的话 Observable 会调用 Subscriber.onError() 方法。

RxJava 的观察者模式

RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe
(订阅)、事件。ObservableObserver 通过 subscribe() 方法实现订阅关系,从而 Observable可以在需要的时候发出事件来通知 Observer

与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() /
onEvent())之外,还定义了两个特殊的事件:onCompleted()onError()

  • onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。

  • onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。

  • 在一个正确运行的事件序列中,onCompleted()onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted()onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

Subscriber和Observer的区别

SubscriberObserver不仅基本使用方式一样,实质上,在 RxJavasubscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。所以如果你只想使用基本功能,选择 ObserverSubscriber 是完全一样的。它们的区别对于使用者来说主要有两点:

  1. onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法,具体可以在后面的文中看到。
  2. unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态。 unsubscribe() 这个方法很重要,因为在 subscribe() 之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。

线程控制 —— Scheduler (一)

在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler(调度器)。

Scheduler 的 API (一)

在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:

  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler
  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io()newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
  • 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

有了这几个 Scheduler ,就可以使用 subscribeOn()observeOn() 两个方法来对线程进行控制了。

  • subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
  • observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

变换

RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一,也是大多数人说『RxJava 真是太好用了』的最大原因。所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。

操作符

其实操作符就是为了提供一些函数式的特性。函数式最大的好处就是处理数据简洁易懂。

map就是相当于对每一个元素进行变换,返回变换后的集合
filter就是对集合进行过滤
each就是遍历集合
take取出集合中的前几个
skip跳过前几个元素
unique相当于按照数学上的集合处理,去重

其实我觉得题主觉得最难理解的应该是flatMap和Observable的概念吧。Observable可以理解成lazy load的集合。flatMap想当于对lazyLoad的集合中的每个元素再进行一次lazy load。(此段引用自hi大头鬼hi)

用于创建Observable的操作符

操作符 含义
Create 通过调用观察者的方法从头创建一个Observable
Defer 在观察者订阅之前不创建这个Observable,为每一个观察者创建一个新的Observable
Empty/Never/Throw 创建行为受限的特殊Observable
from(T[]) / from(Iterable) 将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。
Interval 创建一个定时发射整数序列的Observable
just(T…) 将对象或者对象集合转换为一个会发射这些对象的Observable
Range 创建发射指定范围的整数序列的Observable
Repeat 创建重复发射特定的数据或数据序列的Observable
Start 创建发射一个函数的返回值的Observable
Timer 创建在一个指定的延迟之后发射单个数据的Observable

变换操作

这些操作符可用于对Observable发射的数据进行变换,详细解释可以看每个操作符的文档

操作符 含义
Buffer 缓存,可以简单的理解为缓存,它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
FlatMap 扁平映射,将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,可以认为是一个将嵌套的数据结构展开的过程。
GroupBy 分组,将原来的Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据
Map 映射,通过对序列的每一项都应用一个函数变换Observable发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项
Scan 扫描,对Observable发射的每一项数据应用一个函数,然后按顺序依次发射这些值
Window 窗口,定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项。类似于Buffer,但Buffer发射的是数据,Window发射的是Observable,每一个Observable发射原始Observable的数据的一个子集

过滤操作

这些操作符用于从Observable发射的数据中进行选择

操作符 含义
Debounce 只有在空闲了一段时间后才发射数据,通俗的说,就是如果一段时间没有操作,就执行一次操作
Distinct 去重,过滤掉重复数据项
ElementAt 取值,取特定位置的数据项
Filter 过滤,过滤掉没有通过谓词测试的数据项,只发射通过测试的
First 首项,只发射满足条件的第一条数据
IgnoreElements 忽略所有的数据,只保留终止通知(onError或onCompleted)
Last 末项,只发射最后一条数据
Sample 取样,定期发射最新的数据,等于是数据抽样,有的实现里叫ThrottleFirst
Skip 跳过前面的若干项数据
SkipLast 跳过后面的若干项数据
Take 只保留前面的若干项数据
TakeLast 只保留后面的若干项数据

组合操作

组合操作符用于将多个Observable组合成一个单一的Observable

操作符 含义
And/Then/When 通过模式(And条件)和计划(Then次序)组合两个或多个Observable发射的数据集
CombineLatest 当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果
Join 无论何时,如果一个Observable发射了一个数据项,只要在另一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射
Merge 将两个Observable发射的数据组合并成一个
StartWith 在发射原来的Observable的数据序列之前,先发射一个指定的数据序列或数据项
Switch 将一个发射Observable序列的Observable转换为这样一个Observable:它逐个发射那些Observable最近发射的数据
Zip 打包,使用一个指定的函数将多个Observable发射的数据组合在一起,然后将这个函数的结果作为单项数据发射

错误处理

这些操作符用于从错误通知中恢复

操作符 含义
Catch 捕获,继续序列操作,将错误替换为正常的数据,从onError通知中恢复
Retry 重试,如果Observable发射了一个错误通知,重新订阅它,期待它正常终止

辅助操作

一组用于处理Observable的操作符

操作符 含义
Delay 延迟一段时间发射结果数据
Do 注册一个动作占用一些Observable的生命周期事件,相当于Mock某个操作
Materialize/Dematerialize 将发射的数据和通知都当做数据发射,或者反过来
ObserveOn 指定观察者观察Observable的调度程序(工作线程)
Serialize 强制Observable按次序发射数据并且功能是有效的
Subscribe 收到Observable发射的数据和通知后执行的操作
SubscribeOn 指定Observable应该在哪个调度程序上执行
TimeInterval 将一个Observable转换为发射两个数据之间所耗费时间的Observable
Timeout 添加超时机制,如果过了指定的一段时间没有发射数据,就发射一个错误通知
Timestamp 给Observable发射的每个数据项添加一个时间戳
Using 创建一个只在Observable的生命周期内存在的一次性资源

条件和布尔操作

这些操作符可用于单个或多个数据项,也可用于Observable

操作符 含义
All 判断Observable发射的所有的数据项是否都满足某个条件
Amb 给定多个Observable,只让第一个发射数据的Observable发射全部数据
Contains 判断Observable是否会发射一个指定的数据项
DefaultIfEmpty 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据
SequenceEqual 判断两个Observable是否按相同的数据序列
SkipUntil 丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始
SkipWhile 丢弃原始Observable发射的数据,直到一个特定的条件为假,然后发射原始Observable剩余的数据
TakeUntil 发射来自原始Observable的数据,直到第二个Observable发射了一个数据或一个通知
TakeWhile 发射原始Observable的数据,直到一个特定的条件为真,然后跳过剩余的数据

算术和聚合操作

这些操作符可用于整个数据序列

操作符 含义
Average 计算Observable发射的数据序列的平均值,然后发射这个结果
Concat 不交错的连接多个Observable的数据
Count 计算Observable发射的数据个数,然后发射这个结果
Max 计算并发射数据序列的最大值
Min 计算并发射数据序列的最小值
Reduce 按顺序对数据序列的每一个应用某个函数,然后返回这个值
Sum 计算并发射数据序列的和

连接操作

一些有精确可控的订阅行为的特殊Observable

操作符 含义
Connect 指示一个可连接的Observable开始发射数据给订阅者
Publish 将一个普通的Observable转换为可连接的
RefCount 使一个可连接的Observable表现得像一个普通的Observable
Replay 确保所有的观察者收到同样的数据序列,即使他们在Observable开始发射数据之后才订阅

转换操作

操作符 含义
To 将Observable转换为其它的对象或数据结构
Blocking 阻塞Observable的操作符

参考:
给 Android 开发者的 RxJava 详解
RxJava 入门 - 傅圆的博客 | MrFu Blog
RxJava学习总结 - 推酷
Operators—-RxWeekend - 傅圆的博客 | MrFu Blog

RxJava

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

编辑推荐
要理解RxJava,首先得理解什么是(异步)数据流。一些典型的点击事件本质上就是一个异步数据流,这
本篇文章继续介绍以下类型的操作符 Combining Observables(Observable的组合操作符) Error Handling
上一篇文章我们通过一个简单的例子来给大家展示了RxJava的基本用法,相信大家已经对RxJava有了大概
本篇文章继续介绍以下类型的操作符 Combining Observables(Observable的组合操作符) Error Handling
上一篇文章已经详细讲解了RxJava的创建型操作符,本片文章将继续讲解RxJava操作符,包括: Transfor
作者:扔物线 前言 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Fli
前言 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的 And
我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的 Android
我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的 Android
我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的 Android
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号