怎样网站建设,做网站公司怎么做,长春建工集团官网,百度网盟官网文章目录 1.定义2.作用3.特点4.使用4.1创建被观察者#xff08;Observable#xff09;4.2创建观察者#xff08;Observer#xff09;4.3订阅#xff08;Subscribe#xff09;4.4Dispose 5.操作符5.1操作符类型5.2just操作符5.2链式调用5.3 fromArray操作符5.4 fromIterab… 文章目录 1.定义2.作用3.特点4.使用4.1创建被观察者Observable4.2创建观察者Observer4.3订阅Subscribe4.4Dispose 5.操作符5.1操作符类型5.2just操作符5.2链式调用5.3 fromArray操作符5.4 fromIterable操作符5.5map操作符5.6flatMap操作符5.7concatMap操作符5.8buffer操作符5.9concat操作符 6.异步7.subscribeOn8.observeOn9.背压9.1Flowable9.2背压策略9.3另一种调用背压策略的方式 10.RxBus11.RxBinding12.内存泄露 1.定义
RxJava在GitHub的介绍 RxJavaa library for composing asynchronous and event-based programs using observable sequences for the Java VM // 翻译RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库 也就是说RxJava是一个基于事件流实现异步操作的库
2.作用
类似于Android中的AsyncTaskHandler作用用于实现异步操作
3.特点
由于RxJava的使用方式是基于事件流的链式调用所以使得RxJava
逻辑简单实现优雅使用简单
RxJava原理基于一种扩展的观察者模式 RxJava的扩展观察者模式中有4个角色
角色作用被观察者(Observable)产生事件观察者(Observer)接收事件并给出响应动作订阅(Subscribe)连接被观察者观察者事件(Event)被观察者观察者沟通的载体
可以总结为被观察者Observable通过订阅Subscribe按顺序发送事件给观察者Observer观察者Observer按顺序接收事件作出对应的响应动作
4.使用
添加依赖
implementation io.reactivex.rxjava3:rxjava:3.0.0
implementation io.reactivex.rxjava3:rxandroid:3.0.04.1创建被观察者Observable
val ohThisIsObservable Observable.createString{it.onNext(Hello) //发送事件it.onNext(rx)it.onNext(world)it.onComplete() //发送完成事件
}这里采用了create()创建被观察者但并非只有create()能创建其余操作符也可以达成此效果后面介绍。
4.2创建观察者Observer
val observer: ObserverString object : ObserverString {override fun onSubscribe(d: Disposable) { System.out.println( onSubscribe ) }override fun onNext(string: String) { System.out.println( onNext : string) }override fun onError(e: Throwable) { System.out.println(e) }override fun onComplete() { System.out.println( on Complete ) }
}
可看见这里响应事件分别有以下 onSubscribe():准备监听最先调用的方法 onNext():用来发送数据调用一次发送一条 onError():发送异常通知只发送一次多次调用也只会发送第一条 onComplete():发送完成通知只发送一次多次调用也只会发送第一条。 PSonError()和onComplete()互斥俩方法同时只能调用一个要么发生异常onError()不会回调onComplete()要么正常回调onComplete()不回调onError()。
4.3订阅Subscribe
ohThisIsObservable.subscribe(observer)运行代码会发现如下结果 日志中可发现当被观察者(ohThisIsObservable)通过调用onNext()发射数据的时候观察者(observer)调用onNext()接收数据当被观察者(ohThisIsObservable)调用onComplete()时观察者(observer)调用onComplete()其他事件将不会继续发送onError同此理。 RxJava中观察者不仅仅只有observer才能实现下面是个简单版示例
val consumer: ConsumerString Consumer { s -//创建观察者consumerprintln(s)}
val stringObservable Observable.create { emitter -emitter.onNext(Hello)emitter.onNext(~~~rx~~~)emitter.onNext(world)emitter.onComplete()
}
//被观察者发出一连串字符并指定consumer订阅被观察者
stringObservable.subscribe(consumer)
对应输出结果如图 由以上代码可见Observer相对于Consumer在接口方法上要多onSubscribe、onNext、onError、onComplete这些接口在一次事件中可操作程度更精细。
4.4Dispose
在onSubscribe()中会接收到一个Disposable对象该对象相当于一个开关如果开关关闭则观察者不会收到任何事件和数据。例如
val observer: ObserverString object : ObserverString {var mDisposeable: Disposable? nulloverride fun onSubscribe(d: Disposable) {println( onSubscribe )mDisposeable d}override fun onNext(s: String) {println( onNext : $s)if (s stop) {mDisposeable!!.dispose()}}override fun onError(e: Throwable) {println( onError )}override fun onComplete() {println( onComplete )}
}
Observable.just(Hello, world, stop, coding).subscribe(observer)
在上述代码中我们使用一个变量来保存Disposable对象在onNext方法中如果传过来的字符串是“stop”则调用dispose关闭事件的接收后续字符串不在发射甚至onComplete()也不会执行了。结果如下图
5.操作符
Rxjava提供大量操作符来完成对数据处理这些操作符也可以理解成函数。如果把Rxjava比喻成一道数据流水线那么一个操作符就是一道工序数据通过这些工序加工变换、组装最后生产出我们想要的数据。
5.1操作符类型
创建型 转换型 组合型 功能型 过滤型 条件型
5.2just操作符
用于创建一个被观察者并发送事件发送的事件不可以超过10个以上(从其构造函数就可以看出如下图) 简单写个示例
val justObservable Observable.just(Hello, rx, world~!)
val observer: ObserverString object : ObserverString {override fun onSubscribe(d: Disposable) { System.out.println( onSubscribe ) }override fun onNext(string: String) { System.out.println( onNext : string) }override fun onError(e: Throwable) { System.out.println(e) }override fun onComplete() { System.out.println( on Complete ) }
}
justObservable.subscribe(observer)对应输出结果为
5.2链式调用
RxJava最方便的一个特征就是链式调用上述代码可以修改为
Observable.just(Hello, rx, world).subscribe(object : ObserverString {override fun onSubscribe(d: Disposable) { System.out.println( onSubscribe ) }override fun onNext(string: String) { System.out.println( onNext : string) }override fun onError(e: Throwable) { System.out.println(e) }override fun onComplete() { System.out.println( on Complete ) }
})
效果一样(Java代码在这里的表现形式则是lamba表达式)但跟之前看起来给人感觉完全不一样如无特殊说明后续例子都会如此调用。
5.3 fromArray操作符
类似于just但是可以传入无限个参数无数量限制
5.4 fromIterable操作符
可直接传一个List给观察者发射List extends Collection接口而Collection extends Iterable接口所以可以直接传进去。例如
val arrayList ArrayListString()
arrayList.add(111)
arrayList.add(222)
Observable.fromIterable(arrayList).subscribe(object : ObserverString {override fun onSubscribe(d: Disposable) { System.out.println( onSubscribe ) }override fun onNext(string: String) { System.out.println( onNext : string) }override fun onError(e: Throwable) { System.out.println(e) }override fun onComplete() { System.out.println( on Complete ) }
})
对应结果
5.5map操作符
map操作符能直接对发射出来的事件进行处理并且产生新的事件然后再次发射。例如下述例子
Observable.just(Hello).mapAny { get it! }.subscribe(object : ObserverAny {override fun onSubscribe(d: Disposable) {println( onSubscribe )}override fun onNext(o: Any) {println( onNext : o)}override fun onError(e: Throwable) {println( onError )}override fun onComplete() {println( onComplete )}})
这里我们本来传入参数是Hello通过map()拦截后发射出去的参数变成了get it!拦截修改成功。
5.6flatMap操作符
flat英语翻译过来的意思是“使变平”的意思跟map()一样都能直接对发射出来的事件进行处理并且产生新的事件。但其内部方法参数不同。二者都是传参进Function中并在apply中进行数据修改但二者传入参数不同。
map是两个泛型而flatMap第二个参数填Observable被观察者再将这个被观察者发射出去这一下灵活度就增大了这也是网络请求场景中最常用的操作符。下述简单示例
Observable.just(注册).flatMapAny { s -println(s 成功)Observable.just(进行登陆)
}.subscribe(object : ObserverAny {override fun onSubscribe(d: Disposable) {println( onSubscribe )}override fun onNext(o: Any) {println( onNext o)}override fun onError(e: Throwable) {println( onError )}override fun onComplete() {println( onComplete )}
})
对应的日志打印
5.7concatMap操作符
concatMap()与flatMap()使用方式完全一致基本上是一样的。不过concatMap()转发出来的数据是有序的而flatMap()是无序的。
5.8buffer操作符
buffer()有多参数方法这里介绍最常用的单参数形式buffer(x)根据个数来缓冲每次缓冲x个数转换成数组再发射出去例如
Observable.just(1,2,3,4,5,8,9,7,6,10).buffer(3).subscribe(object : ObserverAny {override fun onSubscribe(d: Disposable) {println( onSubscribe )}override fun onNext(o: Any) {println( onNext o)}override fun onError(e: Throwable) {println( onError )}override fun onComplete() {println( onComplete )}
})
对应的输出结果为
5.9concat操作符
可以将多个观察者组合在一起然后按照之前发送顺序发送事件。需要注意的是concat() 最多只可以发送4个事件。 示例如下
Observable.concat(Observable.just(111),Observable.just(222)).subscribe ( object : ObserverAny{override fun onSubscribe(d: Disposable) {println( onSubscribe )}override fun onNext(t: Any) {println( onNext : t)}override fun onError(e: Throwable) {println( onError )}override fun onComplete() {println( onComplete )}} )
对应输出结果为 concatArray()和concat()作用一样不过concatArray()可以发送多于4个被观察者。
6.异步
RxJava提供了非常方便的API来完成线程的调度内置的线程调度器有以下几个
Schedule.single()单线程调度器线程可复用Schedule.newThread()为每个任务创建新的线程Schedule.io()处理I/O密集任务内部线程池实现可根据需求增长Schedulers.computation()处理计算任务如事件循环和回调任务 Schedulers.immediate()默认指定的线程也就是当前线程; AndroidSchedulers.mainThread()Android主线程调度器属于RxAndroid。
线程调度器实际上是指派事件在什么样的线程中处理所需应用场景就不难想象了如果该事件是耗时操作比如网络请求但相应结果会先是在UI中这时候在主线程执行网络请求就不合适了但在子线程执行结果同样要刷新UI也不太合适这里就凸显自由切换线程的好处了。Rxjava可通过调度器来制定被观察者和观察者分别可以在什么线程中执行自己的代码而指定调度器的API则是subscribeOn和observeOn。
7.subscribeOn
首先我们不用线程调度器我们先看观察者和被观察者默认情况下在什么线程中执行自己代码如下
Observable.create(object : ObservableOnSubscribeAny{override fun subscribe(emitter: ObservableEmitterAny) {println( subscribe : Thread.currentThread())emitter.onNext( guess wich thread )emitter.onComplete()}
}).subscribe ( object : ObserverAny{override fun onSubscribe(d: Disposable) {println( onSubscribe : Thread.currentThread())}override fun onNext(t: Any) {println( onNext : t : Thread.currentThread())}override fun onError(e: Throwable) {println( onError : Thread.currentThread())}override fun onComplete() {println( onComplete : Thread.currentThread())}} )
对应的结果为 可见默认情况下观察者和被观察者都是在主线程中执行。假设这个时候要执行耗时操作Android程序必定崩溃所以我们这时要切换线程。 subscribeOn()实际上是指定被观察者的代码在哪个线程中执行。例如
Observable.create(object : ObservableOnSubscribeAny{override fun subscribe(emitter: ObservableEmitterAny) {println( subscribe : Thread.currentThread())emitter.onNext( guess wich thread )emitter.onComplete()}
}).subscribeOn(Schedulers.newThread()) //决定执行subscribe方法所处的线程也就是产生事件或发射事件所处的线程.subscribe ( object : ObserverAny{override fun onSubscribe(d: Disposable) {println( onSubscribe : Thread.currentThread())}override fun onNext(t: Any) {println( onNext : t : Thread.currentThread())}override fun onError(e: Throwable) {println( onError : Thread.currentThread())}override fun onComplete() {println( onComplete : Thread.currentThread())}} )
这段代码中采用subscribeOn(Schedulers.newThread())来指定在新建线程中执行 这时运行得到结果 可见日志是不对的onNext()、onComplete()都没有打印。原因很简单我们在主线程创建观察者和被观察者之后事件发送的执行转交给调度器Schedulers.newThread()还没等来得及新线程发送出事件主线程就直接退出了所以后续日志看不到鉴于此我们使主线程休眠sleep2秒在上述方法的后面调用如下代码
try {Thread.sleep(2000) //这里sleep延时主线程
} catch (e: InterruptedException) {e.printStackTrace()
}
输出结果为
8.observeOn
observeOn()指定后续的操作符以及观察者的代码在什么样的线程中执行。且observeOn()可以多次被调用每次调用都生效。
Observable.create(object : ObservableOnSubscribeAny {override fun subscribe(emitter: ObservableEmitterAny) {Log.i(rxdemo, subscribe : Thread.currentThread())emitter.onNext( guess wich thread )emitter.onComplete()}
}).subscribeOn(Schedulers.io()) //决定执行subscribe方法所处的线程也就是产生事件或发射事件所处的线程.observeOn(AndroidSchedulers.mainThread()) //决定下游事件被处理时所处的线程.subscribe(object : ObserverAny {override fun onSubscribe(d: Disposable) {Log.i(rxdemo, onSubscribe : Thread.currentThread())}override fun onNext(t: Any) {Log.i(rxdemo, onNext : t : Thread.currentThread())}override fun onError(e: Throwable) {Log.i(rxdemo, onError : Thread.currentThread())}override fun onComplete() {Log.i(rxdemo, onComplete : Thread.currentThread())}})
对应输出结果为
9.背压
这个词是从backpressure直译过来背压即来自背部的压力指当被观察者发出很多的数据或事件时观察者来不及处理都积压在那压的观察者喘不过气有时候还会导致OOM。 如下述代码
Observable.create(object : ObservableOnSubscribeAny {override fun subscribe(emitter: ObservableEmitterAny) {while (true){emitter.onNext( subscribe : Hello )}}
}).subscribeOn(Schedulers.io()) //被观察者在I/O线程执行.observeOn(Schedulers.newThread()) //观察者在新线程执行.subscribe { //ConsumerThread.sleep(9000);Log.i(rxdemo, accept ~);}
观察者和被观察者在不同线程中执行被观察者是个死循环不停发射同时观察者处理数据的速度放缓一些休眠9秒处理一次。这时我们可以在Profiler中可以看到 内存随时间可见的上升这种情况如果不处理很大概率可能会出现OOM。究其原因是因为发送数据方和接收数据方不在一个线程内两个线程步调不一致发送数据太多处理不来就缓存起来直到内存用完这就是背压。针对背压Rxjava提供了支持背压处理的观察者和被观察者即Flowable和Subscriber。
9.1Flowable
Flowable是Observable观察者的一种新实现但Flowable额外实现了非阻塞式背压策略。同时用Flowable的时候观察者变为Subscriber。例如下面示例
Flowable.create({ emitter -Log.d(rxdemo, send 1)emitter.onNext(1)Log.d(rxdemo, send 2)emitter.onNext(2)Log.d(rxdemo, finish)emitter.onComplete()},BackpressureStrategy.ERROR
).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription) {Log.d(rxdemo, onSubscribe)s.request(2)}override fun onNext(integer: Int) {Log.d(rxdemo, get the $integer)}override fun onError(t: Throwable) {Log.w(rxdemo, onError: , t)}override fun onComplete() {Log.d(rxdemo, onComplete)}
})
对应输出结果为 看到这里你会对两个地方产生疑问一个是onSubscribe()中的s.request(2)这里是向观察者请求处理2条数据的意思如果没有这行代码则我们不请求处理数据程序则会触发这里的背压策略BackpressureStrategy.ERROR直接报错。当然背压策略不仅这一个还有其余几个
9.2背压策略
·BackpressureStrategy.ERROR直接抛出MissingBackpressureException异常 ·BackpressureStratery.MISSING不使用背压没有缓存仅提示缓存区满了 ·BackpressureStratery.BUFFER缓存所有数据直到观察者处理如果观察者处理不及时也会出现OOM被观察者可无限发送事件但实际上是放在缓存区。 ·BackpressureStratery.DROP丢弃超过缓存区大小128的数据 ·BackpressureStratery.LATEST只保存最新的最后的事件超过缓存区大小128时用新数据覆盖老数据。 到此我们可以总结下背压的出现是为了解决两个方面主要问题 · 当发送数据速度 接受数据速度数据堆叠缓存会撑满 · 当缓存区大小存满被观察者继续发送下一个事件时还是相当于撑爆了缓存区 到这里你会发现这还是个缓存区问题那么这个缓存区是否就是128呢我们可以通过Flowable.bufferSize()来获取缓存的大小例如
Flowable.create({ emitter -//发送128个Hello bufferfor (i in 0 until Flowable.bufferSize()) {Log.d(rxdemo, Hello buffer $i)emitter.onNext(Hello buffer $i)}},BackpressureStrategy.ERROR
).subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread()).subscribe(object : SubscriberString {override fun onSubscribe(s: Subscription) {Log.d(rxdemo, onSubscribe)}override fun onNext(str: String) {Log.d(rxdemo, get the $str)}override fun onError(t: Throwable) {Log.w(rxdemo, onError: , t)}override fun onComplete() {Log.d(rxdemo, onComplete)}
})
对应的日志输出为 由日志不难看出其发挥大小为128也就是默认缓存数据为128个上述代码发出了128个Hello buffer。如果这个时候我们多发出来一个会怎样修改下for循环条件i in 0 until Flowable.bufferSize()1。最后会得到结果 毫无意外Subscriber并没有请求处理数据缓存已经爆满外加配置的背压策略为BackpressureStrategy.ERROR所以这里会在缓存撑爆的情况下通知Subscriber发生错误调用ERROR打印MissingBackpressureException。
9.3另一种调用背压策略的方式
看到这里你可能会想如果不使用create方法创建Flowable而是用range、interval这些操作符创建那如何配置策略对此Rxjava提供了对应的方法来匹配相应的背压策略onBackpressureBuffer()、onBackpressureDrop()、onBackpressureLatest()看名字就知道对应的策略啦,例如
Flowable.range(0,100).onBackpressureLatest().subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread()).subscribe(object : SubscriberInt {override fun onSubscribe(s: Subscription) {Log.d(rxdemo, onSubscribe)}override fun onNext(num: Int) {Log.d(rxdemo, get the $num)}override fun onError(t: Throwable) {Log.w(rxdemo, onError: , t)}override fun onComplete() {Log.d(rxdemo, onComplete)}
})
其实到这里你会发现Rxjava的强大之处能随意切换线程跟retrofit结合做网络请求框架能用timer做定时操作用interval做周期性操作甚至进行数组、list的遍历等。
10.RxBus
一种基于RxJava实现事件总线的一种思想可完美替代EventBus相关代码参考
11.RxBinding
主要与RxJava结合用于一些View的事件绑定相关代码参考
12.内存泄露
Rxjava使用不当会造成内存泄露在页面销毁后Observable仍然还有事件等待发送和处理比如interval做周期性操作而没有停下来这个时候会导致Activity回收失败从而致使内存泄露。 解决办法 ·使用Disposable关闭页面时调用dispose()取消订阅 ·使用CompositeDisposable添加一组Disposable在关闭页面时同时取消订阅。 也可以将其与Activity基类生命周期进行绑定在销毁时取消订阅。