RxJava2学习之二:RxJava2 基本使用

RxJava简介

RxJava 是一个在Java虚拟机上实现的响应式扩展库:提供了基于Observable序列实现的异步调用及基于事件编程。它扩展了观察者模式,支持数据、事件序列并允许你合并序列,无需关心底层的线程处理、同步、线程安全、并发数据结构和非阻塞I/O处理。

RxJava 是一个响应式编程框架,采用观察者设计模式,自然包含Observable和Subscriber。

简言之,RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。

RxAndroid是RxJava的一个针对Android平台的扩展。它包含了一些能够简化Android开发的工具 AndroidObservable。

  • RxJava

RxJava:https://github.com/ReactiveX/RxJava
RxAndroid:https://github.com/ReactiveX/RxAndroid

  • RxJava2

RxJava2:https://github.com/ReactiveX/RxJava/tree/2.x
RxAndroid2:https://github.com/ReactiveX/RxAndroid/tree/2.x

使用RxJava和RxAndroid

  • RxJava
1
compile "io.reactivex.rxjava2:rxjava:2.x.y"
  • RxAndroid
1
2
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
implementation 'io.reactivex.rxjava2:rxjava:2.x.y'

基本使用3部曲

创建Observable

1
2
3
4
5
6
7
8
9
10
11
private Observable<String> getObservable() {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("工作");
e.onNext("开会");
e.onComplete();
//e.onError(new Throwable);
}
});
}

创建Observer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private Observer<String> getObserver() {
return new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//d.dispose(); //移除订阅关系
//d.isDisposed(); //判断是否发生订阅关系
Log.i(TAG, "onSubscribe: " + d.toString());
}

@Override
public void onNext(String value) {
Log.i(TAG, "onNext: " + value);
}

@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: " + e.getLocalizedMessage());
}

@Override
public void onComplete() {
Log.i(TAG, "onComplete: ");
}
};
}

订阅

1
2
3
4
5
6
7
8
public void hello(View view) {
//1. 创建Observable
Observable<String> observable = getObservable();
//2. 创建Observer
Observer<String> observer = getObserver();
//3. 订阅
observable.subscribe(observer);
}

简洁写法

Observable创建的简洁写法

使用Observable.just创建Observable

1
2
3
4
//使用Observable.just创建Observable
private Observable<String> getObservableWithJust() {
return Observable.just("工作", "开会", "大保健");
}

使用Observable.fromArray创建Observable

1
2
3
4
//使用Observable.fromArray创建Observable
private Observable<String> getObservableWithFromArray() {
return Observable.fromArray("工作", "开会", "大保健");
}

使用Observable.fromCallable创建Observable

1
2
3
4
5
6
7
8
9
//使用Observable.fromCallable创建Observable
private Observable<String> getObservableWithFromCallable() {
return Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return "大保健";
}
});
}

Observable订阅及Observer创建的简洁写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
observable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "accept: " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {

}
}, new Action() {
@Override
public void run() throws Exception {

}
});

Disposable

Disposable 相当于RxJava1.x中的Subscription,用于解除订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private Observer<String> getDisposableObserver() {
return new Observer<String>() {
private Disposable d;

@Override
public void onSubscribe(Disposable d) {
this.d = d;
Log.i(TAG, "onSubscribe: " + d.toString());
}

@Override
public void onNext(String value) {
Log.i(TAG, "onNext: " + value);
if ("开会".equals(value)) {
d.dispose(); //移除订阅关系
}
}

@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: " + e.getLocalizedMessage());
}

@Override
public void onComplete() {
Log.i(TAG, "onComplete: ");
}
};
}

Powered by AppBlog.CN     浙ICP备14037229号

Copyright © 2012 - 2020 APP开发技术博客 All Rights Reserved.

访客数 : | 访问量 :