建设工程施工合同 示范文本,专业的seo外包公司,网站机房建设解决方案,搭建wordpress一、定义
RxJava 是一个 基于事件流、实现异步操作的库。 二、gradle配置 implementation io.reactivex.rxjava3:rxjava:3.0.0 三、 Rxjava相关操作符介绍
1.创建操作符。
1.create()
创建被观察者对象。
private fun doRxJavaCommonTest() {Observable.create(Observable…一、定义
RxJava 是一个 基于事件流、实现异步操作的库。 二、gradle配置 implementation io.reactivex.rxjava3:rxjava:3.0.0 三、 Rxjava相关操作符介绍
1.创建操作符。
1.create()
创建被观察者对象。
private fun doRxJavaCommonTest() {Observable.create(ObservableOnSubscribeString {Log.v(TAG, --- subscribe)it.onNext()it.onNext()it.onComplete()}).subscribe(object : ObserverString {override fun onComplete() {Log.v(TAG, --- onComplete)}override fun onSubscribe(d: Disposable?) {Log.v(TAG, --- onSubscribe)}override fun onNext(t: String?) {Log.v(TAG, --- onNext)}override fun onError(e: Throwable?) {Log.v(TAG, --- onnError)}})
} 2.just()
主要作用就是创建一个被观察者并发送事件但是发送的事件不可以超过10个以上。
超过10个就会编译报错。 private fun testJust() {Observable.just(1, 2, 3).subscribe(object : ObserverString {override fun onComplete() {Log.v(TAG, onComplete)}override fun onSubscribe(d: Disposable?) {Log.v(TAG, onSubscribe)}override fun onNext(t: String?) {Log.v(TAG, 接收到了事件$t)}override fun onError(e: Throwable?) {Log.v(TAG, onError:$e)}})
}
2021-06-05 15:38:55.690 29674-29674/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe
2021-06-05 15:38:55.690 29674-29674/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件1
2021-06-05 15:38:55.691 29674-29674/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件2
2021-06-05 15:38:55.691 29674-29674/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件3
2021-06-05 15:38:55.691 29674-29674/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onComplete 3. timer()
作用当到指定时间后就会发送一个0的值给观察者。
在项目中可以做一些延时的处理类似于Handler中的延时。
延迟5秒后将结果发送给观察者Consumer和Observer是创建观察者的两种写法相当于观察者中的onNext方法。 private fun testTimer() {Observable.timer(5, TimeUnit.SECONDS).subscribe(object : ConsumerLong {override fun accept(t: Long?) {Log.v(TAG, t.toString())}})
}
2021-06-05 16:03:38.900 30043-30088/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 0 4 interval()
每隔一段时间就会发送一个事件这个事件是从0开始不断增1的数字。 类似于Android中的Timer做计时器用。 private fun testInterval() {Observable.interval(2, TimeUnit.SECONDS).subscribe(object : ConsumerLong {override fun accept(t: Long?) {Log.v(TAG, t.toString())}})
}
2021-06-05 16:22:22.218 30163-30224/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 0
2021-06-05 16:22:24.218 30163-30224/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 1
2021-06-05 16:22:26.217 30163-30224/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 2
2021-06-05 16:22:28.218 30163-30224/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 3
2021-06-05 16:22:30.219 30163-30224/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 4
2021-06-05 16:22:32.218 30163-30224/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 5
2021-06-05 16:22:34.218 30163-30224/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 6
... ...
5 intervalRange()
可以指定发送事件的开始值(100)和数量(4)其他与 interval() 的功能一样。参数依次是开始值循环执行的次数初始延迟时间执行间隔时间时间单位
private fun testIntervalRange() {Observable.intervalRange(100, 4, 0, 10, TimeUnit.SECONDS).subscribe(object : ConsumerLong {override fun accept(t: Long?) {Log.v(TAG, t.toString())}})
}
2021-06-05 16:40:22.510 30432-30500/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 100
2021-06-05 16:40:32.517 30432-30500/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 101
2021-06-05 16:40:42.511 30432-30500/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 102
2021-06-05 16:40:52.513 30432-30500/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 103 6 range()
发送一定范围的事件序列。
private fun testRange() {Observable.range(0, 5).subscribe(object : ConsumerInt {override fun accept(t: Int?) {Log.v(TAG, t.toString())}})
} 2021-06-05 18:25:12.439 31282-31282/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 0
2021-06-05 18:25:12.439 31282-31282/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 1
2021-06-05 18:25:12.439 31282-31282/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 2
2021-06-05 18:25:12.439 31282-31282/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 3
2021-06-05 18:25:12.440 31282-31282/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 47 rangeLong()
作用与 range() 一样数据类型为 Long
private fun testLongRange() {Observable.rangeLong(0,3).subscribe(object : ConsumerLong{override fun accept(t: Long?) {Log.v(TAG, t.toString())}})
} 2021-06-05 18:54:54.821 31573-31573/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 0
2021-06-05 18:54:54.821 31573-31573/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 1
2021-06-05 18:54:54.821 31573-31573/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 2 8 empty() 、 never() 、 error()
empty():直接发送 onComplete() 事件,所以只会执行 onSubscribe 和 onComplete
private fun testEmpty() {Observable.emptyInt().subscribe(object : ObserverInt {override fun onComplete() {Log.v(TAG, onComplete)}override fun onSubscribe(d: Disposable?) {Log.v(TAG, onSubscribe)}override fun onNext(t: Int?) {Log.v(TAG, t.toString())}override fun onError(e: Throwable?) {Log.v(TAG, e.toString())}})
}
2021-06-05 19:12:06.043 31720-31720/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe
2021-06-05 19:12:06.043 31720-31720/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onComplete
error():
private fun testError() {val subscribe Observable.errorInt(object : Throwable() {init {Log.v(TAG, error init)}}).subscribe(object : ObserverInt {override fun onComplete() {Log.v(TAG, onComplete)}override fun onSubscribe(d: Disposable?) {Log.v(TAG, onSubscribe)}override fun onNext(t: Int?) {Log.v(TAG, t.toString())}override fun onError(e: Throwable?) {Log.v(TAG, e.toString())}})
}
2021-06-06 10:35:23.908 500-500/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: error init
2021-06-06 10:35:23.954 500-500/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe
2021-06-06 10:35:23.954 500-500/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: com.example.rxjavademo.MainActivity$testError$subscribe$1 never(): private fun testNever() {Observable.neverInt().subscribe(object : ObserverInt {override fun onComplete() {Log.v(TAG, onComplete)}override fun onSubscribe(d: Disposable?) {Log.v(TAG, onSubscribe)}override fun onNext(t: Int?) {Log.v(TAG, t.toString())}override fun onError(e: Throwable?) {Log.v(TAG, e.toString())}})
}
2021-06-06 10:44:38.259 873-873/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe 2.转换操作符
1.map
可以将被观察者发送的数据类型转变成其他的类型,再传给观察者。
private fun testMap() {Observable.just(1, 2, 3).map(object : FunctionInt, Int {override fun apply(t: Int?): Int {return ((t ?: 0) 1)}}).subscribe(object : ObserverInt {override fun onComplete() {Log.v(TAG, onComplete)}override fun onSubscribe(d: Disposable?) {Log.v(TAG, onSubscribe)}override fun onNext(t: Int?) {Log.v(TAG, t.toString())}override fun onError(e: Throwable?) {Log.v(TAG, e.toString())}})
} 2021-06-06 11:01:48.716 1400-1400/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe
2021-06-06 11:01:48.716 1400-1400/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 2
2021-06-06 11:01:48.716 1400-1400/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 3
2021-06-06 11:01:48.716 1400-1400/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 4
2021-06-06 11:01:48.716 1400-1400/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onComplete
2 flatMap()
将事件序列中的元素进行加工返回一个新的被观察者。 flatMap() 其实与 map() 类似但是 flatMap() 返回的是一个 Observerablemap()只是返回数据如果在元素再加工的时候想再使用上面的创建操作符的话建议使用flatMap()而非map()。
private fun flatMap() {Observable.just(1, 2, 3, 4).flatMap(object : FunctionInt, ObservableSourceInt {override fun apply(t: Int?): ObservableSourceInt {return if (t 4 || t null) {Observable.error(Exception(4是错误数字))} else {Observable.just(t 10)}}}).subscribe(object : ConsumerInt {override fun accept(t: Int?) {Log.v(TAG, t.toString())}}, object :ConsumerThrowable {override fun accept(t: Throwable?) {Log.v(TAG, t.toString())}})
} 2021-06-06 11:53:59.738 2310-2310/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 11
2021-06-06 11:53:59.738 2310-2310/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 12
2021-06-06 11:53:59.738 2310-2310/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 13
2021-06-06 11:53:59.744 2310-2310/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: java.lang.Exception: 4是错误数字 3 concatMap()
concatMap() 和 flatMap() 基本上是一样的只不过 concatMap() 转发出来的事件是有序的而 flatMap() 是无序的。 private fun testConcatMap() {Observable.just(1, 2, 3, 4).concatMap(object : FunctionInt, ObservableSourceInt {override fun apply(t: Int?): ObservableSourceInt {if (t 4 || t null) {return Observable.error(Exception(4 或者 null不是有效数字))}return Observable.just(t 10)}}).subscribe(object : ConsumerInt {override fun accept(t: Int?) {Log.v(TAG, t.toString())}}, object : ConsumerThrowable {override fun accept(t: Throwable?) {Log.v(TAG, t?.message?.toString() ?: )}})
} 14:23:59.811 3252-3252/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 11
2021-06-06 14:23:59.812 3252-3252/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 12
2021-06-06 14:23:59.812 3252-3252/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 13
2021-06-06 14:23:59.814 3252-3252/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 4 或者 null不是有效数字 4 buffer()
从需要发送的事件当中获取一定数量的事件并将这些事件放到缓冲区当中一并发出。
buffer 有两个参数一个是 count另一个 skip。count 缓冲区元素的数量skip 就代表缓冲区满了之后发送下一次事件序列的时候要跳过多少元素。 private fun testBuffer() {Observable.just(1, 2, 3, 4, 5).buffer(2, 1).subscribe(object : ConsumerListInt {override fun accept(t: ListInt?) {Log.v(TAG, 缓冲区大小 t?.size)t?.forEach {Log.v(TAG, it.toString())}}})
} 2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 缓冲区大小2
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 1
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 2
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 缓冲区大小2
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 2
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 3
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 缓冲区大小2
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 3
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 4
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 缓冲区大小2
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 4
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 5
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 缓冲区大小1
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 5
5 scan()
将发射的数据通过一个函数进行变换然后将变换后的结果作为参数跟下一个发射的数据一起继续通过那个函数变换这样依次连续发射得到最终结果。 private fun testScan() {Observable.just(1, 2, 3, 4, 5, 6).scan(object : BiFunctionInt, Int, Int {override fun apply(t1: Int?, t2: Int?): Int {Log.v(TAG, t1: t1 t2: t2)return ((t1 ?: 0) (t2 ?: 0))}}).subscribe(object : ConsumerInt {override fun accept(t: Int?) {Log.v(TAG, t.toString())}})
} 2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 1
2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: t1:1t2:2
2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 3
2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: t1:3t2:3
2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 6
2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: t1:6t2:4
2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 10
2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: t1:10t2:5
2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 15
2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: t1:15t2:6
2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 21四、RxJava源码分析
1.RxJava订阅流程 2.onNext与onComplete 3.理解RxJava是切换线程
private fun testChangeMainThread() {Observable.create(ObservableOnSubscribeString {it.onNext(1)it.onNext(2)it.onComplete()Log.v(TAG, subscribe and currentThread is: Thread.currentThread())})//切换到子线程.subscribeOn(Schedulers.io())//切换到主线程.observeOn(AndroidSchedulers.mainThread()).subscribe(object : ObserverString {override fun onSubscribe(d: Disposable?) {Log.v(TAG, onSubscribe and currentThread is: Thread.currentThread())}override fun onNext(t: String?) {Log.v(TAG,onNext: t.toString() and currentThread is: Thread.currentThread())}override fun onError(e: Throwable?) {Log.v(TAG,onError: e?.message and currentThread is: Thread.currentThread())}override fun onComplete() {Log.v(TAG, onComplete and currentThread is: Thread.currentThread())}})
} 2021-06-08 20:44:40.133 6053-6053/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe and currentThread is:Thread[main,5,main]
2021-06-08 20:44:40.138 6053-6112/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: subscribe and currentThread is:Thread[RxCachedThreadScheduler-1,5,main]
2021-06-08 20:44:40.138 6053-6053/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:1and currentThread is:Thread[main,5,main]
2021-06-08 20:44:40.139 6053-6053/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:2and currentThread is:Thread[main,5,main]
2021-06-08 20:44:40.139 6053-6053/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onCompleteand currentThread is:Thread[main,5,main] 通过上面的log 可以发现
1.Observer观察者onSubscribe() 方法运行在当前线程中。
2.Observable被观察者中的 subscribe() 方法运行在subsribeOn()指定的线程中。
3.Observer(观察者)的onNext()和onComplete()和onError()运行在observerOn()指定的线程中。 RxJava线程切换只要涉及两个方法subscribeOn()和observeOn()。 1.subscribeOn()源码分析
subscribeOn(Schedulers.io())
subscribeOn()传入一个Scheduler类实例Scheduler是一个调度类能够延时或周期性地去执行一个任务。 Schedulers类的io()方法执行流程
Scheduler IO 创建流程 AndroidSchedulers.mainThread()
main thread创建流程 subscribeOnobserveOn。subscribeOn是用来调整被观察者发射源的线程而observeOn是调整观察者处理器的线程。 五、RxJava 背压策略
背压解决了 因被观察者发送事件速度 与 观察者接收事件速度 不匹配一般是前者 快于 后者从而导致观察者无法及时响应 / 处理所有 被观察者发送事件 的问题
1.观察者不接收事件的情况下被观察者继续发送事件存放到缓存区按需取出。
R.id.test_Flowable4 - {mSubscription?.request(2)
}/*** 观察者不接收事件扽情况下被观察者继续发送事件存放到缓存区按需取出*/
private var textFlowableTv: TextView? null
private var mSubscription: Subscription? null
private fun testFlowable5() {Flowable.create(FlowableOnSubscribeInt {Log.v(TAG, 发送事件 1)it.onNext(1)Log.v(TAG, 发送事件 2)it.onNext(2)Log.v(TAG, 发送事件 3)it.onNext(3)Log.v(TAG, 发送事件 4)it.onNext(4)Log.v(TAG, 发送完成)it.onComplete()}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {mSubscription s}override fun onNext(t: Int?) {Log.v(TAG, onNext:$t)}override fun onError(t: Throwable?) {Log.v(TAG, onError:$t)}override fun onComplete() {Log.v(TAG, onComplete)}})
}021-06-27 16:47:29.121 24137-24174/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 1
2021-06-27 16:47:29.123 24137-24174/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 2
2021-06-27 16:47:29.124 24137-24174/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 3
2021-06-27 16:47:29.125 24137-24174/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 4
2021-06-27 16:47:29.126 24137-24174/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送完成
2021-06-27 16:47:32.594 24137-24137/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:1
2021-06-27 16:47:32.595 24137-24137/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:2 2.观察者不接收事件的情况下被观察者继续发送事件至超出缓存区大小128
private fun testFlowableError() {Flowable.create(FlowableOnSubscribeInt {//发送129个事件即超出了缓存区的大小for (i in 0..128) {Log.v(TAG, 发送事件$i)it.onNext(i)}it.onComplete()}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe)}override fun onNext(t: Int?) {Log.v(TAG, onNext:$t)}override fun onError(t: Throwable?) {Log.v(TAG, onError:$t)}override fun onComplete() {Log.v(TAG, onComplete)}})
} 2021-06-27 17:33:19.096 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件117
2021-06-27 17:33:19.096 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件118
2021-06-27 17:33:19.096 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件119
2021-06-27 17:33:19.096 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件120
2021-06-27 17:33:19.096 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件121
2021-06-27 17:33:19.096 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件122
2021-06-27 17:33:19.096 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件123
2021-06-27 17:33:19.097 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件124
2021-06-27 17:33:19.097 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件125
2021-06-27 17:33:19.098 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件126
2021-06-27 17:33:19.098 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件127
2021-06-27 17:33:19.098 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件128
2021-06-27 17:33:19.101 25381-25381/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onError:io.reactivex.rxjava3.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests3.同步订阅 与 异步订阅 的区别
同步订阅中被观察者与 观察者工作于同1线程同步订阅关系中没有缓存区
被观察者在发送1个事件后必须等待观察者接收后才能继续发下1个事件
private fun testFlowable6() {Flowable.create(FlowableOnSubscribeInt {Log.v(TAG, 发送事件 1)it.onNext(1)Log.v(TAG, 发送事件 2)it.onNext(2)Log.v(TAG, 发送事件 3)it.onNext(3)Log.v(TAG, 发送事件 4)it.onNext(4)Log.v(TAG, 发送完成)it.onComplete()}, BackpressureStrategy.ERROR).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {s?.request(4)}override fun onNext(t: Int?) {Log.v(TAG, onNext:$t)}override fun onError(t: Throwable?) {Log.v(TAG, onError:$t)}override fun onComplete() {Log.v(TAG, onComplete)}})
} 2021-06-27 17:57:45.755 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 1
2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:1
2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 2
2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:2
2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 3
2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:3
2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 4
2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:4
2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送完成
2021-06-27 17:57:45.759 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onComplete 观察者只能接受3个事件但被观察者却发送了4个事件所以出现了不匹配情况
private fun testFlowable6() {Flowable.create(FlowableOnSubscribeInt {Log.v(TAG, 发送事件 1)it.onNext(1)Log.v(TAG, 发送事件 2)it.onNext(2)Log.v(TAG, 发送事件 3)it.onNext(3)Log.v(TAG, 发送事件 4)it.onNext(4)Log.v(TAG, 发送完成)it.onComplete()}, BackpressureStrategy.ERROR).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {s?.request(3)}override fun onNext(t: Int?) {Log.v(TAG, onNext:$t)}override fun onError(t: Throwable?) {Log.v(TAG, onError:$t)}override fun onComplete() {Log.v(TAG, onComplete)}})
} 2021-06-27 18:37:05.470 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 1
2021-06-27 18:37:05.472 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:1
2021-06-27 18:37:05.472 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 2
2021-06-27 18:37:05.472 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:2
2021-06-27 18:37:05.472 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 3
2021-06-27 18:37:05.472 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:3
2021-06-27 18:37:05.472 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 4
2021-06-27 18:37:05.474 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onError:io.reactivex.rxjava3.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
2021-06-27 18:37:05.474 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送完成 观察者不接收事件的情况
private fun testFlowable7() {Flowable.create(FlowableOnSubscribeInt {Log.v(TAG, 发送事件 1)it.onNext(1)Log.v(TAG, 发送事件 2)it.onNext(2)Log.v(TAG, 发送事件 3)it.onNext(3)Log.v(TAG, 发送事件 4)it.onNext(4)Log.v(TAG, 发送完成)it.onComplete()}, BackpressureStrategy.ERROR).subscribe(object :SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe)}override fun onNext(t: Int?) {Log.v(TAG, onNext:$t)}override fun onError(t: Throwable?) {Log.v(TAG, onError:$t)}override fun onComplete() {Log.v(TAG, onComplete)}})
}2021-06-27 19:04:50.591 26746-26746/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe
2021-06-27 19:04:50.593 26746-26746/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 1
2021-06-27 19:04:50.596 26746-26746/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onError:io.reactivex.rxjava3.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
2021-06-27 19:04:50.596 26746-26746/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 2
2021-06-27 19:04:50.596 26746-26746/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 3
2021-06-27 19:04:50.596 26746-26746/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 4
2021-06-27 19:04:50.596 26746-26746/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送完成 4.控制 被观察者发送事件 的速度
1.被观察者根据观察者自身接收事件能力10个事件从而仅发送10个事件
private fun testFlowable8() {Flowable.create(FlowableOnSubscribeInt {//调用it.requested()获取当前观察者需要接收扽事件数量val n it.requested()Log.v(TAG, 观察者可接收事件: n)//根据it.requestd()的值即当前观察者需要接收扽事件数量来发送事件for (i in 1..n) {Log.v(TAG, 发送了事件 i)it.onNext(i.toInt())}}, BackpressureStrategy.ERROR).subscribe(object :SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe)//设置观察者每次能接收5个事件s?.request(5)}override fun onNext(t: Int?) {Log.v(TAG, 接收到了事件$t)}override fun onError(t: Throwable?) {Log.v(TAG, onError():$t)}override fun onComplete() {Log.v(TAG, onComplete);}})
}2021-07-01 18:19:06.266 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe
2021-07-01 18:19:06.266 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 观察者可接收事件:5
2021-07-01 18:19:06.266 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件1
2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件1
2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件2
2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件2
2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件3
2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件3
2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件4
2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件4
2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件5
2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件5 在同步订阅情况中使用FlowableEmitter.requested()时要注意的
1.观察者可连续要求接收事件被观察者会进行叠加并一起发送。
private fun testFlowable9() {Flowable.create(FlowableOnSubscribeInt {//调用it.requested()获取当前观察者需要接收的事件数量Log.v(TAG, 观察者可接收事件 it.requested())}, BackpressureStrategy.ERROR).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe)s?.request(5) s?.request(10) }override fun onNext(t: Int?) {Log.v(TAG, 接收到了事件$t)}override fun onError(t: Throwable?) {Log.v(TAG, onError: , t)}override fun onComplete() {Log.v(TAG, onComplete)}})
} 2021-07-01 20:14:27.196 7449-7449/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe
2021-07-01 20:14:27.198 7449-7449/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 观察者可接收事件15 2. 每次发送事件后emitter.requested()会实时更新观察者能接受的事件。
private fun testFlowable10() {Flowable.create(FlowableOnSubscribeInt {//1.调用t.requested()获取当前观察者需要接收的事件的数量Log.v(TAG, 观察者可接收事件数量: it.requested())//2. 每次发送事件后t.requested()会实时更新观察者能接受的事件// 即一开始观察者要接收10个事件发送了1个后会实时更新为9个Log.v(TAG, 发送了事件1)it.onNext(1)Log.v(TAG, 发送了事件1后, 还需要发送事件数量: it.requested())Log.v(TAG, 发送了事件2)it.onNext(2)Log.d(TAG, 发送事件2后, 还需要发送事件数量: it.requested())Log.d(TAG, 发送了事件3);it.onNext(3);Log.d(TAG, 发送事件3后, 还需要发送事件数量: it.requested())it.onComplete()}, BackpressureStrategy.ERROR).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.d(TAG, onSubscribe)s?.request(5)}override fun onNext(t: Int?) {Log.v(TAG, 接收到了事件$t)}override fun onError(t: Throwable?) {Log.v(TAG, onError: $t)}override fun onComplete() {Log.v(TAG, onComplete)}})
} 2021-07-01 20:37:58.644 8065-8065/com.example.rxjavademo D/MAINACTIVITY_RXJAVA: onSubscribe
2021-07-01 20:37:58.646 8065-8065/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 观察者可接收事件数量:5
2021-07-01 20:37:58.647 8065-8065/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件1
2021-07-01 20:37:58.647 8065-8065/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件1
2021-07-01 20:37:58.648 8065-8065/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件1后, 还需要发送事件数量:4
2021-07-01 20:37:58.648 8065-8065/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件2
2021-07-01 20:37:58.648 8065-8065/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件2
2021-07-01 20:37:58.648 8065-8065/com.example.rxjavademo D/MAINACTIVITY_RXJAVA: 发送事件2后, 还需要发送事件数量:3
2021-07-01 20:37:58.648 8065-8065/com.example.rxjavademo D/MAINACTIVITY_RXJAVA: 发送了事件3
2021-07-01 20:37:58.649 8065-8065/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件3
2021-07-01 20:37:58.649 8065-8065/com.example.rxjavademo D/MAINACTIVITY_RXJAVA: 发送事件3后, 还需要发送事件数量:2
2021-07-01 20:37:58.649 8065-8065/com.example.rxjavademo D/MAINACTIVITY_RXJAVA: onComplete 3.异常
当FlowableEmitter.requested()减到0时则代表观察者已经不可接收事件。
此时被观察者若继续发送事件则会抛出MissingBackpressureException异常。
若观察者没有设置可接收事件数量即无调用Subscription.request
那么被观察者默认观察者可接收事件数量 0即FlowableEmitter.requested()的返回值 0 private fun testFlowable11() {Flowable.create(FlowableOnSubscribeInt {// 1. 调用emitter.requested()获取当前观察者需要接收的事件数量Log.v(TAG, 观察者可接收事件数量 it.requested())it.onNext(1)it.onNext(2)it.onNext(3)it.onComplete()}, BackpressureStrategy.ERROR).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe)s?.request(2)}override fun onNext(t: Int?) {Log.v(TAG, 接收到了事件$t)}override fun onError(t: Throwable?) {Log.v(TAG, onError:$t);}override fun onComplete() {Log.v(TAG, onComplete);}})
} 2021-07-01 21:15:50.807 8877-8877/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe
2021-07-01 21:15:50.809 8877-8877/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 观察者可接收事件数量2
2021-07-01 21:15:50.810 8877-8877/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件1
2021-07-01 21:15:50.811 8877-8877/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件2
2021-07-01 21:15:50.812 8877-8877/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onError:io.reactivex.rxjava3.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests 4.异步订阅相关
FlowableEmitter.requested()知道观察者自身接收事件能力即 被观察者不能根据 观察者自身接收事件的能力 控制发送事件的速度。
private fun testFlowable12() {Flowable.create(FlowableOnSubscribeInt {// 调用emitter.requested()获取当前观察者需要接收的事件数量Log.d(TAG, 观察者可接收事件数量 it.requested())}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object :SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe)s?.request(100)// 该设置仅影响观察者线程中的requested却不会影响的被观察者中的FlowableEmitter.requested()的返回值// 因为FlowableEmitter.requested()的返回值 取决于RxJava内部调用request(n)而该内部调用会在一开始就调用request(128)}override fun onNext(t: Int?) {Log.v(TAG, 接收到了事件$t)}override fun onError(t: Throwable?) {Log.v(TAG, onError:$t);}override fun onComplete() {Log.d(TAG, onComplete)}})
} 2021-07-01 21:54:17.645 9315-9315/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe
2021-07-01 21:54:17.651 9315-9355/com.example.rxjavademo D/MAINACTIVITY_RXJAVA: 观察者可接收事件数量 128 通过RxJava内部固定调用被观察者线程中的request(n) 从而 反向控制被观察者的发送事件速度 R.id.testFlowableBtn13 - {// 点击按钮才会接收事件 5 次// 点击按钮 则 接收5个事件mSubscription2?.request(5)
}private var mSubscription2: Subscription? null// 被观察者一共需要发送500个事件但真正开始发送事件的前提 FlowableEmitter.requested()返回值 ≠ 0
// 观察者每次接收事件数量 48点击按钮
private fun testFlowable13() {Flowable.create(FlowableOnSubscribeInt {Log.v(TAG, 观察者可接收事件数量: it.requested())var flag: Boolean//被观察者一共需要发送500个事件for (i in 1..500) {flag false//requested()0则不发送while (it.requested() 0L) {if (!flag) {Log.v(TAG, 不再发送)flag true}}//requested() ! 0 才发送Log.v(TAG, 发送了事件 i 观察者可接收事件数量: it.requested())it.onNext(i)}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe)mSubscription2 s//初始状态 不接收事件通过点击按钮接收事件}override fun onNext(t: Int?) {Log.v(TAG, 接收到了事件$t)}override fun onError(t: Throwable?) {Log.v(TAG, onError: $t)}override fun onComplete() {Log.v(TAG, onComplete)}})}
2021-07-04 11:48:53.669 13219-13219/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe
2021-07-04 11:48:53.673 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 观察者可接收事件数量:128
2021-07-04 11:48:53.673 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件1观察者可接收事件数量:128
2021-07-04 11:48:53.675 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件2观察者可接收事件数量:127
2021-07-04 11:48:53.675 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件3观察者可接收事件数量:126
2021-07-04 11:48:53.675 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件4观察者可接收事件数量:125
2021-07-04 11:48:53.675 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件5观察者可接收事件数量:124
... ...
2021-07-04 11:48:53.693 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件128观察者可接收事件数量:1
2021-07-04 11:48:53.693 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 不再发送
2021-07-04 11:49:30.455 13219-13219/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件1
2021-07-04 11:49:30.455 13219-13219/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件2
2021-07-04 11:49:30.460 13219-13219/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件3
2021-07-04 11:49:30.460 13219-13219/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件4
2021-07-04 11:49:30.461 13219-13219/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件5 5.采用背压策略模式BackpressureStrategy
当缓存区大小存满、被观察者仍然继续发送下1个事件时该如何处理的策略方式。
1.直接抛出异常MissingBackpressureException
private fun testFlowable14() {Flowable.create(FlowableOnSubscribeInt {for (i in 1..129) {Log.v(TAG, 发送事件$i);it.onNext(i)}it.onComplete()}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe);}override fun onNext(t: Int?) {Log.v(TAG, 接收到了事件$t)}override fun onError(t: Throwable?) {Log.w(TAG, onError: $t)}override fun onComplete() {Log.v(TAG, onComplete);}})} ... ...
2021-07-04 14:21:51.700 14830-14956/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件127
2021-07-04 14:21:51.700 14830-14956/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件128
2021-07-04 14:21:51.700 14830-14956/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件129
2021-07-04 14:21:51.702 14830-14830/com.example.rxjavademo W/MAINACTIVITY_RXJAVA: onError: io.reactivex.rxjava3.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests 2.BackpressureStrategy.MISSING
友好提示缓存区满了
private fun testFlowable15() {Flowable.create(FlowableOnSubscribeInt {for (i in 1..129) {Log.v(TAG, 发送事件$i)it.onNext(i)}it.onComplete()}, BackpressureStrategy.MISSING).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe);}override fun onNext(t: Int?) {Log.v(TAG, 接收到了事件$t)}override fun onError(t: Throwable?) {Log.w(TAG, onError: $t)}override fun onComplete() {Log.v(TAG, onComplete);}})
}2021-07-04 14:41:13.044 15186-15235/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件128
2021-07-04 14:41:13.044 15186-15235/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件129
2021-07-04 14:41:13.048 15186-15186/com.example.rxjavademo W/MAINACTIVITY_RXJAVA: onError: io.reactivex.rxjava3.exceptions.MissingBackpressureException: Queue is full?! 3.BackpressureStrategy.BUFFER
将缓存区大小设置成无限大
private fun testFlowable16() {Flowable.create(FlowableOnSubscribeInt {for (i in 1..150) {Log.v(TAG, 发送事件$i)it.onNext(i)}it.onComplete()}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object :SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe);}override fun onNext(t: Int?) {Log.v(TAG, 接收到了事件$t)}override fun onError(t: Throwable?) {Log.w(TAG, onError: $t)}override fun onComplete() {Log.v(TAG, onComplete);}})
} ... ...
2021-07-04 15:16:38.947 15488-15576/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件147
2021-07-04 15:16:38.947 15488-15576/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件148
2021-07-04 15:16:38.947 15488-15576/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件149
2021-07-04 15:16:38.947 15488-15576/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件150 4.BackpressureStrategy.DROP
超过缓存区大小128的事件丢弃
R.id.testFlowableBtn17 - {mSubscription17?.request(128)
}
private var mSubscription17: Subscription? null
private fun testFlowable17() {Flowable.create(FlowableOnSubscribeInt {for (i in 1..200) {Log.v(TAG, 发送事件$i)it.onNext(i)}it.onComplete()}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe)mSubscription17 s}override fun onNext(t: Int?) {Log.v(TAG, 接收到了事件$t)}override fun onError(t: Throwable?) {Log.w(TAG, onError: $t)}override fun onComplete() {Log.v(TAG, onComplete);}})
} ... ...
2021-07-04 15:41:22.195 15851-15890/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件196
2021-07-04 15:41:22.195 15851-15890/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件197
2021-07-04 15:41:22.195 15851-15890/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件198
2021-07-04 15:41:22.195 15851-15890/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件199
2021-07-04 15:41:22.195 15851-15890/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件200
... ...
2021-07-04 15:41:33.829 15851-15851/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件125
2021-07-04 15:41:33.829 15851-15851/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件126
2021-07-04 15:41:33.829 15851-15851/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件127
2021-07-04 15:41:33.829 15851-15851/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件128
2021-07-04 15:41:33.833 15851-15851/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onComplete 5.BackpressureStrategy.LATEST
只保存最新最后事件超过缓存区大小128的事件丢弃
private var mSubscription18: Subscription? null
private fun testFlowable18() {Flowable.create(FlowableOnSubscribeInt {for (i in 1..200) {Log.v(TAG, 发送事件$i)it.onNext(i)}it.onComplete()}, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe)mSubscription18 s}override fun onNext(t: Int?) {Log.v(TAG, 接收到了事件$t)}override fun onError(t: Throwable?) {Log.w(TAG, onError: $t)}override fun onComplete() {Log.v(TAG, onComplete)}})
} ... ...
2021-07-04 16:02:55.393 16130-16185/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件196
2021-07-04 16:02:55.393 16130-16185/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件197
2021-07-04 16:02:55.393 16130-16185/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件198
2021-07-04 16:02:55.393 16130-16185/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件199
2021-07-04 16:02:55.393 16130-16185/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件200
... ...
2021-07-04 16:02:57.489 16130-16130/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件126
2021-07-04 16:02:57.489 16130-16130/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件127
2021-07-04 16:02:57.489 16130-16130/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件128
2021-07-04 16:03:25.229 16130-16130/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件200
2021-07-04 16:03:25.233 16130-16130/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onComplete