Rxjava 详解

Rxjava详解

Rxjava文章很多了,自学所用。如下链接解析的很好,本文写一下小demo实战。

http://www.jcodecraeer.com/a/anzhuokaifa/androidkaifa/2015/1012/3572.html#toc_1

基本概念:异步库,序列式编写,可线程控制

 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。

异步原理

使用观察者模式

盗个图。。

Rxjava 详解_第1张图片

简单的分析:

为何用??? 两个业务需求或两个动作之间如何连接??? 简单的方式是A中直接包括B对象,并调用。

弊端:产生紧耦合,A需要了解B的功能和方法并调用。A对应多种B时,A就要大量的调用。

观察者方案:A和B之间利用注册通知的方法进行业务交互。A(被观察者)提供注册方法(也可以取消注册),提供通知方法(在通知方法中调用B的统一接口update)。B(观察者)实现方法update。类似其他设计模式,便于派生多种不同的观察者,AB都设计抽象类。

举例子:在使用信用卡消费成功后,需要发送短信通知,发送微信通知,需要生成消费积分,产生抽奖次数。消费作为被观察者,在成功后发送通知给各个观察者,使观察者立刻执行自身业务。

Rxjava例子,先看为敬

缩减到一个步骤就是 Observable.subscribe(new observer()); 观察者“订阅”被观察者,代码顺序和语义顺序写上去是反的。
在被观察者在 一系列数据处理 (from、flatmap、filter、map)后是包括一个崭新数据的被观察者,再进行“ 订阅 ”操作,观察者得到预期数据后call自己的业务。
Observable.from(folders)
    .flatMap(new Func1>() {
        @Override
        public Observable call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1() {
        @Override
        public void call(Bitmap bitmap) {
            imageCollectorView.addImage(bitmap);
        }
    });

创建观察者

创建观察者很简单,实现Observer接口,然后在三个回调函数中添加业务逻辑。其中包括onNext,onCompleted,onError方法,onNext参数就是被观察者处理后的 返回类型
Observer observer = new Observer() {
            @Override
            public void onNext(Integer s) {
                Log.e(tag, "Item: " + s);
            }
            @Override
            public void onCompleted() {
                Log.e(tag, "Completed!");
            }
            @Override
            public void onError(Throwable e) {
                Log.e(tag, "Error!");
            }};

不完整回调Action1:

Action1 onNextAction = new Action1() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};

创建被观察者

Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件。 RxJava 使用 create() 方法来创建一个 Observable ,并为它定义事件触发规则。

被观察者包括了很多自定义实现:from,just,map,flatmap,first,elementAt,filter等。这些实现中会自定义通知(即三个回调函数)的调用流程。

Observable observable = Observable.create(new Observable.OnSubscribe() {
    @Override
    public void call(Subscriber subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }});

form例子

将一个数组对象转化成Observable对象,Observable发出每个数组item的通知,即观察者会对每个item都有call响应。
   String [] strings={"one","two"};
        Observable.from(strings).subscribe(new Action1() {
            @Override
            public void call(String s) {
                Log.i(tag,"call from "+s);
            }
        });

just例子

Converts two items into an Observable that emits those items.
将两个items放到一个Observable中,然后依次发送每个item。也可以是多个item,但是参数类型必须保持一致。
        Observable.just(123,456).subscribe(new Action1() {
            @Override
            public void call(Integer integer) {
                Log.i(tag, "call "+integer);
            }
        });

Map例子

RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一,也是大多数人说『RxJava 真是太好用了』的最大原因。 所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列
一对一转换
String [] strings={"one","two"};
        Observable.just(strings) // 输入类型 String数组
                .map(new Func1() {
                    @Override
                    public String call(String[] sarray) { // 参数类型 String数组
                        StringBuffer ssum = new StringBuffer();
                        for (String s : sarray)
                            ssum.append(s);
                        return ssum.toString(); // 返回类型String
                    }
                })
                .subscribe(new Action1() {
                    @Override
                    public void call(String s) { // 参数类型 String
                        Log.i(tag, "call " + s);
                    }
                });

flatMap例子

Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then merging those resulting
Observables and emitting the results of this merger.

flatmap接收一个Observable,这个Observable发送一组item。在flatmap中将每个item转化成一个Observable,这个Observable汇总合并所有转化结果,最后再发送到观察者。

一组信息转化合并再的转化。Func1<转化前类型,Observable<每个item转化后类型>>
这个例子就是from生成的Observable发送item,flatmap接收每个学生信息,获取到每个学生课程信息,利用from生成每个学生的Observable,这些学生的Observable合并所有课程信息到一起,最后发送给观察者。

class Student{
            private String name;
            public String [] course;
            public Student(String _name,String []_c){
                name=_name;
                course=_c;
            }
        }
        Student [] student=new Student[2];
        student[0]=new Student("小红",new String[]{"数学","英语","物理"});
        student[1]=new Student("小明",new String[]{"地理","语文","化学"});
        Observable.from(student)
                .flatMap(new Func1>(){
                    @Override
                    public Observable call(Student st) {
                        return Observable.from(st.course);
                    }
                }).subscribe(new Observer() {
            @Override
            public void onCompleted() {
                Log.i(tag,"call onCompleted");
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(String s) {
                Log.i(tag,"call "+s);
            }
        }
    );

线程分配

observeOn指定观察者的执行线程,subscribeOn指定被观察者的执行线程。
不同于 observeOn() , subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。

在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:

  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。

  • Schedulers.newThread():总是启用新线程,并在新线程执行操作。

  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。

  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

  • 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。


Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新线程,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 线程,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread) 
    .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定

原理浅析
被观察者Observable创建需要一个 ObSubscribe对象, 它的作用相当于一个计划表,当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定依次触发。
  1. Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
  2.     @Override
  3.     public void call(Subscriber super String> subscriber) {
  4.         subscriber.onNext("Hello");
  5.         subscriber.onCompleted();
  6.     }});



你可能感兴趣的