RxJava2学习之二:RxJava2 基本使用
RxJava简介
RxJava 是一个在Java虚拟机上实现的响应式扩展库:提供了基于Observable序列实现的异步调用及基于事件编程。它扩展了观察者模式,支持数据、事件序列并允许你合并序列,无需关心底层的线程处理、同步、线程安全、并发数据结构和非阻塞I/O处理。
RxJava 是一个响应式编程框架,采用观察者设计模式,自然包含Observable和Subscriber。
简言之,RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。
RxAndroid是RxJava的一个针对Android平台的扩展。它包含了一些能够简化Android开发的工具 AndroidObservable。
- RxJava
RxJava:https://github.com/ReactiveX/RxJava
RxAndroid:https://github.com/ReactiveX/RxAndroid
- RxJava2
RxJava2:https://github.com/ReactiveX/RxJava/tree/2.x
RxAndroid2:https://github.com/ReactiveX/RxAndroid/tree/2.x
使用RxJava和RxAndroid
- RxJava
compile "io.reactivex.rxjava2:rxjava:2.x.y"
- RxAndroid
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
implementation 'io.reactivex.rxjava2:rxjava:2.x.y'
基本使用3部曲
创建Observable
private Observable<String> getObservable() {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("工作");
e.onNext("开会");
e.onComplete();
//e.onError(new Throwable);
}
});
}
创建Observer
private Observer<String> getObserver() {
return new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//d.dispose(); //移除订阅关系
//d.isDisposed(); //判断是否发生订阅关系
Log.i(TAG, "onSubscribe: " + d.toString());
}
@Override
public void onNext(String value) {
Log.i(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: " + e.getLocalizedMessage());
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete: ");
}
};
}
订阅
public void hello(View view) {
//1. 创建Observable
Observable<String> observable = getObservable();
//2. 创建Observer
Observer<String> observer = getObserver();
//3. 订阅
observable.subscribe(observer);
}
简洁写法
Observable创建的简洁写法
使用Observable.just创建Observable
//使用Observable.just创建Observable
private Observable<String> getObservableWithJust() {
return Observable.just("工作", "开会", "大保健");
}
使用Observable.fromArray创建Observable
//使用Observable.fromArray创建Observable
private Observable<String> getObservableWithFromArray() {
return Observable.fromArray("工作", "开会", "大保健");
}
使用Observable.fromCallable创建Observable
//使用Observable.fromCallable创建Observable
private Observable<String> getObservableWithFromCallable() {
return Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return "大保健";
}
});
}
Observable订阅及Observer创建的简洁写法
observable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "accept: " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
}, new Action() {
@Override
public void run() throws Exception {
}
});
Disposable
Disposable 相当于RxJava1.x中的Subscription,用于解除订阅
private Observer<String> getDisposableObserver() {
return new Observer<String>() {
private Disposable d;
@Override
public void onSubscribe(Disposable d) {
this.d = d;
Log.i(TAG, "onSubscribe: " + d.toString());
}
@Override
public void onNext(String value) {
Log.i(TAG, "onNext: " + value);
if ("开会".equals(value)) {
d.dispose(); //移除订阅关系
}
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: " + e.getLocalizedMessage());
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete: ");
}
};
}
版权声明:
作者:Joe.Ye
链接:https://www.appblog.cn/index.php/2023/02/25/rxjava2-learning-2-basic-usage-of-rxjava2/
来源:APP全栈技术分享
文章版权归作者所有,未经允许请勿转载。
共有 0 条评论