当前位置: 首页 > news >正文

建设工程施工合同 示范文本专业的seo外包公司

建设工程施工合同 示范文本,专业的seo外包公司,网站机房建设解决方案,搭建wordpress一、定义 RxJava 是一个 基于事件流、实现异步操作的库。 二、gradle配置 implementation io.reactivex.rxjava3:rxjava:3.0.0 三、 Rxjava相关操作符介绍 1.创建操作符。 1.create() 创建被观察者对象。 private fun doRxJavaCommonTest() {Observable.create(Observable…一、定义 RxJava 是一个 基于事件流、实现异步操作的库。 二、gradle配置 implementation io.reactivex.rxjava3:rxjava:3.0.0 三、 Rxjava相关操作符介绍 1.创建操作符。 1.create() 创建被观察者对象。 private fun doRxJavaCommonTest() {Observable.create(ObservableOnSubscribeString {Log.v(TAG, --- subscribe)it.onNext()it.onNext()it.onComplete()}).subscribe(object : ObserverString {override fun onComplete() {Log.v(TAG, --- onComplete)}override fun onSubscribe(d: Disposable?) {Log.v(TAG, --- onSubscribe)}override fun onNext(t: String?) {Log.v(TAG, --- onNext)}override fun onError(e: Throwable?) {Log.v(TAG, --- onnError)}}) } 2.just() 主要作用就是创建一个被观察者并发送事件但是发送的事件不可以超过10个以上。 超过10个就会编译报错。 private fun testJust() {Observable.just(1, 2, 3).subscribe(object : ObserverString {override fun onComplete() {Log.v(TAG, onComplete)}override fun onSubscribe(d: Disposable?) {Log.v(TAG, onSubscribe)}override fun onNext(t: String?) {Log.v(TAG, 接收到了事件$t)}override fun onError(e: Throwable?) {Log.v(TAG, onError:$e)}}) } 2021-06-05 15:38:55.690 29674-29674/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe 2021-06-05 15:38:55.690 29674-29674/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件1 2021-06-05 15:38:55.691 29674-29674/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件2 2021-06-05 15:38:55.691 29674-29674/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件3 2021-06-05 15:38:55.691 29674-29674/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onComplete 3. timer() 作用当到指定时间后就会发送一个0的值给观察者。 在项目中可以做一些延时的处理类似于Handler中的延时。 延迟5秒后将结果发送给观察者Consumer和Observer是创建观察者的两种写法相当于观察者中的onNext方法。 private fun testTimer() {Observable.timer(5, TimeUnit.SECONDS).subscribe(object : ConsumerLong {override fun accept(t: Long?) {Log.v(TAG, t.toString())}}) } 2021-06-05 16:03:38.900 30043-30088/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 0 4 interval() 每隔一段时间就会发送一个事件这个事件是从0开始不断增1的数字。 类似于Android中的Timer做计时器用。 private fun testInterval() {Observable.interval(2, TimeUnit.SECONDS).subscribe(object : ConsumerLong {override fun accept(t: Long?) {Log.v(TAG, t.toString())}}) } 2021-06-05 16:22:22.218 30163-30224/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 0 2021-06-05 16:22:24.218 30163-30224/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 1 2021-06-05 16:22:26.217 30163-30224/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 2 2021-06-05 16:22:28.218 30163-30224/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 3 2021-06-05 16:22:30.219 30163-30224/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 4 2021-06-05 16:22:32.218 30163-30224/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 5 2021-06-05 16:22:34.218 30163-30224/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 6 ... ... 5 intervalRange() 可以指定发送事件的开始值(100)和数量(4)其他与 interval() 的功能一样。参数依次是开始值循环执行的次数初始延迟时间执行间隔时间时间单位 private fun testIntervalRange() {Observable.intervalRange(100, 4, 0, 10, TimeUnit.SECONDS).subscribe(object : ConsumerLong {override fun accept(t: Long?) {Log.v(TAG, t.toString())}}) } 2021-06-05 16:40:22.510 30432-30500/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 100 2021-06-05 16:40:32.517 30432-30500/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 101 2021-06-05 16:40:42.511 30432-30500/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 102 2021-06-05 16:40:52.513 30432-30500/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 103 6 range() 发送一定范围的事件序列。 private fun testRange() {Observable.range(0, 5).subscribe(object : ConsumerInt {override fun accept(t: Int?) {Log.v(TAG, t.toString())}}) } 2021-06-05 18:25:12.439 31282-31282/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 0 2021-06-05 18:25:12.439 31282-31282/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 1 2021-06-05 18:25:12.439 31282-31282/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 2 2021-06-05 18:25:12.439 31282-31282/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 3 2021-06-05 18:25:12.440 31282-31282/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 47 rangeLong() 作用与 range() 一样数据类型为 Long private fun testLongRange() {Observable.rangeLong(0,3).subscribe(object : ConsumerLong{override fun accept(t: Long?) {Log.v(TAG, t.toString())}}) } 2021-06-05 18:54:54.821 31573-31573/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 0 2021-06-05 18:54:54.821 31573-31573/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 1 2021-06-05 18:54:54.821 31573-31573/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 2 8 empty() 、 never() 、 error() empty():直接发送 onComplete() 事件,所以只会执行 onSubscribe 和 onComplete private fun testEmpty() {Observable.emptyInt().subscribe(object : ObserverInt {override fun onComplete() {Log.v(TAG, onComplete)}override fun onSubscribe(d: Disposable?) {Log.v(TAG, onSubscribe)}override fun onNext(t: Int?) {Log.v(TAG, t.toString())}override fun onError(e: Throwable?) {Log.v(TAG, e.toString())}}) } 2021-06-05 19:12:06.043 31720-31720/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe 2021-06-05 19:12:06.043 31720-31720/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onComplete error(): private fun testError() {val subscribe Observable.errorInt(object : Throwable() {init {Log.v(TAG, error init)}}).subscribe(object : ObserverInt {override fun onComplete() {Log.v(TAG, onComplete)}override fun onSubscribe(d: Disposable?) {Log.v(TAG, onSubscribe)}override fun onNext(t: Int?) {Log.v(TAG, t.toString())}override fun onError(e: Throwable?) {Log.v(TAG, e.toString())}}) } 2021-06-06 10:35:23.908 500-500/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: error init 2021-06-06 10:35:23.954 500-500/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe 2021-06-06 10:35:23.954 500-500/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: com.example.rxjavademo.MainActivity$testError$subscribe$1 never(): private fun testNever() {Observable.neverInt().subscribe(object : ObserverInt {override fun onComplete() {Log.v(TAG, onComplete)}override fun onSubscribe(d: Disposable?) {Log.v(TAG, onSubscribe)}override fun onNext(t: Int?) {Log.v(TAG, t.toString())}override fun onError(e: Throwable?) {Log.v(TAG, e.toString())}}) } 2021-06-06 10:44:38.259 873-873/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe 2.转换操作符 1.map 可以将被观察者发送的数据类型转变成其他的类型,再传给观察者。 private fun testMap() {Observable.just(1, 2, 3).map(object : FunctionInt, Int {override fun apply(t: Int?): Int {return ((t ?: 0) 1)}}).subscribe(object : ObserverInt {override fun onComplete() {Log.v(TAG, onComplete)}override fun onSubscribe(d: Disposable?) {Log.v(TAG, onSubscribe)}override fun onNext(t: Int?) {Log.v(TAG, t.toString())}override fun onError(e: Throwable?) {Log.v(TAG, e.toString())}}) } 2021-06-06 11:01:48.716 1400-1400/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe 2021-06-06 11:01:48.716 1400-1400/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 2 2021-06-06 11:01:48.716 1400-1400/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 3 2021-06-06 11:01:48.716 1400-1400/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 4 2021-06-06 11:01:48.716 1400-1400/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onComplete 2 flatMap() 将事件序列中的元素进行加工返回一个新的被观察者。 flatMap() 其实与 map() 类似但是 flatMap() 返回的是一个 Observerablemap()只是返回数据如果在元素再加工的时候想再使用上面的创建操作符的话建议使用flatMap()而非map()。 private fun flatMap() {Observable.just(1, 2, 3, 4).flatMap(object : FunctionInt, ObservableSourceInt {override fun apply(t: Int?): ObservableSourceInt {return if (t 4 || t null) {Observable.error(Exception(4是错误数字))} else {Observable.just(t 10)}}}).subscribe(object : ConsumerInt {override fun accept(t: Int?) {Log.v(TAG, t.toString())}}, object :ConsumerThrowable {override fun accept(t: Throwable?) {Log.v(TAG, t.toString())}}) } 2021-06-06 11:53:59.738 2310-2310/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 11 2021-06-06 11:53:59.738 2310-2310/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 12 2021-06-06 11:53:59.738 2310-2310/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 13 2021-06-06 11:53:59.744 2310-2310/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: java.lang.Exception: 4是错误数字 3 concatMap() concatMap() 和 flatMap() 基本上是一样的只不过 concatMap() 转发出来的事件是有序的而 flatMap() 是无序的。 private fun testConcatMap() {Observable.just(1, 2, 3, 4).concatMap(object : FunctionInt, ObservableSourceInt {override fun apply(t: Int?): ObservableSourceInt {if (t 4 || t null) {return Observable.error(Exception(4 或者 null不是有效数字))}return Observable.just(t 10)}}).subscribe(object : ConsumerInt {override fun accept(t: Int?) {Log.v(TAG, t.toString())}}, object : ConsumerThrowable {override fun accept(t: Throwable?) {Log.v(TAG, t?.message?.toString() ?: )}}) } 14:23:59.811 3252-3252/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 11 2021-06-06 14:23:59.812 3252-3252/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 12 2021-06-06 14:23:59.812 3252-3252/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 13 2021-06-06 14:23:59.814 3252-3252/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 4 或者 null不是有效数字 4 buffer() 从需要发送的事件当中获取一定数量的事件并将这些事件放到缓冲区当中一并发出。 buffer 有两个参数一个是 count另一个 skip。count 缓冲区元素的数量skip 就代表缓冲区满了之后发送下一次事件序列的时候要跳过多少元素。 private fun testBuffer() {Observable.just(1, 2, 3, 4, 5).buffer(2, 1).subscribe(object : ConsumerListInt {override fun accept(t: ListInt?) {Log.v(TAG, 缓冲区大小 t?.size)t?.forEach {Log.v(TAG, it.toString())}}}) } 2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 缓冲区大小2 2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 1 2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 2 2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 缓冲区大小2 2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 2 2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 3 2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 缓冲区大小2 2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 3 2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 4 2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 缓冲区大小2 2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 4 2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 5 2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 缓冲区大小1 2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 5 5 scan() 将发射的数据通过一个函数进行变换然后将变换后的结果作为参数跟下一个发射的数据一起继续通过那个函数变换这样依次连续发射得到最终结果。 private fun testScan() {Observable.just(1, 2, 3, 4, 5, 6).scan(object : BiFunctionInt, Int, Int {override fun apply(t1: Int?, t2: Int?): Int {Log.v(TAG, t1: t1 t2: t2)return ((t1 ?: 0) (t2 ?: 0))}}).subscribe(object : ConsumerInt {override fun accept(t: Int?) {Log.v(TAG, t.toString())}}) } 2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 1 2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: t1:1t2:2 2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 3 2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: t1:3t2:3 2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 6 2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: t1:6t2:4 2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 10 2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: t1:10t2:5 2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 15 2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: t1:15t2:6 2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 21四、RxJava源码分析 1.RxJava订阅流程 2.onNext与onComplete 3.理解RxJava是切换线程 private fun testChangeMainThread() {Observable.create(ObservableOnSubscribeString {it.onNext(1)it.onNext(2)it.onComplete()Log.v(TAG, subscribe and currentThread is: Thread.currentThread())})//切换到子线程.subscribeOn(Schedulers.io())//切换到主线程.observeOn(AndroidSchedulers.mainThread()).subscribe(object : ObserverString {override fun onSubscribe(d: Disposable?) {Log.v(TAG, onSubscribe and currentThread is: Thread.currentThread())}override fun onNext(t: String?) {Log.v(TAG,onNext: t.toString() and currentThread is: Thread.currentThread())}override fun onError(e: Throwable?) {Log.v(TAG,onError: e?.message and currentThread is: Thread.currentThread())}override fun onComplete() {Log.v(TAG, onComplete and currentThread is: Thread.currentThread())}}) } 2021-06-08 20:44:40.133 6053-6053/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe and currentThread is:Thread[main,5,main] 2021-06-08 20:44:40.138 6053-6112/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: subscribe and currentThread is:Thread[RxCachedThreadScheduler-1,5,main] 2021-06-08 20:44:40.138 6053-6053/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:1and currentThread is:Thread[main,5,main] 2021-06-08 20:44:40.139 6053-6053/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:2and currentThread is:Thread[main,5,main] 2021-06-08 20:44:40.139 6053-6053/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onCompleteand currentThread is:Thread[main,5,main] 通过上面的log 可以发现 1.Observer观察者onSubscribe() 方法运行在当前线程中。 2.Observable被观察者中的 subscribe() 方法运行在subsribeOn()指定的线程中。 3.Observer(观察者)的onNext()和onComplete()和onError()运行在observerOn()指定的线程中。 RxJava线程切换只要涉及两个方法subscribeOn()和observeOn()。 1.subscribeOn()源码分析 subscribeOn(Schedulers.io()) subscribeOn()传入一个Scheduler类实例Scheduler是一个调度类能够延时或周期性地去执行一个任务。 Schedulers类的io()方法执行流程 Scheduler IO 创建流程 AndroidSchedulers.mainThread() main thread创建流程 subscribeOnobserveOn。subscribeOn是用来调整被观察者发射源的线程而observeOn是调整观察者处理器的线程。 五、RxJava 背压策略 背压解决了 因被观察者发送事件速度 与 观察者接收事件速度 不匹配一般是前者 快于 后者从而导致观察者无法及时响应 / 处理所有 被观察者发送事件 的问题 1.观察者不接收事件的情况下被观察者继续发送事件存放到缓存区按需取出。 R.id.test_Flowable4 - {mSubscription?.request(2) }/*** 观察者不接收事件扽情况下被观察者继续发送事件存放到缓存区按需取出*/ private var textFlowableTv: TextView? null private var mSubscription: Subscription? null private fun testFlowable5() {Flowable.create(FlowableOnSubscribeInt {Log.v(TAG, 发送事件 1)it.onNext(1)Log.v(TAG, 发送事件 2)it.onNext(2)Log.v(TAG, 发送事件 3)it.onNext(3)Log.v(TAG, 发送事件 4)it.onNext(4)Log.v(TAG, 发送完成)it.onComplete()}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {mSubscription s}override fun onNext(t: Int?) {Log.v(TAG, onNext:$t)}override fun onError(t: Throwable?) {Log.v(TAG, onError:$t)}override fun onComplete() {Log.v(TAG, onComplete)}}) }021-06-27 16:47:29.121 24137-24174/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 1 2021-06-27 16:47:29.123 24137-24174/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 2 2021-06-27 16:47:29.124 24137-24174/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 3 2021-06-27 16:47:29.125 24137-24174/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 4 2021-06-27 16:47:29.126 24137-24174/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送完成 2021-06-27 16:47:32.594 24137-24137/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:1 2021-06-27 16:47:32.595 24137-24137/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:2 2.观察者不接收事件的情况下被观察者继续发送事件至超出缓存区大小128 private fun testFlowableError() {Flowable.create(FlowableOnSubscribeInt {//发送129个事件即超出了缓存区的大小for (i in 0..128) {Log.v(TAG, 发送事件$i)it.onNext(i)}it.onComplete()}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe)}override fun onNext(t: Int?) {Log.v(TAG, onNext:$t)}override fun onError(t: Throwable?) {Log.v(TAG, onError:$t)}override fun onComplete() {Log.v(TAG, onComplete)}}) } 2021-06-27 17:33:19.096 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件117 2021-06-27 17:33:19.096 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件118 2021-06-27 17:33:19.096 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件119 2021-06-27 17:33:19.096 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件120 2021-06-27 17:33:19.096 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件121 2021-06-27 17:33:19.096 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件122 2021-06-27 17:33:19.096 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件123 2021-06-27 17:33:19.097 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件124 2021-06-27 17:33:19.097 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件125 2021-06-27 17:33:19.098 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件126 2021-06-27 17:33:19.098 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件127 2021-06-27 17:33:19.098 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件128 2021-06-27 17:33:19.101 25381-25381/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onError:io.reactivex.rxjava3.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests3.同步订阅 与 异步订阅 的区别 同步订阅中被观察者与 观察者工作于同1线程同步订阅关系中没有缓存区 被观察者在发送1个事件后必须等待观察者接收后才能继续发下1个事件 private fun testFlowable6() {Flowable.create(FlowableOnSubscribeInt {Log.v(TAG, 发送事件 1)it.onNext(1)Log.v(TAG, 发送事件 2)it.onNext(2)Log.v(TAG, 发送事件 3)it.onNext(3)Log.v(TAG, 发送事件 4)it.onNext(4)Log.v(TAG, 发送完成)it.onComplete()}, BackpressureStrategy.ERROR).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {s?.request(4)}override fun onNext(t: Int?) {Log.v(TAG, onNext:$t)}override fun onError(t: Throwable?) {Log.v(TAG, onError:$t)}override fun onComplete() {Log.v(TAG, onComplete)}}) } 2021-06-27 17:57:45.755 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 1 2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:1 2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 2 2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:2 2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 3 2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:3 2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 4 2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:4 2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送完成 2021-06-27 17:57:45.759 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onComplete 观察者只能接受3个事件但被观察者却发送了4个事件所以出现了不匹配情况 private fun testFlowable6() {Flowable.create(FlowableOnSubscribeInt {Log.v(TAG, 发送事件 1)it.onNext(1)Log.v(TAG, 发送事件 2)it.onNext(2)Log.v(TAG, 发送事件 3)it.onNext(3)Log.v(TAG, 发送事件 4)it.onNext(4)Log.v(TAG, 发送完成)it.onComplete()}, BackpressureStrategy.ERROR).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {s?.request(3)}override fun onNext(t: Int?) {Log.v(TAG, onNext:$t)}override fun onError(t: Throwable?) {Log.v(TAG, onError:$t)}override fun onComplete() {Log.v(TAG, onComplete)}}) } 2021-06-27 18:37:05.470 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 1 2021-06-27 18:37:05.472 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:1 2021-06-27 18:37:05.472 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 2 2021-06-27 18:37:05.472 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:2 2021-06-27 18:37:05.472 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 3 2021-06-27 18:37:05.472 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:3 2021-06-27 18:37:05.472 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 4 2021-06-27 18:37:05.474 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onError:io.reactivex.rxjava3.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests 2021-06-27 18:37:05.474 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送完成 观察者不接收事件的情况 private fun testFlowable7() {Flowable.create(FlowableOnSubscribeInt {Log.v(TAG, 发送事件 1)it.onNext(1)Log.v(TAG, 发送事件 2)it.onNext(2)Log.v(TAG, 发送事件 3)it.onNext(3)Log.v(TAG, 发送事件 4)it.onNext(4)Log.v(TAG, 发送完成)it.onComplete()}, BackpressureStrategy.ERROR).subscribe(object :SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe)}override fun onNext(t: Int?) {Log.v(TAG, onNext:$t)}override fun onError(t: Throwable?) {Log.v(TAG, onError:$t)}override fun onComplete() {Log.v(TAG, onComplete)}}) }2021-06-27 19:04:50.591 26746-26746/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe 2021-06-27 19:04:50.593 26746-26746/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 1 2021-06-27 19:04:50.596 26746-26746/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onError:io.reactivex.rxjava3.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests 2021-06-27 19:04:50.596 26746-26746/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 2 2021-06-27 19:04:50.596 26746-26746/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 3 2021-06-27 19:04:50.596 26746-26746/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 4 2021-06-27 19:04:50.596 26746-26746/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送完成 4.控制 被观察者发送事件 的速度 1.被观察者根据观察者自身接收事件能力10个事件从而仅发送10个事件 private fun testFlowable8() {Flowable.create(FlowableOnSubscribeInt {//调用it.requested()获取当前观察者需要接收扽事件数量val n it.requested()Log.v(TAG, 观察者可接收事件: n)//根据it.requestd()的值即当前观察者需要接收扽事件数量来发送事件for (i in 1..n) {Log.v(TAG, 发送了事件 i)it.onNext(i.toInt())}}, BackpressureStrategy.ERROR).subscribe(object :SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe)//设置观察者每次能接收5个事件s?.request(5)}override fun onNext(t: Int?) {Log.v(TAG, 接收到了事件$t)}override fun onError(t: Throwable?) {Log.v(TAG, onError():$t)}override fun onComplete() {Log.v(TAG, onComplete);}}) }2021-07-01 18:19:06.266 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe 2021-07-01 18:19:06.266 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 观察者可接收事件:5 2021-07-01 18:19:06.266 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件1 2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件1 2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件2 2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件2 2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件3 2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件3 2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件4 2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件4 2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件5 2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件5 在同步订阅情况中使用FlowableEmitter.requested()时要注意的 1.观察者可连续要求接收事件被观察者会进行叠加并一起发送。 private fun testFlowable9() {Flowable.create(FlowableOnSubscribeInt {//调用it.requested()获取当前观察者需要接收的事件数量Log.v(TAG, 观察者可接收事件 it.requested())}, BackpressureStrategy.ERROR).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe)s?.request(5) s?.request(10) }override fun onNext(t: Int?) {Log.v(TAG, 接收到了事件$t)}override fun onError(t: Throwable?) {Log.v(TAG, onError: , t)}override fun onComplete() {Log.v(TAG, onComplete)}}) } 2021-07-01 20:14:27.196 7449-7449/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe 2021-07-01 20:14:27.198 7449-7449/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 观察者可接收事件15 2. 每次发送事件后emitter.requested()会实时更新观察者能接受的事件。 private fun testFlowable10() {Flowable.create(FlowableOnSubscribeInt {//1.调用t.requested()获取当前观察者需要接收的事件的数量Log.v(TAG, 观察者可接收事件数量: it.requested())//2. 每次发送事件后t.requested()会实时更新观察者能接受的事件// 即一开始观察者要接收10个事件发送了1个后会实时更新为9个Log.v(TAG, 发送了事件1)it.onNext(1)Log.v(TAG, 发送了事件1后, 还需要发送事件数量: it.requested())Log.v(TAG, 发送了事件2)it.onNext(2)Log.d(TAG, 发送事件2后, 还需要发送事件数量: it.requested())Log.d(TAG, 发送了事件3);it.onNext(3);Log.d(TAG, 发送事件3后, 还需要发送事件数量: it.requested())it.onComplete()}, BackpressureStrategy.ERROR).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.d(TAG, onSubscribe)s?.request(5)}override fun onNext(t: Int?) {Log.v(TAG, 接收到了事件$t)}override fun onError(t: Throwable?) {Log.v(TAG, onError: $t)}override fun onComplete() {Log.v(TAG, onComplete)}}) } 2021-07-01 20:37:58.644 8065-8065/com.example.rxjavademo D/MAINACTIVITY_RXJAVA: onSubscribe 2021-07-01 20:37:58.646 8065-8065/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 观察者可接收事件数量:5 2021-07-01 20:37:58.647 8065-8065/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件1 2021-07-01 20:37:58.647 8065-8065/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件1 2021-07-01 20:37:58.648 8065-8065/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件1后, 还需要发送事件数量:4 2021-07-01 20:37:58.648 8065-8065/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件2 2021-07-01 20:37:58.648 8065-8065/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件2 2021-07-01 20:37:58.648 8065-8065/com.example.rxjavademo D/MAINACTIVITY_RXJAVA: 发送事件2后, 还需要发送事件数量:3 2021-07-01 20:37:58.648 8065-8065/com.example.rxjavademo D/MAINACTIVITY_RXJAVA: 发送了事件3 2021-07-01 20:37:58.649 8065-8065/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件3 2021-07-01 20:37:58.649 8065-8065/com.example.rxjavademo D/MAINACTIVITY_RXJAVA: 发送事件3后, 还需要发送事件数量:2 2021-07-01 20:37:58.649 8065-8065/com.example.rxjavademo D/MAINACTIVITY_RXJAVA: onComplete 3.异常 当FlowableEmitter.requested()减到0时则代表观察者已经不可接收事件。 此时被观察者若继续发送事件则会抛出MissingBackpressureException异常。 若观察者没有设置可接收事件数量即无调用Subscription.request 那么被观察者默认观察者可接收事件数量 0即FlowableEmitter.requested()的返回值 0 private fun testFlowable11() {Flowable.create(FlowableOnSubscribeInt {// 1. 调用emitter.requested()获取当前观察者需要接收的事件数量Log.v(TAG, 观察者可接收事件数量 it.requested())it.onNext(1)it.onNext(2)it.onNext(3)it.onComplete()}, BackpressureStrategy.ERROR).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe)s?.request(2)}override fun onNext(t: Int?) {Log.v(TAG, 接收到了事件$t)}override fun onError(t: Throwable?) {Log.v(TAG, onError:$t);}override fun onComplete() {Log.v(TAG, onComplete);}}) } 2021-07-01 21:15:50.807 8877-8877/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe 2021-07-01 21:15:50.809 8877-8877/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 观察者可接收事件数量2 2021-07-01 21:15:50.810 8877-8877/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件1 2021-07-01 21:15:50.811 8877-8877/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件2 2021-07-01 21:15:50.812 8877-8877/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onError:io.reactivex.rxjava3.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests 4.异步订阅相关 FlowableEmitter.requested()知道观察者自身接收事件能力即 被观察者不能根据 观察者自身接收事件的能力 控制发送事件的速度。 private fun testFlowable12() {Flowable.create(FlowableOnSubscribeInt {// 调用emitter.requested()获取当前观察者需要接收的事件数量Log.d(TAG, 观察者可接收事件数量 it.requested())}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object :SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe)s?.request(100)// 该设置仅影响观察者线程中的requested却不会影响的被观察者中的FlowableEmitter.requested()的返回值// 因为FlowableEmitter.requested()的返回值 取决于RxJava内部调用request(n)而该内部调用会在一开始就调用request(128)}override fun onNext(t: Int?) {Log.v(TAG, 接收到了事件$t)}override fun onError(t: Throwable?) {Log.v(TAG, onError:$t);}override fun onComplete() {Log.d(TAG, onComplete)}}) } 2021-07-01 21:54:17.645 9315-9315/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe 2021-07-01 21:54:17.651 9315-9355/com.example.rxjavademo D/MAINACTIVITY_RXJAVA: 观察者可接收事件数量 128 通过RxJava内部固定调用被观察者线程中的request(n) 从而 反向控制被观察者的发送事件速度 R.id.testFlowableBtn13 - {// 点击按钮才会接收事件 5 次// 点击按钮 则 接收5个事件mSubscription2?.request(5) }private var mSubscription2: Subscription? null// 被观察者一共需要发送500个事件但真正开始发送事件的前提 FlowableEmitter.requested()返回值 ≠ 0 // 观察者每次接收事件数量 48点击按钮 private fun testFlowable13() {Flowable.create(FlowableOnSubscribeInt {Log.v(TAG, 观察者可接收事件数量: it.requested())var flag: Boolean//被观察者一共需要发送500个事件for (i in 1..500) {flag false//requested()0则不发送while (it.requested() 0L) {if (!flag) {Log.v(TAG, 不再发送)flag true}}//requested() ! 0 才发送Log.v(TAG, 发送了事件 i 观察者可接收事件数量: it.requested())it.onNext(i)}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe)mSubscription2 s//初始状态 不接收事件通过点击按钮接收事件}override fun onNext(t: Int?) {Log.v(TAG, 接收到了事件$t)}override fun onError(t: Throwable?) {Log.v(TAG, onError: $t)}override fun onComplete() {Log.v(TAG, onComplete)}})} 2021-07-04 11:48:53.669 13219-13219/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe 2021-07-04 11:48:53.673 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 观察者可接收事件数量:128 2021-07-04 11:48:53.673 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件1观察者可接收事件数量:128 2021-07-04 11:48:53.675 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件2观察者可接收事件数量:127 2021-07-04 11:48:53.675 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件3观察者可接收事件数量:126 2021-07-04 11:48:53.675 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件4观察者可接收事件数量:125 2021-07-04 11:48:53.675 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件5观察者可接收事件数量:124 ... ... 2021-07-04 11:48:53.693 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件128观察者可接收事件数量:1 2021-07-04 11:48:53.693 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 不再发送 2021-07-04 11:49:30.455 13219-13219/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件1 2021-07-04 11:49:30.455 13219-13219/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件2 2021-07-04 11:49:30.460 13219-13219/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件3 2021-07-04 11:49:30.460 13219-13219/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件4 2021-07-04 11:49:30.461 13219-13219/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件5 5.采用背压策略模式BackpressureStrategy 当缓存区大小存满、被观察者仍然继续发送下1个事件时该如何处理的策略方式。 1.直接抛出异常MissingBackpressureException private fun testFlowable14() {Flowable.create(FlowableOnSubscribeInt {for (i in 1..129) {Log.v(TAG, 发送事件$i);it.onNext(i)}it.onComplete()}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe);}override fun onNext(t: Int?) {Log.v(TAG, 接收到了事件$t)}override fun onError(t: Throwable?) {Log.w(TAG, onError: $t)}override fun onComplete() {Log.v(TAG, onComplete);}})} ... ... 2021-07-04 14:21:51.700 14830-14956/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件127 2021-07-04 14:21:51.700 14830-14956/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件128 2021-07-04 14:21:51.700 14830-14956/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件129 2021-07-04 14:21:51.702 14830-14830/com.example.rxjavademo W/MAINACTIVITY_RXJAVA: onError: io.reactivex.rxjava3.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests 2.BackpressureStrategy.MISSING 友好提示缓存区满了 private fun testFlowable15() {Flowable.create(FlowableOnSubscribeInt {for (i in 1..129) {Log.v(TAG, 发送事件$i)it.onNext(i)}it.onComplete()}, BackpressureStrategy.MISSING).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe);}override fun onNext(t: Int?) {Log.v(TAG, 接收到了事件$t)}override fun onError(t: Throwable?) {Log.w(TAG, onError: $t)}override fun onComplete() {Log.v(TAG, onComplete);}}) }2021-07-04 14:41:13.044 15186-15235/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件128 2021-07-04 14:41:13.044 15186-15235/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件129 2021-07-04 14:41:13.048 15186-15186/com.example.rxjavademo W/MAINACTIVITY_RXJAVA: onError: io.reactivex.rxjava3.exceptions.MissingBackpressureException: Queue is full?! 3.BackpressureStrategy.BUFFER 将缓存区大小设置成无限大 private fun testFlowable16() {Flowable.create(FlowableOnSubscribeInt {for (i in 1..150) {Log.v(TAG, 发送事件$i)it.onNext(i)}it.onComplete()}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object :SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe);}override fun onNext(t: Int?) {Log.v(TAG, 接收到了事件$t)}override fun onError(t: Throwable?) {Log.w(TAG, onError: $t)}override fun onComplete() {Log.v(TAG, onComplete);}}) } ... ... 2021-07-04 15:16:38.947 15488-15576/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件147 2021-07-04 15:16:38.947 15488-15576/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件148 2021-07-04 15:16:38.947 15488-15576/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件149 2021-07-04 15:16:38.947 15488-15576/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件150 4.BackpressureStrategy.DROP 超过缓存区大小128的事件丢弃 R.id.testFlowableBtn17 - {mSubscription17?.request(128) } private var mSubscription17: Subscription? null private fun testFlowable17() {Flowable.create(FlowableOnSubscribeInt {for (i in 1..200) {Log.v(TAG, 发送事件$i)it.onNext(i)}it.onComplete()}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe)mSubscription17 s}override fun onNext(t: Int?) {Log.v(TAG, 接收到了事件$t)}override fun onError(t: Throwable?) {Log.w(TAG, onError: $t)}override fun onComplete() {Log.v(TAG, onComplete);}}) } ... ... 2021-07-04 15:41:22.195 15851-15890/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件196 2021-07-04 15:41:22.195 15851-15890/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件197 2021-07-04 15:41:22.195 15851-15890/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件198 2021-07-04 15:41:22.195 15851-15890/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件199 2021-07-04 15:41:22.195 15851-15890/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件200 ... ... 2021-07-04 15:41:33.829 15851-15851/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件125 2021-07-04 15:41:33.829 15851-15851/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件126 2021-07-04 15:41:33.829 15851-15851/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件127 2021-07-04 15:41:33.829 15851-15851/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件128 2021-07-04 15:41:33.833 15851-15851/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onComplete 5.BackpressureStrategy.LATEST 只保存最新最后事件超过缓存区大小128的事件丢弃 private var mSubscription18: Subscription? null private fun testFlowable18() {Flowable.create(FlowableOnSubscribeInt {for (i in 1..200) {Log.v(TAG, 发送事件$i)it.onNext(i)}it.onComplete()}, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription?) {Log.v(TAG, onSubscribe)mSubscription18 s}override fun onNext(t: Int?) {Log.v(TAG, 接收到了事件$t)}override fun onError(t: Throwable?) {Log.w(TAG, onError: $t)}override fun onComplete() {Log.v(TAG, onComplete)}}) } ... ... 2021-07-04 16:02:55.393 16130-16185/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件196 2021-07-04 16:02:55.393 16130-16185/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件197 2021-07-04 16:02:55.393 16130-16185/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件198 2021-07-04 16:02:55.393 16130-16185/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件199 2021-07-04 16:02:55.393 16130-16185/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件200 ... ... 2021-07-04 16:02:57.489 16130-16130/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件126 2021-07-04 16:02:57.489 16130-16130/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件127 2021-07-04 16:02:57.489 16130-16130/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件128 2021-07-04 16:03:25.229 16130-16130/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件200 2021-07-04 16:03:25.233 16130-16130/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onComplete
http://www.w-s-a.com/news/961325/

相关文章:

  • 成都捕鱼网站建设wordpress自定义文章类别
  • wordpress网站怎么加速湖北网站建设企业
  • 迁安做网站中的cms开发南平网站建设公司
  • 肥西县住房和城乡建设局网站代驾系统定制开发
  • 网站建设明细报价表 服务器qq是哪家公司的产品
  • html链接网站模板wordpress怎么调用简码
  • 网站域名怎么查简述网站推广的五要素
  • 咸宁网站设计公司app安装下载
  • 丝网外贸做哪些网站最优的赣州网站建设
  • 如何做网站不被查网站开发工程师岗位说明书
  • 做网站需要vps吗网站建设后怎样发信息
  • 网站建立风格二手交易网站开发可参考文献
  • 成都微信网站开发优化大师优化项目有哪些
  • 哪个网站做自考题目免费郑州网站建设公司qq
  • 地方性的网站有前途顺的网络做网站好不好
  • 学校申请建设网站的原因不要网站域名
  • 推荐响应式网站建设子域名查询工具
  • 如何建设学校的微网站广告推广是什么
  • 设计类专业哪个就业前景好网站建设seoppt
  • 济南建站公司网站网站友链查询源码
  • 校园失物招领网站建设涪陵网站建设公司
  • 怎么做盗号网站手机网站建设需要租用什么科目
  • 成品网站是什么意思沈阳seo推广
  • 购物网站后台流程图昆明官网seo技术
  • 创建自己网站全网零售管理系统
  • 江苏省建设厅网站建筑电工证wordpress收费插件大全
  • 北京中国建设银行招聘信息网站宁德蕉城住房和城乡建设部网站
  • 泉州做网站优化哪家好wordpress站点预览
  • 创建门户网站一页网站首页图如何做
  • 服装手机商城网站建设sns社交网站有哪些