接上文
创新互联从2013年成立,是专业互联网技术服务公司,拥有项目成都网站制作、成都网站建设网站策划,项目实施与项目整合能力。我们以让每一个梦想脱颖而出为使命,1280元平阳做网站,已为上家服务,为平阳各地企业和个人服务,联系电话:13518219792
输出日志信息
- call:2 ConcatMap RxNewThreadScheduler-5
- onNext: ConcatMap 101 ConcatMap
- call:2 ConcatMap RxNewThreadScheduler-6
- onNext: ConcatMap 102 ConcatMap
- call:2 ConcatMap RxNewThreadScheduler-7
- onNext: ConcatMap 103 ConcatMap
- onCompleted: ConcatMap
通过该操作符和flatMap输出的日志信息,很容易看出flatMap并没有保证数据源的顺序性,但是ConcatMap操作符保证了数据源的顺序性。在应用中,如果你对数据的顺序性有要求的话,就需要使用ConcatMap。若没有要求,二者皆可使用。
SwitchMap
当原始Observable发射一个新的数据(Observable)时,它将取消订阅并停止监视产生执之前那个数据的Observable,只监视当前这一个.
- Integer[] integers = {1, 2, 3};
- Observable.from(integers).switchMap(new Func1>() {
- @Override
- public Observable call(Integer integer) {
- Log.e(TAG, "call: SwitchMap" + Thread.currentThread().getName());
- //如果不通过subscribeOn(Schedulers.newThread())在在子线程模拟并发操作,所有数据源依然会全部输出,也就是并发操作此操作符才有作用
- //若在此通过Thread。sleep()设置等待时间,则输出信息会不一样。相当于模拟并发程度
- return Observable.just((integer + 100) + "SwitchMap").subscribeOn(Schedulers.newThread());
- }
- }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber() {
- @Override
- public void onCompleted() {
- Log.e(TAG, "onCompleted: SwitchMap");
- }
- @Override
- public void onError(Throwable e) {
- Log.e(TAG, "onError: SwitchMap");
- }
- @Override
- public void onNext(String s) {
- Log.e(TAG, "onNext: SwitchMap "+s);
- }
- });
输出日志信息
- call: SwitchMapmain
- call: SwitchMapmain
- call: SwitchMapmain
- onNext: SwitchMap 106SwitchMap
- onCompleted: SwitchMap
当数据源较多时,并不一定是只输出***一项数据,有可能输出几项数据,也可能是全部。
GroupBy
看到这个词你就应该想到了这个操作符的作用,就是你理解的含义,他将数据源按照你的约定进行分组。我们通过groupBy实行将1到10的数据进行就划分,代码如下
- Observable.range(1, 10).groupBy(new Func1() {
- @Override
- public Boolean call(Integer integer) {
- return integer % 2 == 0;
- }
- }).subscribe(new Subscriber>() {
- @Override
- public void onCompleted() {
- Log.e(TAG, "onCompleted:1 ");
- }
- @Override
- public void onError(Throwable e) {
- Log.e(TAG, "onError:1 ");
- }
- @Override
- public void onNext(GroupedObservable booleanIntegerGroupedObservable) {
- booleanIntegerGroupedObservable.toList().subscribe(new Subscriber>() {
- @Override
- public void onCompleted() {
- Log.e(TAG, "onCompleted:2 " );
- }
- @Override
- public void onError(Throwable e) {
- Log.e(TAG, "onError:2 ");
- }
- @Override
- public void onNext(List integers) {
- Log.e(TAG, "onNext:2 "+integers);
- }
- });
- }
- });
输出日志信息
- onNext:2 [1, 3, 5, 7, 9]
- onCompleted:2
- onNext:2 [2, 4, 6, 8, 10]
- onCompleted:2
- onCompleted:1
在上面代码中booleanIntegerGroupedObservable变量有一个getKey()方法,该方法返回的是分组的key,他的值就是groupBy方法call回调所用函数的值,在上面也就是integer % 2 == 0的值,及true和false。有几个分组也是有此值决定的。
Scan
操作符对原始Observable发射的***项数据应用一个函数,然后将那个函数的结果作为自己的***项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。它持续进行这个过程来产生剩余的数据序列。
例如计算1+2+3+4的和
- Observable.range(1,4).scan(new Func2() {
- @Override
- public Integer call(Integer integer, Integer integer2) {
- Log.e(TAG, "call: integer:"+integer+" integer2 "+integer2);
- return integer+integer2;
- }
- }).subscribe(new Subscriber() {
- @Override
- public void onCompleted() {
- Log.e(TAG, "onCompleted: ");
- }
- @Override
- public void onError(Throwable e) {
- Log.e(TAG, "onError: " );
- }
- @Override
- public void onNext(Integer integer) {
- Log.e(TAG, "onNext: "+integer );
- }
- });
输出日志信息
- onNext: 1
- call: integer:1 integer2 2
- onNext: 3
- call: integer:3 integer2 3
- onNext: 6
- call: integer:6 integer2 4
- onNext: 10
- onCompleted:
对于scan有一个重载方法,可以设置一个初始值,如上面代码,初始值设置为10,只需将scan加个参数scan(10,new Func2)。
Buffer
操作符将一个Observable变换为另一个,原来的Observable正常发射数据,变换产生的Observable发射这些数据的缓存集合,如果原来的Observable发射了一个onError通知,Buffer会立即传递这个通知,而不是首先发射缓存的数据,即使在这之前缓存中包含了原始Observable发射的数据。
示例代码
- Observable.range(10, 6).buffer(2).subscribe(new Subscriber>() {
- @Override
- public void onCompleted() {
- Log.e(TAG, "onCompleted: ");
- }
- @Override
- public void onError(Throwable e) {
- Log.e(TAG, "onError: ");
- }
- @Override
- public void onNext(List integers) {
- Log.e(TAG, "onNext: " + integers);
- }
- });
输出日志信息
- onNext: [10, 11]
- onNext: [12, 13]
- onNext: [14, 15]
- onCompleted:
上面一次性订阅两个数据,如果设置参数为6,就一次性订阅。buffer的另一重载方法buffer(count, skip)从原始Observable的***项数据开始创建新的缓存(长度count),此后每当收到skip项数据,用count项数据填充缓存:开头的一项和后续的count-1项,它以列表(List)的形式发射缓存,取决于count和skip的值,这些缓存可能会有重叠部分(比如skip count时)。具体执行结果,你可以设置不同的skip和count观察输出日志,查看执行结果及流程。
Window
Window和Buffer类似,但不是发射来自原始Observable的数据包,它发射的是Observables,这些Observables中的每一个都发射原始Observable数据的一个子集,***发射一个onCompleted通知。
- Observable.range(10, 6).window(2).subscribe(new Subscriber>() {
- @Override
- public void onCompleted() {
- Log.e(TAG, "onCompleted1: ");
- }
- @Override
- public void onError(Throwable e) {
- Log.e(TAG, "onError1: ");
- }
- @Override
- public void onNext(Observable integerObservable) {
- Log.e(TAG, "onNext1: ");
- tv1.append("\n");
- integerObservable.subscribe(new Subscriber() {
- @Override
- public void onCompleted() {
- Log.e(TAG, "onCompleted2: ");
- }
- @Override
- public void onError(Throwable e) {
- Log.e(TAG, "onError2: ");
- }
- @Override
- public void onNext(Integer integer) {
- Log.e(TAG, "onNext2: "+integer);
- }
- });
- }
- });
输出日志信息
- onNext2: 10
- onNext2: 11
- onCompleted2:
- onNext2: 12
- onNext2: 13
- onCompleted2:
- onNext2: 14
- onNext2: 15
- onCompleted2:
- onCompleted1:
window和buffer一样也有不同的重载方法。这两个操作符相对其他操作符不太容易理解,可以去RxJava GitHub理解,里面有图示解析。当然***的理解方式就是通过更改变量的值,去观察输出的日志信息。
好了,这篇文章就介绍到这里。若文中有错误的地方,欢迎指正。谢谢。
网站名称:RxJava操作符系列二(下)
文章起源:http://www.shufengxianlan.com/qtweb/news10/242910.html
网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联