RxJava 3 完全指南:响应式编程在Android中的精妙艺术
RxJava 3是Android响应式编程的核心工具,通过观察者模式与函数式编程结合,优雅解决异步操作、线程切换与数据流处理难题。其五大基类(Observable、Flowable、Single等)覆盖不同场景,丰富操作符链支持数据过滤、变换、组合与错误处理。Flowable支持背压策略应对数据速率失衡,Schedulers提供灵活线程调度。结合CompositeDisposable防止内存泄漏,使异步代码更简洁、可维护,大幅提升Android开发效率与代码质量。
博主博客
前言
每个Android开发者都对RxJava怀有特殊的情感。它那简洁优雅的线程切换机制、强大的多网络请求合并能力,配合Retrofit使用,无疑是现代Android应用开发的利器。随着技术的演进,RxJava已经迎来了它的第三代版本。
版本演进与现状
- RxJava 3 于2020年发布,对RxJava 2保持了良好的兼容性
- RxJava 2 已于2020年12月31日停止官方支持
- 错误修复会同时在2.x和3.x版本中进行,但新功能只在3.x上添加
- RxAndroid目前尚未发布3.0版本,但RxJava 3本身已可在Android项目中使用
特别说明:虽然RxAndroid 3还未正式发布,但RxJava 3本身已可独立使用。对于Android开发者,可以使用RxJava 3的核心功能,并结合AndroidSchedulers.mainThread()来操作主线程。
第一章:为什么需要RxJava?
1.1 传统异步编程的痛点
在Android开发中,我们经常面临以下挑战:
- 复杂的线程管理和切换
- 回调地狱(Callback Hell)
- 多个异步操作的协调与合并
- 错误处理的分散和重复
回调地狱示例:
// 传统方式:深层嵌套的回调
api.getUser(userId, new Callback<User>() {
@Override
public void onSuccess(User user) {
api.getOrders(user.getId(), new Callback<List<Order>>() {
@Override
public void onSuccess(List<Order> orders) {
api.getAddress(user.getId(), new Callback<Address>() {
@Override
public void onSuccess(Address address) {
// 最终处理所有数据
updateUI(user, orders, address);
}
@Override
public void onError(Throwable error) {
// 错误处理
}
});
}
@Override
public void onError(Throwable error) {
// 错误处理
}
});
}
@Override
public void onError(Throwable error) {
// 错误处理
}
});
1.2 RxJava的解决方案
RxJava通过观察者模式和函数式编程的结合,提供了:
- 声明式的异步操作链
- 统一的错误处理机制
- 强大的操作符集合
- 内存泄漏的预防(通过Disposable)
RxJava解决方案:
// RxJava方式:链式调用,清晰易读
Observable.zip(
api.getUserRx(userId),
api.getOrdersRx(userId),
api.getAddressRx(userId),
(user, orders, address) -> new UserData(user, orders, address)
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
userData -> updateUI(userData),
error -> showError(error)
);
第二章:RxJava 3核心概念
2.1 数据流与观察者模式
在RxJava中,数据以流(Stream)的形式组织和传递:
// 基本的数据流结构
Observable.just("Data1", "Data2", "Data3") // 数据源
.filter(data -> data.contains("2")) // 操作符1:过滤
.map(data -> data.toUpperCase()) // 操作符2:转换
.subscribe( // 订阅并消费数据
data -> System.out.println("收到: " + data),
error -> System.out.println("错误: " + error),
() -> System.out.println("完成")
);
输出结果:
收到: DATA2
完成
重要术语:
- 上流(Upstream):当前操作符之前的步骤
- 下流(Downstream):当前操作符之后的步骤
- 发射(Emission):数据源发出数据的过程
理解示例:
// 对于map操作符来说:
Observable.just(1, 2, 3) // ← 上流
.map(x -> x * 2) // ← 当前操作符
.subscribe(System.out::println) // ← 下流
// 输出:
// 2
// 4
// 6
2.2 数据流中的对象
在RxJava文档中,以下术语都指代在数据流中传递的数据对象:
- Emission - 发射行为
- Emits - 发射的数据
- Item - 数据项
- Event - 事件
- Signal - 信号
- Data - 数据
- Message - 消息
2.3 背压(Backpressure)机制
问题背景
当异步步骤执行速度不一致时(上流发射太快,下流处理太慢),会导致:
- 内存溢出(缓存过多数据)
- 数据丢失(丢弃来不及处理的数据)
问题示例:
// 生产者和消费者速度不匹配
Observable.interval(1, TimeUnit.MILLISECONDS) // 生产者:每1ms生产一个
.observeOn(Schedulers.computation()) // 切换到计算线程
.subscribe(item -> {
Thread.sleep(100); // 消费者:每100ms消费一个
System.out.println("消费: " + item);
});
// 问题:生产速度(1个/ms) > 消费速度(1个/100ms)
// 结果:内存迅速增长,可能导致OOM
RxJava的解决方案
- 支持背压:
Flowable类,适用于可能产生大量数据的场景 - 不支持背压:
Observable、Single、Maybe、Completable类,适用于数据量可控的场景
背压解决方案示例:
// 使用Flowable处理背压
Flowable.interval(1, TimeUnit.MILLISECONDS)
.onBackpressureBuffer(1000) // 设置缓冲区大小为1000
.observeOn(Schedulers.computation())
.subscribe(
item -> {
Thread.sleep(100);
System.out.println("消费: " + item);
},
error -> {
// 处理背压错误
if (error instanceof MissingBackpressureException) {
System.out.println("背压异常: 数据产生太快!");
}
}
);
2.4 线程调度器(Schedulers)
RxJava通过调度器实现优雅的线程切换:
| 调度器 | 适用场景 | Android特有 |
|---|---|---|
Schedulers.computation() |
CPU密集型计算 | 无 |
Schedulers.io() |
I/O操作、网络请求 | 无 |
Schedulers.single() |
需要单一线程顺序执行 | 无 |
Schedulers.trampoline() |
在当前线程排队执行 | 无 |
AndroidSchedulers.mainThread() |
Android主线程操作 | 有 |
// 线程切换示例
Observable.just("A", "B", "C")
.subscribeOn(Schedulers.io()) // 数据产生在IO线程
.map(item -> {
System.out.println("map在: " + Thread.currentThread().getName());
return item.toLowerCase();
})
.observeOn(Schedulers.computation()) // 切换到计算线程
.filter(item -> !item.equals("b"))
.observeOn(AndroidSchedulers.mainThread()) // 切换到主线程
.subscribe(item -> {
System.out.println("消费在: " + Thread.currentThread().getName());
System.out.println("收到: " + item);
});
// 可能的输出:
// map在: RxCachedThreadScheduler-1 (IO线程)
// map在: RxCachedThreadScheduler-1
// map在: RxCachedThreadScheduler-1
// 消费在: main (主线程)
// 收到: a
// 消费在: main
// 收到: c
2.5 RxJava的五大基类
| 类型 | 说明 | 背压支持 | 适用场景 |
|---|---|---|---|
Flowable |
发射0到N个数据 | ✅ 支持 | 大数据流、背压敏感场景 |
Observable |
发射0到N个数据 | ❌ 不支持 | 小数据流、UI事件 |
Single |
发射单个数据或错误 | ❌ 不支持 | 网络请求、数据库查询 |
Completable |
不发射数据,只通知完成或错误 | ❌ 不支持 | 只关心操作是否完成 |
Maybe |
发射0或1个数据,或错误 | ❌ 不支持 | 可能返回null的查询 |
五大基类使用示例:
// 1. Observable - 多个数据
Observable<String> observable = Observable.just("A", "B", "C");
observable.subscribe(System.out::println);
// 输出: A B C
// 2. Single - 单个数据
Single<String> single = Single.just("成功");
single.subscribe(
data -> System.out.println("成功: " + data),
error -> System.out.println("失败: " + error)
);
// 输出: 成功: 成功
// 3. Maybe - 0或1个数据
Maybe<String> maybe = Maybe.empty(); // 空数据
maybe.subscribe(
data -> System.out.println("数据: " + data),
error -> System.out.println("错误"),
() -> System.out.println("完成(无数据)")
);
// 输出: 完成(无数据)
// 4. Completable - 只关心完成
Completable completable = Completable.fromAction(() -> {
System.out.println("执行操作");
// 执行某些操作,不返回数据
});
completable.subscribe(
() -> System.out.println("操作完成"),
error -> System.out.println("操作失败: " + error)
);
// 输出: 执行操作
// 输出: 操作完成
// 5. Flowable - 支持背压的数据流
Flowable<Integer> flowable = Flowable.range(1, 1000);
flowable.subscribe(
data -> System.out.println("数据: " + data),
error -> System.out.println("错误: " + error),
() -> System.out.println("完成"),
subscription -> {
// 背压控制
subscription.request(100); // 初始请求100个
}
);
第三章:环境搭建与基础使用
3.1 添加依赖
// build.gradle (Module: app)
dependencies {
// RxJava 3
implementation 'io.reactivex.rxjava3:rxjava:3.1.6'
// RxAndroid (Android特定绑定)
implementation 'io.reactivex.rxjava3:rxandroid:3.0.2'
// 如果需要与Retrofit配合使用
implementation 'com.squareup.retrofit2:adapter-rxjava3:2.9.0'
// 如果需要RxBinding (View绑定)
implementation 'com.jakewharton.rxbinding4:rxbinding:4.0.0'
}
3.2 创建第一个RxJava程序
// 最简单的RxJava示例
public class FirstRxJavaExample {
public static void main(String[] args) {
System.out.println("=== 开始RxJava示例 ===");
// 创建Observable
Observable<String> observable = Observable.create(emitter -> {
System.out.println("Observable: 开始发射数据");
emitter.onNext("Hello");
emitter.onNext("RxJava");
emitter.onNext("3!");
emitter.onComplete();
System.out.println("Observable: 数据发射完成");
});
// 订阅并消费数据
observable.subscribe(
// onNext回调
item -> {
System.out.println("Observer: 收到数据 - " + item);
},
// onError回调
error -> {
System.out.println("Observer: 发生错误 - " + error.getMessage());
},
// onComplete回调
() -> {
System.out.println("Observer: 所有数据接收完成");
}
);
System.out.println("=== 示例结束 ===");
}
}
输出结果:
=== 开始RxJava示例 ===
Observable: 开始发射数据
Observer: 收到数据 - Hello
Observer: 收到数据 - RxJava
Observer: 收到数据 - 3!
Observable: 数据发射完成
Observer: 所有数据接收完成
=== 示例结束 ===
3.3 链式调用示例
// 更实用的链式调用示例
Observable.fromArray("apple", "banana", "cherry", "date", "elderberry")
.filter(fruit -> fruit.length() > 5) // 过滤长度大于5的水果
.map(String::toUpperCase) // 转换为大写
.sorted() // 排序
.subscribe(
fruit -> System.out.println("水果: " + fruit),
error -> System.out.println("错误: " + error),
() -> System.out.println("所有水果处理完成")
);
输出结果:
水果: BANANA
水果: CHERRY
水果: ELDERBERRY
所有水果处理完成
第四章:核心操作符详解
4.1 实用操作符
4.1.1 observeOn 与 subscribeOn
System.out.println("主线程: " + Thread.currentThread().getName());
Observable.create(emitter -> {
// 模拟耗时操作
System.out.println("create线程: " + Thread.currentThread().getName());
Thread.sleep(1000);
emitter.onNext("数据1");
emitter.onNext("数据2");
emitter.onComplete();
})
.subscribeOn(Schedulers.io()) // 指定create在IO线程执行
.doOnSubscribe(disposable ->
System.out.println("订阅线程: " + Thread.currentThread().getName()))
.observeOn(Schedulers.computation()) // 后续操作在计算线程
.map(data -> {
System.out.println("map线程: " + Thread.currentThread().getName());
return data + "_处理";
})
.observeOn(AndroidSchedulers.mainThread()) // 最终在主线程
.subscribe(data -> {
System.out.println("消费线程: " + Thread.currentThread().getName());
System.out.println("收到: " + data);
});
// 输出可能(线程名称可能不同):
// 主线程: main
// 订阅线程: main
// create线程: RxCachedThreadScheduler-1
// map线程: RxComputationThreadPool-1
// 消费线程: main
// 收到: 数据1_处理
// map线程: RxComputationThreadPool-1
// 消费线程: main
// 收到: 数据2_处理
重要说明:
subscribeOn():指定Observable创建和发射数据的线程,多次调用只有第一次有效observeOn():指定下游操作执行的线程,每次调用都会影响后续操作- 如果没有指定线程,默认在当前线程执行
4.1.2 生命周期回调操作符
Observable.just("A", "B", "C")
.doOnSubscribe(disposable ->
System.out.println("1. doOnSubscribe: 开始订阅"))
.doOnNext(item ->
System.out.println("2. doOnNext: 将要处理 - " + item))
.doAfterNext(item ->
System.out.println("3. doAfterNext: 处理完成 - " + item))
.doOnError(error ->
System.out.println("4. doOnError: 发生错误 - " + error))
.doOnComplete(() ->
System.out.println("5. doOnComplete: 即将完成"))
.doAfterTerminate(() ->
System.out.println("6. doAfterTerminate: 已经终止"))
.doFinally(() ->
System.out.println("7. doFinally: 最终执行"))
.doOnDispose(() ->
System.out.println("8. doOnDispose: 被取消订阅"))
.subscribe(
item -> System.out.println("9. onNext: 消费 - " + item),
error -> System.out.println("10. onError: 错误"),
() -> System.out.println("11. onComplete: 完成")
);
// 正常情况输出:
// 1. doOnSubscribe: 开始订阅
// 2. doOnNext: 将要处理 - A
// 9. onNext: 消费 - A
// 3. doAfterNext: 处理完成 - A
// 2. doOnNext: 将要处理 - B
// 9. onNext: 消费 - B
// 3. doAfterNext: 处理完成 - B
// 2. doOnNext: 将要处理 - C
// 9. onNext: 消费 - C
// 3. doAfterNext: 处理完成 - C
// 5. doOnComplete: 即将完成
// 11. onComplete: 完成
// 6. doAfterTerminate: 已经终止
// 7. doFinally: 最终执行
说明:这些doOnXXX操作符非常有用,可以用于日志记录、状态监控、资源清理等。
4.2 过滤操作符
4.2.1 skip - 跳过元素
System.out.println("=== skip操作符示例 ===");
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
System.out.print("跳过前4个: ");
source.skip(4).subscribe(item -> System.out.print(item + " "));
System.out.println();
System.out.print("跳过最后4个: ");
source.skipLast(4).subscribe(item -> System.out.print(item + " "));
System.out.println();
System.out.print("跳过直到大于5: ");
source.skipWhile(x -> x <= 5).subscribe(item -> System.out.print(item + " "));
System.out.println();
// 输出:
// === skip操作符示例 ===
// 跳过前4个: 5 6 7 8 9 10
// 跳过最后4个: 1 2 3 4 5 6
// 跳过直到大于5: 6 7 8 9 10
4.2.2 debounce - 防抖动
System.out.println("=== debounce操作符示例 ===");
Observable.create(emitter -> {
// 模拟用户快速输入
emitter.onNext("r");
Thread.sleep(100);
emitter.onNext("rx");
Thread.sleep(200);
emitter.onNext("rxj");
Thread.sleep(150);
emitter.onNext("rxja");
Thread.sleep(50);
emitter.onNext("rxjav");
Thread.sleep(80);
emitter.onNext("rxjava");
emitter.onComplete();
})
.debounce(200, TimeUnit.MILLISECONDS) // 200ms内只取最后一次
.subscribe(
query -> System.out.println("搜索: " + query),
error -> System.out.println("错误: " + error),
() -> System.out.println("搜索完成")
);
// 输出:
// === debounce操作符示例 ===
// 搜索: rxjava
// 搜索完成
// 解释:
// 用户快速输入 r -> rx -> rxj -> rxja -> rxjav -> rxjava
// 每次输入间隔都小于200ms,所以只有最后一次"rxjava"被处理
4.2.3 distinct - 去重
System.out.println("=== distinct操作符示例 ===");
Observable.just(2, 3, 4, 4, 2, 1, 3, 5)
.distinct()
.subscribe(item -> System.out.print(item + " "));
System.out.println();
Observable.just(1, 1, 2, 1, 2, 3, 3, 4)
.distinctUntilChanged() // 只去除相邻重复
.subscribe(item -> System.out.print(item + " "));
System.out.println();
// 自定义去重规则
Observable.just("apple", "banana", "apricot", "blueberry")
.distinct(fruit -> fruit.charAt(0)) // 根据首字母去重
.subscribe(item -> System.out.print(item + " "));
System.out.println();
// 输出:
// === distinct操作符示例 ===
// 2 3 4 1 5
// 1 2 1 2 3 4
// apple banana
4.2.4 filter - 条件过滤
System.out.println("=== filter操作符示例 ===");
// 基础过滤
Observable.range(1, 10)
.filter(x -> x % 2 == 0)
.subscribe(x -> System.out.print(x + " "));
System.out.println();
// 复杂过滤条件
Observable.just("Apple", "Banana", "Cherry", "Date", "Elderberry")
.filter(fruit -> fruit.length() > 5 && fruit.contains("e"))
.subscribe(fruit -> System.out.print(fruit + " "));
System.out.println();
// 结合其他操作符
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.filter(x -> x > 3)
.filter(x -> x < 8)
.map(x -> x * 10)
.subscribe(x -> System.out.print(x + " "));
System.out.println();
// 输出:
// === filter操作符示例 ===
// 2 4 6 8 10
// Elderberry
// 40 50 60 70
4.2.5 take - 取前N个
System.out.println("=== take操作符示例 ===");
Observable.interval(500, TimeUnit.MILLISECONDS)
.take(5) // 只取前5个
.subscribe(
x -> System.out.println("收到: " + x),
error -> System.out.println("错误: " + error),
() -> System.out.println("完成(取前5个)")
);
// 等待3秒让程序执行
Thread.sleep(3000);
System.out.println("=== takeLast示例 ===");
Observable.range(1, 10)
.takeLast(3)
.subscribe(x -> System.out.print(x + " "));
System.out.println();
System.out.println("=== takeWhile示例 ===");
Observable.just(2, 4, 6, 8, 10, 11, 12)
.takeWhile(x -> x % 2 == 0) // 直到遇到奇数停止
.subscribe(x -> System.out.print(x + " "));
System.out.println();
// 输出:
// === take操作符示例 ===
// 收到: 0
// 收到: 1
// 收到: 2
// 收到: 3
// 收到: 4
// 完成(取前5个)
// === takeLast示例 ===
// 8 9 10
// === takeWhile示例 ===
// 2 4 6 8 10
4.3 变换操作符
4.3.1 map - 一对一转换
System.out.println("=== map操作符示例 ===");
Observable.just("apple", "banana", "cherry")
.map(String::toUpperCase)
.map(fruit -> fruit + "!")
.subscribe(fruit -> System.out.println("水果: " + fruit));
// 输出:
// === map操作符示例 ===
// 水果: APPLE!
// 水果: BANANA!
// 水果: CHERRY!
// 另一个示例:数据转换
Observable.just(1, 2, 3, 4, 5)
.map(x -> x * x) // 平方
.map(x -> "数值: " + x)
.subscribe(System.out::println);
// 输出:
// 数值: 1
// 数值: 4
// 数值: 9
// 数值: 16
// 数值: 25
4.3.2 flatMap - 一对多转换(无序)
System.out.println("=== flatMap操作符示例(无序) ===");
Observable.just("A", "B", "C")
.flatMap(letter ->
Observable.intervalRange(1, 3, 0, 100, TimeUnit.MILLISECONDS)
.map(number -> "(" + letter + ", " + number + ")"))
.subscribe(item -> System.out.println(item));
// 等待执行完成
Thread.sleep(500);
// 可能的输出(顺序不确定):
// === flatMap操作符示例(无序) ===
// (A, 1)
// (B, 1)
// (C, 1)
// (A, 2)
// (B, 2)
// (C, 2)
// (A, 3)
// (B, 3)
// (C, 3)
4.3.3 concatMap - 一对多转换(有序)
System.out.println("=== concatMap操作符示例(有序) ===");
Observable.just("A", "B", "C")
.concatMap(letter ->
Observable.intervalRange(1, 3, 0, 100, TimeUnit.MILLISECONDS)
.map(number -> "(" + letter + ", " + number + ")"))
.subscribe(item -> System.out.println(item));
// 等待执行完成
Thread.sleep(500);
// 输出(保证顺序):
// === concatMap操作符示例(有序) ===
// (A, 1)
// (A, 2)
// (A, 3)
// (B, 1)
// (B, 2)
// (B, 3)
// (C, 1)
// (C, 2)
// (C, 3)
重要区别:
flatMap:顺序不保证,适合并行处理concatMap:顺序保证,适合需要顺序执行的场景
4.3.4 buffer - 缓冲分组
System.out.println("=== buffer操作符示例 ===");
// 按数量缓冲
System.out.print("按4个一组缓冲: ");
Observable.range(1, 10)
.buffer(4)
.subscribe(list -> System.out.print(list + " "));
System.out.println();
// 按时间窗口缓冲
System.out.println("按时间窗口缓冲:");
Observable.interval(250, TimeUnit.MILLISECONDS)
.buffer(1, TimeUnit.SECONDS) // 每1秒缓冲一次
.take(3) // 取前3个缓冲
.subscribe(
list -> System.out.println("收到 " + list.size() + " 个数据: " + list),
error -> System.out.println("错误: " + error),
() -> System.out.println("缓冲完成")
);
// 等待执行
Thread.sleep(3500);
// 输出:
// === buffer操作符示例 ===
// 按4个一组缓冲: [1, 2, 3, 4] [5, 6, 7, 8] [9, 10]
// 按时间窗口缓冲:
// 收到 4 个数据: [0, 1, 2, 3]
// 收到 4 个数据: [4, 5, 6, 7]
// 收到 4 个数据: [8, 9, 10, 11]
// 缓冲完成
4.3.5 scan - 累积计算
System.out.println("=== scan操作符示例 ===");
// 累积求和
Observable.just(5, 3, 8, 1, 7)
.scan(0, (partialSum, x) -> partialSum + x)
.subscribe(sum -> System.out.print(sum + " "));
System.out.println();
// 累积求积
Observable.just(1, 2, 3, 4, 5)
.scan(1, (product, x) -> product * x)
.subscribe(product -> System.out.print(product + " "));
System.out.println();
// 字符串连接
Observable.just("Hello", " ", "World", "!")
.scan("", (str, word) -> str + word)
.subscribe(result -> System.out.println("中间结果: " + result));
// 输出:
// === scan操作符示例 ===
// 0 5 8 16 17 24
// 1 1 2 6 24 120
// 中间结果: Hello
// 中间结果: Hello
// 中间结果: Hello World
// 中间结果: Hello World!
4.4 组合操作符
4.4.1 merge - 合并多个Observable
System.out.println("=== merge操作符示例 ===");
Observable<String> names = Observable.just("Hello", "World");
Observable<String> otherNames = Observable.just("Git", "Code", "8");
Observable.merge(names, otherNames)
.subscribe(name -> System.out.println("收到: " + name));
// 或者使用mergeWith
names.mergeWith(otherNames)
.subscribe(name -> System.out.println("收到: " + name));
// 输出:
// === merge操作符示例 ===
// 收到: Hello
// 收到: World
// 收到: Git
// 收到: Code
// 收到: 8
// 收到: Hello
// 收到: World
// 收到: Git
// 收到: Code
// 收到: 8
// mergeDelayError示例:延迟错误处理
System.out.println("=== mergeDelayError示例 ===");
Observable<String> source1 = Observable.just("A", "B");
Observable<String> source2 = Observable.error(new RuntimeException("错误!"));
Observable<String> source3 = Observable.just("C", "D");
Observable.mergeDelayError(source1, source2, source3)
.subscribe(
item -> System.out.println("收到: " + item),
error -> System.out.println("错误: " + error.getMessage())
);
// 输出:
// === mergeDelayError示例 ===
// 收到: A
// 收到: B
// 收到: C
// 收到: D
// 错误: 错误!
4.4.2 zip - 一对一组合
System.out.println("=== zip操作符示例 ===");
Observable<String> colors = Observable.just("Red", "Green", "Blue");
Observable<String> animals = Observable.just("Fox", "Elephant", "Whale");
Observable<Integer> numbers = Observable.just(1, 2, 3);
Observable.zip(colors, animals, numbers,
(color, animal, number) ->
number + ". " + color + " " + animal)
.subscribe(item -> System.out.println(item));
// 输出:
// === zip操作符示例 ===
// 1. Red Fox
// 2. Green Elephant
// 3. Blue Whale
// 注意:如果Observable长度不同,以最短的为准
Observable<String> shortObs = Observable.just("A", "B");
Observable<String> longObs = Observable.just("1", "2", "3", "4");
Observable.zip(shortObs, longObs,
(a, b) -> a + b)
.subscribe(item -> System.out.print(item + " "));
System.out.println();
// 输出:
// A1 B2
4.4.3 combineLatest - 最新值组合
System.out.println("=== combineLatest操作符示例 ===");
PublishSubject<String> firstName = PublishSubject.create();
PublishSubject<String> lastName = PublishSubject.create();
Observable.combineLatest(firstName, lastName,
(first, last) -> first + " " + last)
.subscribe(fullName -> System.out.println("全名: " + fullName));
// 发射数据
firstName.onNext("John");
lastName.onNext("Doe");
firstName.onNext("Jane");
lastName.onNext("Smith");
// 输出:
// === combineLatest操作符示例 ===
// 全名: John Doe
// 全名: Jane Doe
// 全名: Jane Smith
// 实际应用:表单验证
System.out.println("=== 表单验证示例 ===");
PublishSubject<Boolean> usernameValid = PublishSubject.create();
PublishSubject<Boolean> passwordValid = PublishSubject.create();
PublishSubject<Boolean> emailValid = PublishSubject.create();
Observable.combineLatest(usernameValid, passwordValid, emailValid,
(userOk, passOk, emailOk) -> userOk && passOk && emailOk)
.subscribe(isValid ->
System.out.println("表单是否有效: " + isValid));
// 模拟用户输入
usernameValid.onNext(true); // 用户名有效
passwordValid.onNext(false); // 密码无效
emailValid.onNext(true); // 邮箱有效
passwordValid.onNext(true); // 密码变为有效
// 输出:
// === 表单验证示例 ===
// 表单是否有效: false (true && false && true)
// 表单是否有效: true (true && true && true)
4.5 错误处理操作符
4.5.1 onErrorReturn - 错误时返回默认值
System.out.println("=== onErrorReturn操作符示例 ===");
Observable.create(emitter -> {
emitter.onNext("数据1");
emitter.onNext("数据2");
emitter.onError(new RuntimeException("模拟错误"));
emitter.onNext("数据3"); // 不会发射
})
.onErrorReturn(error -> {
System.out.println("发生错误,返回默认值");
return "默认值";
})
.subscribe(
item -> System.out.println("收到: " + item),
error -> System.out.println("错误处理: " + error), // 不会执行
() -> System.out.println("完成")
);
// 输出:
// === onErrorReturn操作符示例 ===
// 收到: 数据1
// 收到: 数据2
// 发生错误,返回默认值
// 收到: 默认值
// 完成
// onErrorReturnItem简化版
Observable.error(new RuntimeException("错误"))
.onErrorReturnItem("默认项")
.subscribe(
item -> System.out.println("收到: " + item),
error -> System.out.println("不会执行错误处理")
);
// 输出:
// 收到: 默认项
4.5.2 onErrorResumeNext - 错误时切换到备用Observable
System.out.println("=== onErrorResumeNext操作符示例 ===");
Observable.create(emitter -> {
emitter.onNext("主数据1");
emitter.onNext("主数据2");
emitter.onError(new IOException("网络错误"));
})
.onErrorResumeNext(throwable -> {
if (throwable instanceof IOException) {
System.out.println("网络错误,切换到本地缓存");
return Observable.just("缓存数据1", "缓存数据2");
}
return Observable.error(throwable);
})
.subscribe(
item -> System.out.println("收到: " + item),
error -> System.out.println("错误: " + error.getMessage()),
() -> System.out.println("完成")
);
// 输出:
// === onErrorResumeNext操作符示例 ===
// 收到: 主数据1
// 收到: 主数据2
// 网络错误,切换到本地缓存
// 收到: 缓存数据1
// 收到: 缓存数据2
// 完成
4.5.3 retry - 重试机制
System.out.println("=== retry操作符示例 ===");
int attemptCount = 0;
Observable.create(emitter -> {
attemptCount++;
System.out.println("第 " + attemptCount + " 次尝试");
if (attemptCount < 3) {
emitter.onError(new IOException("网络连接失败"));
} else {
emitter.onNext("数据获取成功");
emitter.onComplete();
}
})
.retry(5) // 最多重试5次
.subscribe(
data -> System.out.println("成功: " + data),
error -> System.out.println("最终失败: " + error.getMessage()),
() -> System.out.println("操作完成")
);
// 输出:
// === retry操作符示例 ===
// 第 1 次尝试
// 第 2 次尝试
// 第 3 次尝试
// 成功: 数据获取成功
// 操作完成
// retryWhen示例:条件重试
System.out.println("=== retryWhen操作符示例 ===");
Observable.create(emitter -> {
System.out.println("尝试连接服务器...");
emitter.onError(new IOException("连接失败"));
})
.retryWhen(errors ->
errors.zipWith(Observable.range(1, 3), (error, retryCount) -> {
if (retryCount > 3) {
throw new RuntimeException("超过最大重试次数");
}
return retryCount;
})
.flatMap(retryCount -> {
System.out.println("第 " + retryCount + " 次重试,等待 " + retryCount + " 秒");
return Observable.timer(retryCount, TimeUnit.SECONDS);
})
)
.subscribe(
item -> System.out.println("成功"),
error -> System.out.println("最终失败: " + error.getMessage())
);
// 等待执行
Thread.sleep(10000);
// 输出:
// === retryWhen操作符示例 ===
// 尝试连接服务器...
// 第 1 次重试,等待 1 秒
// 尝试连接服务器...
// 第 2 次重试,等待 2 秒
// 尝试连接服务器...
// 第 3 次重试,等待 3 秒
// 尝试连接服务器...
// 最终失败: 超过最大重试次数
第五章:背压策略与Flowable
5.1 背压问题场景
System.out.println("=== 背压问题演示 ===");
// 创建快速生产数据的Observable
Observable<Long> fastProducer = Observable.interval(1, TimeUnit.MILLISECONDS);
// 慢速消费者
Disposable disposable = fastProducer
.observeOn(Schedulers.computation())
.subscribe(
item -> {
// 模拟慢速处理
Thread.sleep(10);
System.out.println("处理: " + item);
},
error -> {
if (error instanceof MissingBackpressureException) {
System.out.println("背压异常: " + error.getMessage());
}
}
);
// 运行一段时间后停止
Thread.sleep(2000);
disposable.dispose();
System.out.println("停止订阅");
// 可能输出(具体数字可能不同):
// === 背压问题演示 ===
// 处理: 0
// 处理: 1
// 处理: 2
// ...(可能很快出现MissingBackpressureException)
5.2 Flowable的背压策略
System.out.println("=== Flowable背压策略 ===");
// 1. BUFFER策略 - 缓存所有来不及处理的数据
System.out.println("1. BUFFER策略:");
Flowable.interval(1, TimeUnit.MILLISECONDS)
.onBackpressureBuffer(1000) // 缓冲区大小1000
.observeOn(Schedulers.computation())
.take(10) // 只取10个避免无限执行
.subscribe(
item -> {
Thread.sleep(10);
System.out.println("BUFFER处理: " + item);
},
error -> System.out.println("BUFFER错误: " + error)
);
Thread.sleep(150);
System.out.println();
// 2. DROP策略 - 丢弃来不及处理的数据
System.out.println("2. DROP策略:");
Flowable.interval(1, TimeUnit.MILLISECONDS)
.onBackpressureDrop(dropped ->
System.out.println("丢弃: " + dropped))
.observeOn(Schedulers.computation())
.take(10)
.subscribe(
item -> {
Thread.sleep(10);
System.out.println("DROP处理: " + item);
}
);
Thread.sleep(150);
System.out.println();
// 3. LATEST策略 - 只保留最新的数据
System.out.println("3. LATEST策略:");
Flowable.interval(1, TimeUnit.MILLISECONDS)
.onBackpressureLatest()
.observeOn(Schedulers.computation())
.take(10)
.subscribe(
item -> {
Thread.sleep(10);
System.out.println("LATEST处理: " + item);
}
);
Thread.sleep(150);
System.out.println();
// 4. ERROR策略 - 抛出MissingBackpressureException
System.out.println("4. ERROR策略:");
Flowable.interval(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.computation()) // 没有背压策略
.take(10)
.subscribe(
item -> {
Thread.sleep(10);
System.out.println("ERROR处理: " + item);
},
error -> {
if (error instanceof MissingBackpressureException) {
System.out.println("背压错误: " + error.getMessage());
}
}
);
Thread.sleep(150);
// 输出(具体数字和顺序可能不同):
// === Flowable背压策略 ===
// 1. BUFFER策略:
// BUFFER处理: 0
// BUFFER处理: 1
// BUFFER处理: 2
// ...
//
// 2. DROP策略:
// DROP处理: 0
// 丢弃: 1
// 丢弃: 2
// 丢弃: 3
// DROP处理: 4
// ...
//
// 3. LATEST策略:
// LATEST处理: 0
// LATEST处理: 24
// LATEST处理: 49
// ...
//
// 4. ERROR策略:
// ERROR处理: 0
// 背压错误: Could not emit value due to lack of requests
5.3 背压策略选择指南
| 策略 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| BUFFER | 数据重要,不能丢失 | 不丢失数据 | 可能内存溢出 |
| DROP | 实时数据,可以丢弃旧数据 | 内存友好 | 丢失数据 |
| LATEST | 只需要最新数据 | 内存友好,保留最新数据 | 丢失中间数据 |
| ERROR | 需要及时发现问题 | 快速失败 | 用户体验差 |
// 实际应用示例:实时传感器数据
Flowable<SensorData> sensorFlowable = Flowable.create(emitter -> {
// 模拟传感器每秒产生1000个数据
for (int i = 0; i < 1000; i++) {
emitter.onNext(new SensorData(i));
}
emitter.onComplete();
}, BackpressureStrategy.BUFFER); // 直接在创建时指定策略
// 或者使用已有Observable转换
Observable<SensorData> sensorObservable = Observable.create(emitter -> {
for (int i = 0; i < 1000; i++) {
emitter.onNext(new SensorData(i));
}
emitter.onComplete();
});
Flowable<SensorData> sensorFlowable = sensorObservable
.toFlowable(BackpressureStrategy.LATEST);
第六章:Android实战应用
6.1 网络请求组合
// 多个网络请求顺序执行
public class NetworkExample {
// 模拟API接口
interface ApiService {
Observable<User> getUser(String userId);
Observable<List<Order>> getOrders(String userId);
Observable<Address> getAddress(String userId);
}
public void fetchUserData(String userId) {
System.out.println("开始获取用户数据...");
Observable<User> userObservable = api.getUser(userId);
Observable<List<Order>> ordersObservable = api.getOrders(userId);
// flatMap顺序执行
userObservable
.doOnNext(user -> System.out.println("获取到用户: " + user.getName()))
.flatMap(user ->
ordersObservable
.doOnNext(orders ->
System.out.println("获取到订单: " + orders.size() + " 个"))
.map(orders -> {
user.setOrders(orders);
return user;
})
)
.flatMap(user ->
api.getAddress(userId)
.doOnNext(address ->
System.out.println("获取到地址: " + address.getCity()))
.map(address -> {
user.setAddress(address);
return user;
})
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
user -> {
System.out.println("数据获取完成");
updateUI(user);
},
error -> {
System.out.println("数据获取失败: " + error.getMessage());
showError(error);
}
);
}
private void updateUI(User user) {
// 更新UI
System.out.println("更新UI: " + user);
}
private void showError(Throwable error) {
// 显示错误
System.out.println("显示错误: " + error.getMessage());
}
}
// 模拟执行输出:
// 开始获取用户数据...
// 获取到用户: John Doe
// 获取到订单: 5 个
// 获取到地址: New York
// 数据获取完成
// 更新UI: User{name='John Doe', orders=5, address='New York'}
6.2 并行请求与结果合并
public class ParallelRequestsExample {
public void fetchAllData(String userId) {
System.out.println("开始并行获取数据...");
Observable<UserInfo> userInfoObs = api.getUserInfo(userId)
.subscribeOn(Schedulers.io())
.doOnNext(info -> System.out.println("用户信息获取完成"));
Observable<UserProfile> profileObs = api.getUserProfile(userId)
.subscribeOn(Schedulers.io())
.doOnNext(profile -> System.out.println("用户资料获取完成"));
Observable<List<Friend>> friendsObs = api.getFriends(userId)
.subscribeOn(Schedulers.io())
.doOnNext(friends ->
System.out.println("好友列表获取完成: " + friends.size() + " 个好友"));
// 使用zip并行执行,等待所有请求完成
Observable.zip(userInfoObs, profileObs, friendsObs,
(userInfo, profile, friends) -> {
UserData data = new UserData();
data.setUserInfo(userInfo);
data.setProfile(profile);
data.setFriends(friends);
return data;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
userData -> {
System.out.println("所有数据获取完成");
displayUserData(userData);
},
error -> {
System.out.println("数据获取失败: " + error.getMessage());
handleError(error);
}
);
}
// 或者使用combineLatest(不等待全部完成)
public void fetchDataIncrementally(String userId) {
Observable.combineLatest(
api.getUserInfo(userId),
api.getUserProfile(userId),
api.getFriends(userId),
(userInfo, profile, friends) ->
new UserData(userInfo, profile, friends))
.subscribe(userData -> {
System.out.println("有数据更新");
// 每次任一数据源更新都会触发
});
}
}
// 模拟执行输出:
// 开始并行获取数据...
// 用户信息获取完成
// 用户资料获取完成
// 好友列表获取完成: 25 个好友
// 所有数据获取完成
// 显示用户数据...
6.3 表单验证
public class FormValidationExample {
private EditText etUsername;
private EditText etPassword;
private EditText etEmail;
private Button btnSubmit;
private CompositeDisposable disposables = new CompositeDisposable();
public void setupFormValidation() {
System.out.println("设置表单验证...");
// 使用RxBinding监听EditText变化
Observable<Boolean> usernameValid =
RxTextView.textChanges(etUsername)
.skip(1) // 跳过初始值
.map(CharSequence::toString)
.map(text -> text.length() >= 3)
.distinctUntilChanged()
.doOnNext(valid ->
System.out.println("用户名验证: " + (valid ? "有效" : "无效")));
Observable<Boolean> passwordValid =
RxTextView.textChanges(etPassword)
.skip(1)
.map(CharSequence::toString)
.map(text -> text.length() >= 6)
.distinctUntilChanged()
.doOnNext(valid ->
System.out.println("密码验证: " + (valid ? "有效" : "无效")));
Observable<Boolean> emailValid =
RxTextView.textChanges(etEmail)
.skip(1)
.map(CharSequence::toString)
.map(text -> Patterns.EMAIL_ADDRESS.matcher(text).matches())
.distinctUntilChanged()
.doOnNext(valid ->
System.out.println("邮箱验证: " + (valid ? "有效" : "无效")));
// 合并所有验证结果
disposables.add(
Observable.combineLatest(usernameValid, passwordValid, emailValid,
(usernameOk, passwordOk, emailOk) ->
usernameOk && passwordOk && emailOk)
.distinctUntilChanged()
.subscribe(isValid -> {
System.out.println("表单整体验证: " +
(isValid ? "有效,启用按钮" : "无效,禁用按钮"));
btnSubmit.setEnabled(isValid);
btnSubmit.setAlpha(isValid ? 1.0f : 0.5f);
})
);
// 监听提交按钮点击
disposables.add(
RxView.clicks(btnSubmit)
.throttleFirst(1, TimeUnit.SECONDS) // 防止重复点击
.subscribe(event -> {
System.out.println("提交表单...");
submitForm();
})
);
}
private void submitForm() {
String username = etUsername.getText().toString();
String password = etPassword.getText().toString();
String email = etEmail.getText().toString();
// 发送注册请求
api.register(username, password, email)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnSubscribe(disposable ->
System.out.println("开始注册..."))
.subscribe(
response -> {
System.out.println("注册成功: " + response.getMessage());
showSuccess();
},
error -> {
System.out.println("注册失败: " + error.getMessage());
showError(error.getMessage());
}
);
}
@Override
protected void onDestroy() {
super.onDestroy();
disposables.clear(); // 防止内存泄漏
}
}
// 模拟用户操作输出:
// 设置表单验证...
// 用户输入: "jo" (用户名)
// 用户名验证: 无效
// 表单整体验证: 无效,禁用按钮
// 用户输入: "joh" (用户名)
// 用户名验证: 有效
// 用户输入: "12345" (密码)
// 密码验证: 无效
// 表单整体验证: 无效,禁用按钮
// 用户输入: "123456" (密码)
// 密码验证: 有效
// 用户输入: "test@" (邮箱)
// 邮箱验证: 无效
// 表单整体验证: 无效,禁用按钮
// 用户输入: "[email protected]" (邮箱)
// 邮箱验证: 有效
// 表单整体验证: 有效,启用按钮
// 点击提交按钮
// 开始注册...
// 注册成功: 注册成功
6.4 防止内存泄漏
public class LeakFreeActivity extends AppCompatActivity {
private CompositeDisposable disposables = new CompositeDisposable();
private PublishSubject<String> searchSubject = PublishSubject.create();
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setupSearch();
loadData();
}
private void setupSearch() {
System.out.println("设置搜索功能...");
disposables.add(
searchSubject
.debounce(300, TimeUnit.MILLISECONDS) // 防抖
.distinctUntilChanged() // 去重
.switchMap(query -> {
if (query.isEmpty()) {
return Observable.just(new ArrayList<String>());
}
return api.search(query)
.onErrorResumeNext(Observable.just(new ArrayList<String>()));
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
results -> updateSearchResults(results),
error -> showError("搜索失败: " + error.getMessage())
)
);
}
private void loadData() {
System.out.println("加载数据...");
// 添加订阅到CompositeDisposable
disposables.add(
api.getData()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnSubscribe(disposable ->
showLoading(true))
.doFinally(() ->
showLoading(false))
.subscribe(
data -> {
System.out.println("数据加载成功");
updateUI(data);
},
error -> {
System.out.println("数据加载失败: " + error.getMessage());
showError("加载失败,请重试");
}
)
);
}
// 搜索输入
public void onSearchTextChanged(String text) {
searchSubject.onNext(text);
}
@Override
protected void onDestroy() {
super.onDestroy();
System.out.println("Activity销毁,清理订阅");
// 取消所有订阅,防止内存泄漏
disposables.clear();
// 或者使用disposables.dispose()也可以
}
}
// 模拟执行输出:
// 设置搜索功能...
// 加载数据...
// 显示加载中...
// 数据加载成功
// 隐藏加载中...
// 更新UI...
// 用户输入搜索: "rx"
// 用户输入搜索: "rxj"
// 开始搜索: rxj
// 搜索完成,更新结果
// Activity销毁,清理订阅
第七章:RxJava 2到RxJava 3的迁移
7.1 主要变化总结
// ========== 1. 包名变更 ==========
// RxJava 2
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
// RxJava 3 (包名中添加了.rxjava3)
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
// ========== 2. API变更示例 ==========
System.out.println("=== RxJava 2 -> RxJava 3 API变更 ===");
// 2.1 as() 改为 to()
// RxJava 2: observable.as(...)
// RxJava 3: observable.to(...)
// 2.2 startWith() 分为 startWithItem() 和 startWithIterable()
Observable.just("B", "C")
.startWithItem("A") // RxJava 3新增
.subscribe(item -> System.out.print(item + " "));
System.out.println();
// 输出: A B C
Observable.just("D", "E")
.startWithIterable(Arrays.asList("A", "B", "C")) // RxJava 3新增
.subscribe(item -> System.out.print(item + " "));
System.out.println();
// 输出: A B C D E
// 2.3 Maybe.defaultIfEmpty() 返回类型变化
// RxJava 2: 返回Maybe<T>
// RxJava 3: 返回Single<T>
Maybe.just("数据")
.defaultIfEmpty("默认值")
.subscribe(
data -> System.out.println("收到: " + data),
error -> System.out.println("错误")
);
// 输出: 收到: 数据
Maybe.empty()
.defaultIfEmpty("默认值")
.subscribe(
data -> System.out.println("收到: " + data),
error -> System.out.println("错误")
);
// 输出: 收到: 默认值
// 2.4 Supplier代替Callable
// RxJava 2: fromCallable()
// RxJava 3: fromSupplier() (推荐)
Observable.fromSupplier(() -> {
System.out.println("执行Supplier");
return "数据";
}).subscribe(System.out::println);
// 输出: 执行Supplier
// 输出: 数据
// ========== 3. 删除的功能 ==========
// 以下在RxJava 3中已被删除:
// - observable.as(...) // 改用to(...)
// - observable.startWith(T) // 改用startWithItem(T)
// - observable.startWith(Iterable) // 改用startWithIterable(Iterable)
// - Maybe.toSingle(T) // 改用defaultIfEmpty(T).toSingle()
// - Single.toCompletable()
// - Flowable.subscribe(4个参数的重载)
// - Observable.subscribe(4个参数的重载)
7.2 迁移步骤
步骤1:更新依赖
// build.gradle (Module: app)
dependencies {
// 从RxJava 2迁移到RxJava 3
implementation 'io.reactivex.rxjava3:rxjava:3.1.6'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.2'
// 如果有Retrofit,也需要更新
implementation 'com.squareup.retrofit2:adapter-rxjava3:2.9.0'
// 如果有RxBinding
implementation 'com.jakewharton.rxbinding4:rxbinding:4.0.0'
}
步骤2:更新导入语句
// 批量替换导入语句
// 将:
import io.reactivex.*;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.*;
import io.reactivex.subjects.*;
// 替换为:
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.subjects.*;
步骤3:更新API调用
// 常见的需要更新的API调用
public class MigrationExamples {
// 1. as() -> to()
public void example1() {
// RxJava 2: observable.as(AutoDispose.autoDisposable(scope))
// RxJava 3: observable.to(AutoDispose.autoDisposable(scope))
}
// 2. startWith() -> startWithItem() / startWithIterable()
public void example2() {
// RxJava 2: observable.startWith("A")
// RxJava 3: observable.startWithItem("A")
// RxJava 2: observable.startWith(Arrays.asList("A", "B"))
// RxJava 3: observable.startWithIterable(Arrays.asList("A", "B"))
}
// 3. Maybe.toSingle(T) 的替代方案
public void example3() {
Maybe<String> maybe = Maybe.empty();
// RxJava 2: maybe.toSingle("默认值")
// RxJava 3: maybe.defaultIfEmpty("默认值").toSingle()
Single<String> single = maybe
.defaultIfEmpty("默认值")
.toSingle();
}
// 4. 删除的subscribe重载
public void example4() {
// RxJava 2: observable.subscribe(onNext, onError, onComplete, onSubscribe)
// RxJava 3: 这个重载已被删除,需要分开处理
// RxJava 3的写法:
Disposable disposable = observable.subscribe(
item -> { /* onNext */ },
error -> { /* onError */ },
() -> { /* onComplete */ }
);
// 如果需要onSubscribe回调,使用doOnSubscribe
observable.doOnSubscribe(d -> { /* onSubscribe */ })
.subscribe(...);
}
// 5. 使用Supplier代替Callable
public void example5() {
// RxJava 2: Observable.fromCallable(() -> "数据")
// RxJava 3: Observable.fromSupplier(() -> "数据") (推荐)
// 注意: fromCallable()仍然存在,但推荐使用fromSupplier()
}
}
步骤4:使用迁移工具
对于大型项目,可以使用IDE的查找替换功能:
- 使用"Find in Path"查找所有RxJava 2的导入
- 使用正则表达式批量替换
- 编译并修复API不兼容的错误
第八章:最佳实践与性能优化
8.1 最佳实践
实践1:合理选择Observable类型
public class ObservableTypeSelection {
public void selectRightType() {
// 场景1:网络请求返回单个结果
// 正确:使用Single
Single<User> userSingle = api.getUser(userId);
// 场景2:可能返回null的查询
// 正确:使用Maybe
Maybe<User> userMaybe = api.findUserByEmail(email);
// 场景3:只关心操作是否成功
// 正确:使用Completable
Completable updateCompletable = api.updateUser(user);
// 场景4:UI事件流(点击、文本变化)
// 正确:使用Observable(数据量小,不需要背压)
Observable<ViewClickEvent> clickObservable = RxView.clicks(button);
// 场景5:大量数据流(传感器数据、日志流)
// 正确:使用Flowable(需要背压控制)
Flowable<SensorData> sensorFlowable = sensorManager.getSensorData();
}
}
实践2:合理使用调度器
public class SchedulerBestPractices {
public void properSchedulerUsage() {
// 好的实践:明确指定每个阶段的线程
api.getData()
.subscribeOn(Schedulers.io()) // 第1步:网络请求在IO线程
.observeOn(Schedulers.computation()) // 第2步:数据处理在计算线程
.map(data -> {
// 复杂计算或数据转换
return processData(data);
})
.observeOn(Schedulers.io()) // 第3步:再次IO操作(如保存到数据库)
.flatMap(processedData ->
database.save(processedData))
.observeOn(AndroidSchedulers.mainThread()) // 第4步:UI更新在主线程
.subscribe(
result -> updateUI(result),
error -> showError(error)
);
// 避免的实践:不必要的线程切换
api.getData()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) // 切换到主线程
.map(data -> processData(data)) // 错误:在主线程做复杂计算
.subscribe(...);
// 改进:将复杂计算放在合适的线程
api.getData()
.subscribeOn(Schedulers.io())
.map(data -> processData(data)) // 在IO线程处理数据
.observeOn(AndroidSchedulers.mainThread()) // 然后切换到主线程
.subscribe(...);
}
}
实践3:避免内存泄漏
public class MemoryLeakPrevention {
// 方法1:使用CompositeDisposable
private CompositeDisposable disposables = new CompositeDisposable();
public void safeSubscribe() {
disposables.add(
api.getData()
.subscribe(data -> updateUI(data))
);
disposables.add(
RxView.clicks(button)
.subscribe(event -> handleClick())
);
}
@Override
protected void onDestroy() {
super.onDestroy();
disposables.clear(); // 清理所有订阅
}
// 方法2:使用AutoDispose(需要额外依赖)
public void useAutoDispose() {
api.getData()
.as(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(this)))
.subscribe(data -> updateUI(data));
}
// 方法3:手动管理Disposable
private Disposable manualDisposable;
public void manualManagement() {
manualDisposable = api.getData()
.subscribe(data -> updateUI(data));
}
@Override
protected void onDestroy() {
super.onDestroy();
if (manualDisposable != null && !manualDisposable.isDisposed()) {
manualDisposable.dispose(); // 手动取消订阅
}
}
}
8.2 性能优化
优化1:避免不必要的对象创建
public class PerformanceOptimization {
// 不好的实践:每次订阅都创建新对象
public void badPractice() {
Observable.range(1, 1000)
.map(x -> new ExpensiveProcessor().process(x)) // 每次创建新对象
.subscribe(...);
}
// 好的实践:重用对象
public void goodPractice() {
ExpensiveProcessor processor = new ExpensiveProcessor(); // 创建一次
Observable.range(1, 1000)
.map(processor::process) // 重用对象
.subscribe(...);
}
// 特别优化:使用原始类型避免装箱拆箱
public void primitiveOptimization() {
// 对于大量整数操作,使用IntObservable可以避免装箱
Observable<Integer> boxed = Observable.range(1, 1000000); // 产生Integer对象
// 如果使用RxJava扩展库,可以使用原始类型Observable
// ObservableInt.range(1, 1000000) // 产生int值,避免装箱
}
}
优化2:合理使用缓存
public class CacheOptimization {
public void useCache() {
// 场景:多次订阅相同数据
Observable<String> networkData = api.getData()
.subscribeOn(Schedulers.io())
.cache(); // 缓存结果,后续订阅直接使用缓存
// 第一次订阅:发起网络请求
networkData.subscribe(data -> System.out.println("订阅1: " + data));
// 第二次订阅:使用缓存,不发起网络请求
networkData.subscribe(data -> System.out.println("订阅2: " + data));
// 注意:cache()会永久缓存,如果需要限制缓存时间,使用replay()
Observable<String> timedCache = api.getData()
.subscribeOn(Schedulers.io())
.replay(1, TimeUnit.MINUTES) // 缓存1分钟
.autoConnect();
}
// 避免无限缓存导致内存泄漏
public void avoidMemoryLeak() {
// 错误的缓存使用
Observable<BigData> badCache = api.getBigData()
.cache(); // 如果数据很大,会一直占用内存
// 改进:使用弱引用或限制缓存大小
Observable<BigData> betterCache = api.getBigData()
.replay(1) // 只缓存最近1个数据
.autoConnect();
}
}
优化3:使用适当的背压策略
public class BackpressureOptimization {
public void optimizeBackpressure() {
// 场景:快速生产,慢速消费
// 方案1:使用合适的缓冲区大小
Flowable.interval(1, TimeUnit.MILLISECONDS)
.onBackpressureBuffer(1000, // 缓冲区大小
() -> System.out.println("缓冲区溢出"),
BackpressureOverflowStrategy.DROP_OLDEST) // 溢出策略
.subscribe(...);
// 方案2:采样,降低数据频率
Flowable.interval(1, TimeUnit.MILLISECONDS)
.sample(10, TimeUnit.MILLISECONDS) // 每10ms采样一次
.subscribe(...);
// 方案3:批量处理
Flowable.interval(1, TimeUnit.MILLISECONDS)
.buffer(100, TimeUnit.MILLISECONDS) // 每100ms缓冲一次
.subscribe(batch -> {
// 批量处理数据
processBatch(batch);
});
}
}
第九章:调试与测试
9.1 调试技巧
public class DebuggingTechniques {
public void basicDebugging() {
System.out.println("=== RxJava调试技巧 ===");
// 1. 使用doOn操作符添加日志
Observable.just("A", "B", "C")
.doOnSubscribe(d ->
System.out.println("开始订阅"))
.doOnNext(item ->
System.out.println("发射: " + item))
.doOnError(error ->
System.err.println("错误: " + error))
.doOnComplete(() ->
System.out.println("完成发射"))
.doOnDispose(() ->
System.out.println("取消订阅"))
.subscribe(...);
// 2. 使用compose操作符统一添加调试逻辑
Observable.just("Data")
.compose(addDebugging("测试流"))
.subscribe(...);
}
// 统一的调试Transformer
private <T> ObservableTransformer<T, T> addDebugging(String tag) {
return upstream -> upstream
.doOnSubscribe(d ->
System.out.println(tag + ": 订阅开始"))
.doOnNext(item ->
System.out.println(tag + ": 发射 " + item))
.doOnError(error ->
System.err.println(tag + ": 错误 " + error))
.doOnComplete(() ->
System.out.println(tag + ": 完成"))
.doOnDispose(() ->
System.out.println(tag + ": 取消订阅"));
}
// 3. 使用RxJavaPlugins全局监控
public void setupGlobalDebugging() {
RxJavaPlugins.setErrorHandler(error -> {
System.err.println("RxJava全局错误: " + error);
// 可以发送到错误统计服务器
});
RxJavaPlugins.setScheduleHandler((runnable) -> {
System.out.println("调度任务: " + runnable);
return runnable;
});
}
// 4. 使用TestObserver进行调试
public void testObserverDebugging() {
TestObserver<String> testObserver = new TestObserver<String>() {
@Override
public void onNext(String value) {
System.out.println("TestObserver收到: " + value);
super.onNext(value);
}
@Override
public void onError(Throwable e) {
System.err.println("TestObserver错误: " + e);
super.onError(e);
}
@Override
public void onComplete() {
System.out.println("TestObserver完成");
super.onComplete();
}
};
Observable.just("A", "B", "C")
.subscribe(testObserver);
// 检查断言
testObserver.assertValues("A", "B", "C");
testObserver.assertComplete();
}
}
9.2 单元测试
public class RxJavaUnitTest {
@Test
public void testObservable() {
System.out.println("=== Observable单元测试 ===");
// 创建TestObserver
TestObserver<String> testObserver = new TestObserver<>();
// 执行被测Observable
Observable.just("Hello", "World")
.map(String::toUpperCase)
.subscribe(testObserver);
// 验证结果
testObserver.assertValues("HELLO", "WORLD");
testObserver.assertNoErrors();
testObserver.assertComplete();
System.out.println("测试通过!");
}
@Test
public void testErrorHandling() {
System.out.println("=== 错误处理测试 ===");
TestObserver<String> testObserver = new TestObserver<>();
Observable.<String>error(new RuntimeException("测试错误"))
.onErrorReturn(error -> "默认值")
.subscribe(testObserver);
testObserver.assertValue("默认值");
testObserver.assertNoErrors();
testObserver.assertComplete();
System.out.println("错误处理测试通过!");
}
@Test
public void testWithTestScheduler() {
System.out.println("=== 时间相关测试 ===");
TestScheduler testScheduler = new TestScheduler();
TestObserver<Long> testObserver = new TestObserver<>();
Observable.interval(1, TimeUnit.SECONDS, testScheduler)
.take(3)
.subscribe(testObserver);
// 初始状态:没有数据
testObserver.assertNoValues();
System.out.println("初始状态: 无数据");
// 快进1秒
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
testObserver.assertValues(0L);
System.out.println("1秒后: 收到 0");
// 快进2秒
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
testObserver.assertValues(0L, 1L, 2L);
System.out.println("3秒后: 收到 0, 1, 2");
testObserver.assertComplete();
System.out.println("时间测试通过!");
}
@Test
public void testAsyncOperation() {
System.out.println("=== 异步操作测试 ===");
// 使用TestScheduler控制异步操作
TestScheduler testScheduler = new TestScheduler();
Observable<String> asyncObservable = Observable
.fromCallable(() -> {
System.out.println("执行异步操作");
return "结果";
})
.subscribeOn(Schedulers.io())
.observeOn(testScheduler); // 使用TestScheduler
TestObserver<String> testObserver = new TestObserver<>();
asyncObservable.subscribe(testObserver);
// 触发所有待执行的任务
testScheduler.triggerActions();
testObserver.assertValue("结果");
testObserver.assertComplete();
System.out.println("异步测试通过!");
}
}
9.3 集成测试
public class RxJavaIntegrationTest {
@Mock
private ApiService apiService;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
}
@Test
public void testNetworkRequest() {
System.out.println("=== 网络请求集成测试 ===");
// 模拟API响应
User mockUser = new User("John", "Doe");
when(apiService.getUser("123"))
.thenReturn(Observable.just(mockUser));
// 执行测试
TestObserver<User> testObserver = new TestObserver<>();
apiService.getUser("123")
.subscribe(testObserver);
// 验证
testObserver.assertValue(mockUser);
testObserver.assertComplete();
verify(apiService).getUser("123");
System.out.println("网络请求测试通过!");
}
@Test
public void testErrorScenario() {
System.out.println("=== 错误场景测试 ===");
// 模拟网络错误
when(apiService.getUser("999"))
.thenReturn(Observable.error(new IOException("网络错误")));
TestObserver<User> testObserver = new TestObserver<>();
apiService.getUser("999")
.onErrorResumeNext(error -> {
if (error instanceof IOException) {
return Observable.just(new User("默认", "用户"));
}
return Observable.error(error);
})
.subscribe(testObserver);
testObserver.assertValue(user ->
user.getFirstName().equals("默认"));
testObserver.assertComplete();
System.out.println("错误场景测试通过!");
}
}
第十章:常见问题与解决方案
10.1 为什么onComplete不执行?
public class CommonIssue1 {
public void whyOnCompleteNotCalled() {
System.out.println("=== 问题:onComplete不执行 ===");
// 情况1:使用了Single或Maybe
Single.just("数据")
.subscribe(
data -> System.out.println("收到: " + data),
error -> System.out.println("错误: " + error),
() -> System.out.println("完成") // 这里不会执行!
);
// 解释:Single没有onComplete,只有onSuccess和onError
// 情况2:发生了未处理的错误
Observable.create(emitter -> {
emitter.onNext("数据1");
emitter.onError(new RuntimeException("错误"));
emitter.onNext("数据2"); // 不会发射
emitter.onComplete(); // 不会执行
})
.subscribe(
data -> System.out.println("收到: " + data),
error -> System.out.println("错误: " + error),
() -> System.out.println("完成") // 不会执行,因为发生了错误
);
// 情况3:使用了永远不会结束的Observable
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(
data -> System.out.println("收到: " + data),
error -> System.out.println("错误: " + error),
() -> System.out.println("完成") // 不会执行,因为interval永不结束
);
// 解决方案:使用take操作符限制数量
Observable.interval(1, TimeUnit.SECONDS)
.take(3) // 只取3个
.subscribe(
data -> System.out.println("收到: " + data),
error -> System.out.println("错误: " + error),
() -> System.out.println("完成") // 现在会执行
);
}
}
10.2 如何处理背压异常?
public class CommonIssue2 {
public void handleBackpressureException() {
System.out.println("=== 问题:MissingBackpressureException ===");
// 问题场景:快速生产,慢速消费
Observable.interval(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.computation())
.subscribe(
item -> {
Thread.sleep(100); // 慢速消费
System.out.println("消费: " + item);
},
error -> {
// 会收到MissingBackpressureException
System.out.println("错误: " + error.getClass().getSimpleName());
}
);
// 解决方案1:使用Flowable
Flowable.interval(1, TimeUnit.MILLISECONDS)
.onBackpressureBuffer(1000) // 添加缓冲区
.observeOn(Schedulers.computation())
.subscribe(
item -> {
Thread.sleep(100);
System.out.println("Flowable消费: " + item);
}
);
// 解决方案2:使用合适的背压策略
Observable.interval(1, TimeUnit.MILLISECONDS)
.toFlowable(BackpressureStrategy.DROP) // 转换为Flowable并指定策略
.observeOn(Schedulers.computation())
.subscribe(...);
// 解决方案3:采样降低频率
Observable.interval(1, TimeUnit.MILLISECONDS)
.sample(10, TimeUnit.MILLISECONDS) // 每10ms采样一次
.subscribe(...);
// 解决方案4:批量处理
Observable.interval(1, TimeUnit.MILLISECONDS)
.buffer(100, TimeUnit.MILLISECONDS) // 每100ms缓冲一次
.subscribe(batch -> {
// 批量处理
processBatch(batch);
});
}
}
10.3 如何取消订阅?
public class CommonIssue3 {
public void howToUnsubscribe() {
System.out.println("=== 问题:如何正确取消订阅 ===");
// 方法1:使用Disposable
Disposable disposable1 = Observable.interval(1, TimeUnit.SECONDS)
.subscribe(item -> System.out.println("流1: " + item));
Disposable disposable2 = Observable.interval(500, TimeUnit.MILLISECONDS)
.subscribe(item -> System.out.println("流2: " + item));
// 取消单个订阅
disposable1.dispose();
System.out.println("流1已取消");
// 方法2:使用CompositeDisposable(推荐)
CompositeDisposable compositeDisposable = new CompositeDisposable();
compositeDisposable.add(
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(item -> System.out.println("复合流1: " + item))
);
compositeDisposable.add(
Observable.interval(500, TimeUnit.MILLISECONDS)
.subscribe(item -> System.out.println("复合流2: " + item))
);
// 取消所有订阅
compositeDisposable.clear();
System.out.println("所有复合流已取消");
// 方法3:使用takeUntil操作符
PublishSubject<Boolean> stopSignal = PublishSubject.create();
Observable.interval(1, TimeUnit.SECONDS)
.takeUntil(stopSignal) // 直到stopSignal发射数据
.subscribe(item -> System.out.println("条件流: " + item));
// 发送停止信号
stopSignal.onNext(true);
System.out.println("条件流已停止");
// 方法4:使用doOnSubscribe获取Disposable
Observable.interval(1, TimeUnit.SECONDS)
.doOnSubscribe(d -> {
// 在这里保存Disposable
Disposable d2 = d;
// 可以添加到CompositeDisposable
})
.subscribe(...);
// 重要:检查Disposable状态
if (disposable1 != null && !disposable1.isDisposed()) {
disposable1.dispose();
}
}
}
10.4 如何处理线程安全问题?
public class CommonIssue4 {
private int counter = 0;
public void threadSafetyIssue() {
System.out.println("=== 问题:线程安全 ===");
// 问题示例:多线程修改共享状态
Observable.range(1, 1000)
.subscribeOn(Schedulers.computation())
.flatMap(i ->
Observable.just(i)
.subscribeOn(Schedulers.io())
.map(j -> {
counter++; // 线程不安全!
return j;
})
)
.subscribe();
// 可能的结果:counter的值不确定,可能小于1000
// 解决方案1:使用原子类
AtomicInteger safeCounter = new AtomicInteger(0);
Observable.range(1, 1000)
.subscribeOn(Schedulers.computation())
.flatMap(i ->
Observable.just(i)
.subscribeOn(Schedulers.io())
.map(j -> {
safeCounter.incrementAndGet(); // 线程安全
return j;
})
)
.subscribe();
System.out.println("安全计数器: " + safeCounter.get()); // 保证是1000
// 解决方案2:避免共享状态
Observable.range(1, 1000)
.subscribeOn(Schedulers.computation())
.flatMap(i ->
Observable.just(i)
.subscribeOn(Schedulers.io())
.map(j -> {
// 不修改共享状态,只处理自己的数据
return processItem(j);
})
)
.subscribe();
// 解决方案3:使用serialize()操作符
PublishSubject<Integer> subject = PublishSubject.create();
subject
.serialize() // 保证线程安全
.subscribe(item -> {
// 现在可以安全地修改状态
counter++;
});
// 从多个线程发射数据
Observable.range(1, 100)
.subscribeOn(Schedulers.io())
.subscribe(subject::onNext);
Observable.range(101, 100)
.subscribeOn(Schedulers.computation())
.subscribe(subject::onNext);
}
}
总结
RxJava 3作为响应式编程在Android开发中的强大工具,通过其丰富的操作符和优雅的线程管理机制,极大地简化了异步编程的复杂性。从简单的数据转换到复杂的异步操作组合,RxJava都提供了清晰的解决方案。
关键要点回顾:
- 选择合适的Observable类型:根据场景选择
Observable、Flowable、Single、Maybe或Completable - 合理管理线程:使用
subscribeOn和observeOn控制执行线程 - 处理背压问题:对于可能产生大量数据的场景,使用
Flowable和适当的背压策略 - 防止内存泄漏:使用
CompositeDisposable管理订阅生命周期 - 优雅的错误处理:利用
onErrorReturn、onErrorResumeNext、retry等操作符
学习建议:
- 从简单开始:先掌握
Observable和常用操作符 - 实践出真知:在实际项目中应用RxJava,从简单场景开始
- 理解原理:不仅仅是使用API,要理解响应式编程的思想
- 查阅官方文档:RxJava的操作符非常丰富,遇到问题时查阅官方文档
性能优化建议:
- 避免不必要的对象创建:重用对象,使用原始类型
- 合理使用缓存:对于重复订阅的数据,使用
cache()或replay() - 优化线程使用:避免不必要的线程切换,在主线程做轻量操作
- 监控内存使用:注意
cache()可能导致的内存泄漏
调试建议:
- 使用doOn操作符添加日志:监控数据流的每个阶段
- 使用TestObserver进行单元测试:确保逻辑正确
- 使用TestScheduler测试时间相关操作:避免等待真实时间
- 设置全局错误处理器:捕获未处理的错误
RxJava虽然学习曲线较陡,但一旦掌握,将极大提升代码的可读性和可维护性。随着实践的深入,你会越来越体会到响应式编程的魅力所在。
延伸学习资源:
- RxJava官方GitHub
- RxJava官方文档
- RxJava操作符决策树
- RxAndroid(适用于Android的RxJava绑定库)
- RxBinding(Android View绑定库)
- Awesome-RxJava(RxJava资源集合)
记住:响应式编程是一种思维方式,而RxJava是实现这种思维方式的工具。Happy Coding!
RxJava 3 完全指南:响应式编程在Android中的精妙艺术
https://blog.uso6.com/archives/rxjava-3-wan-quan-zhi-nan-xiang-ying-shi-bian-cheng-zai-androidzhong-de-jing-miao-yi-shu
评论