rxjava

RXjava

RXjava是什么,有什么作用?如何使用它?
  1. RXjava是一个使用可观测的序列来组成异步的、基于事件的程序的库。简单的说,就是用一个简洁明了的程序序列,来组成异步程序。让异步操作更加容易读写。
  2. 它的作用就是让异步,更简洁,而且当有大量异步操作的时候,仍然很简洁。
  3. 使用起来也不难。调用方法即可。

RXjava类图
如图所示,先从简单的说起。RXjava类似于观察者模式,存在着观察者与被观察者,就相当于buttononclicklistener。需要对被观察者一举一动的瞬间有所反应,但是又不需要时刻观察被观察者的状态。


先从流程说起:

  1. 调用Observableoncreate方法,会返回一个observable对象。而oncreate方法中,是有一个onsubscribe对象,这个对象中有个call方法,在这个call方法中,规定了之后observer的执行步骤。
  2. 然后写一个observer,里面有onnext,oncompleted,onerror三个方法。这里还有一个subscriber类,这个类继承自observer,比它多两个方法,onstart,unsubscribe。这里onstart顾名思义是在subscriber开始的时候运行的,这个后面在subscribe方法中会有调用,到时候再解释。unsubscribesubscriber所实现的另外一个接口subscription里面的方法,用于取消订阅。
  3. 这里是最关键的,在observer(或者说是subscriber,因为在订阅的时候,observer会被转换为subscriber)和observable都有了之后。就开始订阅了,方法是observable.subscribe(subscriber); 看起来比较像是被观察者订阅了观察者,其实不然,可以理解为被观察者与观察者建立了一种联系。subscribe方法其实是:
public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}

这里调用了subscriberonstart方法,然后调用了之前在oncreate方法中定义的call方法,然后return一个subscriber对象。通过这个对象判断是否在订阅,为unsubscribe方法备用。

  1. 上面的方法还有拓展,比如直接调用Observable.from(一个string数组);,或者Observable.just("1","2","3");这两个方法返回的是一个observable对象,相当于包装了oncreate方法。而oncreate方法中,是有对接下来observer的运行步骤有定义的,这里的定义就相当于
onNext("1");
onNext("2");
onNext("3");
onCompleted();
  1. 还有一种拓展是Action1();这个在RXjava类图中有解释。

知道上面这些流程,也没有卵用。因为RXjava最重要的是异步,而上面全部都是在同一个线程上面。所以接下来要在多线程上面执行这个框架。

在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:

  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io()newThread()更有效率。不要把计算工作放在io() 中,可以避免创建不必要的线程。
    -Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被I/O等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为CPU 核数。不要把I/O 操作放在 computation() 中,否则 I/O操作的等待时间会浪费CPU

另外,Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android主线程运行。
有了这几个Scheduler ,就可以使用subscribeOn()observeOn() 两个方法来对线程进行控制了。

  • subscribeOn(): 指定 subscribe()所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
  • observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

文字叙述总归难理解,上代码:

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
    .subscribe(new Action1() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });

上面这段代码中,由于
subscribeOn(Schedulers.io())
的指定,被创建的事件的内容 1、2、3、4 将会在 IO 线程发出;而由于
observeOn(AndroidScheculers.mainThread())
的指定,因此subscriber数字的打印将发生在主线程 。事实上,这种在subscribe() 之前写上两句ubscribeOn(Scheduler.io())observeOn(AndroidSchedulers.mainThread())的使用方式非常常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。

而前面提到的由图片 id 取得图片并显示的例子,如果也加上这两句:

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe() {
    @Override
    public void call(Subscriber subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes));
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
})
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Observer() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});

那么,加载图片将会发生在IO线程,而设置图片则被设定在了主线程。这就意味着,即使加载图片耗费了几十甚至几百毫秒的时间,也不会造成丝毫界面的卡顿。

变换

  • 变换是最复杂的,也是最难理解的。暂且先说怎么使用,代码如下:

可以看到,map() 方法将参数中的 String 对象转换成一个 Bitmap 对象后返回,而在经过 map() 方法后,事件的参数类型也由 String 转为了 Bitmap。这种直接变换对象并返回的,是最常见的也最容易理解的变换。不过 RxJava 的变换远不止这样,它不仅可以针对事件对象,还可以针对整个事件队列,这使得 RxJava 变得非常灵活。

Student[] students = ...;
Subscriber subscriber = new Subscriber() {
    @Override
    public void onNext(String name) {
        Log.d(tag, name);
    }
    ...
};
Observable.from(students)
    .map(new Func1() {
        @Override
        public String call(Student student) {
            return student.getName();
        }
    })
    .subscribe(subscriber);

flatMap(): 这是一个很有用但非常难理解的变换,因此我决定花多些篇幅来介绍它。 首先假设这么一种需求:假设有一个数据结构『学生』,现在需要打印出一组学生的名字。实现方式很简单:

Student[] students = ...;
Subscriber subscriber = new Subscriber() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
    .flatMap(new Func1>() {
        @Override
        public Observable call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);

从上面的代码可以看出,flatMap()map()有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和map()不同的是,flatMap()中返回的是个Observable对象,并且这个Observable对象并不是被直接发送到了Subscriber的回调方法中。flatMap()的原理是这样的:

  1. 使用传入的事件对象创建一个Observable对象;
  2. 并不发送这个Observable,而是将它激活,于是它开始发送事件;
  3. 每一个创建出来的Observable发送的事件,都被汇入同一个Observable,而这个Observable负责将这些事件统一交给Subscriber的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的Observable将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是flatMap()所谓的flat

你可能感兴趣的