本文共 8917 字,大约阅读时间需要 29 分钟。
RxJava 到底是什么?
RxJava 的本质可以压缩为异步这一个词。说到根上,它就是一个实现异步操作的库,而别的定语都是基于这之上的。
RxJava 好在哪?
简洁. 这种简洁是逻辑上的简洁,并不是代码量变少.
Observable
:发射源,英文释义“可观察的”,在观察者模式中称为“被观察者”或“可观察对象”;
Observer
:接收源,英文释义“观察者”,没错!就是观察者模式中的“观察者”,可接收Observable、Subject发射的数据;
Subject
:Subject是一个比较特殊的对象,既可充当发射源,也可充当接收源,为避免初学者被混淆,本章将不对Subject做过多的解释和使用,重点放在Observable和Observer上,先把最基本方法的使用学会,后面再学其他的都不是什么问题;
Subscriber
:“订阅者”,也是接收源,那它跟Observer有什么区别呢?Subscriber实现了Observer接口,比Observer多了一个最重要的方法unsubscribe( ),用来取消订阅,当你不再想接收数据了,可以调用unsubscribe( )方法停止接收,Observer 在 subscribe() 过程中,最终也会被转换成 Subscriber 对象,一般情况下,建议使用Subscriber作为接收源;
Action0
:RxJava中的一个接口,它只有一个无参call()方法,且无返回值,同样还有Action1,Action2…Action9等,Action1封装了含有 1 个参的call()方法,即call(T t),Action2封装了含有 2 个参数的call方法,即call(T1 t1,T2 t2),以此类推;
Func0
:与Action0非常相似,也有call()方法,但是它是有返回值的,同样也有Func0、Func1…Func9;RxJava最核心的两个东西是Observables
(被观察者,事件源)和Subscribers(观察者)
。Observables
发出一系列事件,Subscribers
处理这些事件。这里的事件可以是任何你感兴趣的东西(触摸事件,web接口调用返回的数据…)
1.使用create( ),最基本的创建方式:
normalObservable = Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { subscriber.onNext("create1"); //发射一个"create1"的String subscriber.onNext("create2"); //发射一个"create2"的String subscriber.onCompleted();//发射完成,这种方法需要手动调用onCompleted,才会回调Observer的onCompleted方法 }});
这里传入了一个 OnSubscribe
对象作为参数。OnSubscribe
会被存储在返回的 Observable
对象中,它的作用相当于一个计划表,当 Observable
被订阅的时候,OnSubscribe
的 call()
方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Subscriber
将会被调用两次 onNext() 和一次 onCompleted())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。
上面定义的Observable对象仅仅发出字符串,然后就结束了。我们需要创建一个Subscriber来处理Observable对象发出的字符串:
2.使用just( ),将为你创建一个Observable并自动为你调用onNext( )发射数据:
justObservable = Observable.just("just1","just2");//依次发送"just1"和"just2" // 观察者将会被调用两次 onNext() 和一次 onCompleted()
3.使用from( ),遍历集合,发送每个item:
Listlist = new ArrayList<>();list.add("from1");list.add("from2");list.add("from3");fromObservable = Observable.from(list); //遍历list 每次发送一个/** 注意,just()方法也可以传list,但是发送的是整个list对象,而from()发送的是list的一个item** /
4.使用defer( ),有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable:
deferObservable = Observable.defer(new Func0>() { @Override //注意此处的call方法没有Subscriber参数 public Observable call() { return Observable.just("deferObservable"); }});
5.使用interval( ),创建一个按固定时间间隔发射整数序列的Observable,可用作定时器:
intervalObservable = Observable.interval(1, TimeUnit.SECONDS);//每隔一秒发送一次
6.使用range( ),创建一个发射特定整数序列的Observable,第一个参数为起始值,第二个为发送的个数,如果为0则不发送,负数则抛异常:
rangeObservable = Observable.range(10, 5);//将发送整数10,11,12,13,14
7.使用timer( ),创建一个Observable,它在一个给定的延迟后发射一个特殊的值,等同于Android中Handler的postDelay( )方法:
timeObservable = Observable.timer(3, TimeUnit.SECONDS); //3秒后发射一个值
8.使用repeat( ),创建一个重复发射特定数据的Observable:
repeatObservable = Observable.just("repeatObservable").repeat(3);//重复发射3次
mObserver = new Observer() { @Override public void onCompleted() { LogUtil.log("onCompleted"); } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { LogUtil.log(s); }};
ok,有了Observable和Obsever,我们就可以随便玩了,任取一个已创建的Observable和Observer关联上,即形成一个RxJava的例子,如:
justObservable.subscribe(mObserver);
注意这个订阅是反的,是Observable订阅了Observer
mObserver的onNext方法将会依次收到来自justObservable的数据"just1"、"just2",另外,如果你不在意数据是否接收完或者是否出现错误,即不需要Observer的onCompleted()和onError()方法,可使用Action1,subscribe()支持将Action1作为参数传入,RxJava将会调用它的call方法来接收数据,代码如下:
justObservable.subscribe(new Action1() { @Override public void call(String s) { LogUtil.log(s); }});
RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。
观察者模式面向的需求是:A对象(观察者)对 B对象(被观察者)的某种变化高度敏感,需要在 B 变化的一瞬间做出反应。
举个例子,新闻里喜闻乐见的警察抓小偷,警察需要在小偷伸手作案的时候实施抓捕。在这个例子里,警察是观察者,小偷是被观察者,警察需要时刻盯着小偷的一举一动,才能保证不会漏过任何瞬间。
程序的观察者模式和这种真正的『观察』略有不同,观察者不需要时刻盯着被观察者(例如 A 不需要每过 2ms 就检查一次 B 的状态),而是采用注册(Register)或者称为订阅(Subscribe)的方式,告诉被观察者:我需要你的某某状态,你要在它变化的时候通知我。
Android 开发中一个比较典型的例子是点击监听器 OnClickListener 。对设置 OnClickListener 来说, View 是被观察者, OnClickListener 是观察者,二者通过 setOnClickListener() 方法达成订阅关系。订阅之后用户点击按钮的瞬间,Android Framework 就会将点击事件发送给已经注册的 OnClickListener 。采取这样被动的观察方式,既省去了反复检索状态的资源消耗,也能够得到最高的反馈速度。当然,这也得益于我们可以随意定制自己程序中的观察者和被观察者,而警察叔叔明显无法要求小偷『你在作案的时候务必通知我』。
与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。
在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一.
所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。概念说着总是模糊难懂的,来看 API。
先看一个例子
Observable.just("images/logo.png") // 输入类型 String .map(new Func1() { @Override public Bitmap call(String filePath) { // 参数类型 String return getBitmapFromPath(filePath); // 返回类型 Bitmap } }) .subscribe(new Action1 () { @Override public void call(Bitmap bitmap) { // 参数类型 Bitmap showBitmap(bitmap); } });
map()
: 事件对象的直接变换,上面的例子把事件的参数类型由 String转为了 Bitmap
首先假设这么一种需求:假设有一个数据结构『学生』,现在需要打印出一组学生的名字。实现方式很简单:
Student[] students = ...;Subscribersubscriber = new Subscriber () { @Override public void onNext(String name) { Log.d(tag, name); } ...};Observable.from(students) .map(new Func1 () { @Override public String call(Student student) { return student.getName(); } }) .subscribe(subscriber);
很简单。那么再假设:如果要打印出每个学生所需要修的所有课程的名称呢?(需求的区别在于,每个学生只有一个名字,但却有多个课程。)首先可以这样实现:
Student[] students = ...;Subscribersubscriber = new Subscriber () { @Override public void onNext(Student student) { List courses = student.getCourses(); for (int i = 0; i < courses.size(); i++) { Course course = courses.get(i); Log.d(tag, course.getName()); } } ...};Observable.from(students) .subscribe(subscriber);
依然很简单。那么如果我不想在 Subscriber 中使用 for 循环,而是希望 Subscriber 中直接传入单个的 Course 对象呢(这对于代码复用很重要)?用 map() 显然是不行的,因为 map() 是一对一的转化,而我现在的要求是一对多的转化。那怎么才能把一个 Student 转化成多个 Course 呢?
这个时候,就需要用 flatMap() 了:
Student[] students = ...;Subscribersubscriber = new Subscriber () { @Override public void onNext(Course course) { Log.d(tag, course.getName()); } ...};Observable.from(students) .flatMap(new Func1 >() { @Override public Observable call(Student student) { return Observable.from(student.getCourses()); } }) .subscribe(subscriber);
从上面的代码可以看出, flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map()不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。
flatMap() 的原理是这样的:
这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。
在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。
Scheduler
——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:
Schedulers.immediate()
: 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。Schedulers.newThread()
: 总是启用新线程,并在新线程执行操作。Schedulers.io()
: I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。Schedulers.computation()
: 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。另外, Android 还有一个专用的 AndroidSchedulers.mainThread()
,它指定的操作将在 Android 主线程运行。有了这几个 Scheduler ,就可以使用 subscribeOn()
和 observeOn()
两个方法来对线程进行控制了。
subscribeOn()
: 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。 observeOn()
: 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。来个例子
Observable.just(1, 2, 3, 4) .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程 .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程 .subscribe(new Action1() { @Override public void call(Integer number) { Log.d(tag, "number:" + number); } });
上面这段代码中,由于 subscribeOn(Schedulers.io()) 的指定,被创建的事件的内容 1、2、3、4 将会在 IO 线程发出;而由于 observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber 数字的打印将发生在主线程 。
事实上,这种在 subscribe() 之前写上两句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。
转载于:https://blog.51cto.com/11346335/2134985