RxJava 3是Android响应式编程的核心工具,通过观察者模式与函数式编程结合,优雅解决异步操作、线程切换与数据流处理难题。其五大基类(Observable、Flowable、Single等)覆盖不同场景,丰富操作符链支持数据过滤、变换、组合与错误处理。Flowable支持背压策略应对数据速率失衡,Schedulers提供灵活线程调度。结合CompositeDisposable防止内存泄漏,使异步代码更简洁、可维护,大幅提升Android开发效率与代码质量。

博主博客

前言

每个Android开发者都对RxJava怀有特殊的情感。它那简洁优雅的线程切换机制、强大的多网络请求合并能力,配合Retrofit使用,无疑是现代Android应用开发的利器。随着技术的演进,RxJava已经迎来了它的第三代版本。

版本演进与现状

  • RxJava 3 于2020年发布,对RxJava 2保持了良好的兼容性
  • RxJava 2 已于2020年12月31日停止官方支持
  • 错误修复会同时在2.x和3.x版本中进行,但新功能只在3.x上添加
  • RxAndroid目前尚未发布3.0版本,但RxJava 3本身已可在Android项目中使用

特别说明:虽然RxAndroid 3还未正式发布,但RxJava 3本身已可独立使用。对于Android开发者,可以使用RxJava 3的核心功能,并结合AndroidSchedulers.mainThread()来操作主线程。

第一章:为什么需要RxJava?

1.1 传统异步编程的痛点

在Android开发中,我们经常面临以下挑战:

  • 复杂的线程管理和切换
  • 回调地狱(Callback Hell)
  • 多个异步操作的协调与合并
  • 错误处理的分散和重复

回调地狱示例

// 传统方式:深层嵌套的回调
api.getUser(userId, new Callback<User>() {
    @Override
    public void onSuccess(User user) {
        api.getOrders(user.getId(), new Callback<List<Order>>() {
            @Override
            public void onSuccess(List<Order> orders) {
                api.getAddress(user.getId(), new Callback<Address>() {
                    @Override
                    public void onSuccess(Address address) {
                        // 最终处理所有数据
                        updateUI(user, orders, address);
                    }
                    
                    @Override
                    public void onError(Throwable error) {
                        // 错误处理
                    }
                });
            }
            
            @Override
            public void onError(Throwable error) {
                // 错误处理
            }
        });
    }
    
    @Override
    public void onError(Throwable error) {
        // 错误处理
    }
});

1.2 RxJava的解决方案

RxJava通过观察者模式函数式编程的结合,提供了:

  • 声明式的异步操作链
  • 统一的错误处理机制
  • 强大的操作符集合
  • 内存泄漏的预防(通过Disposable)

RxJava解决方案

// RxJava方式:链式调用,清晰易读
Observable.zip(
    api.getUserRx(userId),
    api.getOrdersRx(userId),
    api.getAddressRx(userId),
    (user, orders, address) -> new UserData(user, orders, address)
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
    userData -> updateUI(userData),
    error -> showError(error)
);

第二章:RxJava 3核心概念

2.1 数据流与观察者模式

在RxJava中,数据以流(Stream)的形式组织和传递:

// 基本的数据流结构
Observable.just("Data1", "Data2", "Data3")  // 数据源
  .filter(data -> data.contains("2"))        // 操作符1:过滤
  .map(data -> data.toUpperCase())           // 操作符2:转换
  .subscribe(                                // 订阅并消费数据
      data -> System.out.println("收到: " + data),
      error -> System.out.println("错误: " + error),
      () -> System.out.println("完成")
  );

输出结果

收到: DATA2
完成

重要术语

  • 上流(Upstream):当前操作符之前的步骤
  • 下流(Downstream):当前操作符之后的步骤
  • 发射(Emission):数据源发出数据的过程

理解示例

// 对于map操作符来说:
Observable.just(1, 2, 3)          // ← 上流
  .map(x -> x * 2)                 // ← 当前操作符
  .subscribe(System.out::println)  // ← 下流
  
// 输出:
// 2
// 4
// 6

2.2 数据流中的对象

在RxJava文档中,以下术语都指代在数据流中传递的数据对象:

  • Emission - 发射行为
  • Emits - 发射的数据
  • Item - 数据项
  • Event - 事件
  • Signal - 信号
  • Data - 数据
  • Message - 消息

2.3 背压(Backpressure)机制

问题背景

当异步步骤执行速度不一致时(上流发射太快,下流处理太慢),会导致:

  • 内存溢出(缓存过多数据)
  • 数据丢失(丢弃来不及处理的数据)

问题示例

// 生产者和消费者速度不匹配
Observable.interval(1, TimeUnit.MILLISECONDS)  // 生产者:每1ms生产一个
    .observeOn(Schedulers.computation())       // 切换到计算线程
    .subscribe(item -> {
        Thread.sleep(100);  // 消费者:每100ms消费一个
        System.out.println("消费: " + item);
    });
    
// 问题:生产速度(1个/ms) > 消费速度(1个/100ms)
// 结果:内存迅速增长,可能导致OOM

RxJava的解决方案

  • 支持背压Flowable类,适用于可能产生大量数据的场景
  • 不支持背压ObservableSingleMaybeCompletable类,适用于数据量可控的场景

背压解决方案示例

// 使用Flowable处理背压
Flowable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureBuffer(1000)  // 设置缓冲区大小为1000
    .observeOn(Schedulers.computation())
    .subscribe(
        item -> {
            Thread.sleep(100);
            System.out.println("消费: " + item);
        },
        error -> {
            // 处理背压错误
            if (error instanceof MissingBackpressureException) {
                System.out.println("背压异常: 数据产生太快!");
            }
        }
    );

2.4 线程调度器(Schedulers)

RxJava通过调度器实现优雅的线程切换:

调度器 适用场景 Android特有
Schedulers.computation() CPU密集型计算
Schedulers.io() I/O操作、网络请求
Schedulers.single() 需要单一线程顺序执行
Schedulers.trampoline() 在当前线程排队执行
AndroidSchedulers.mainThread() Android主线程操作
// 线程切换示例
Observable.just("A", "B", "C")
  .subscribeOn(Schedulers.io())          // 数据产生在IO线程
  .map(item -> {
      System.out.println("map在: " + Thread.currentThread().getName());
      return item.toLowerCase();
  })
  .observeOn(Schedulers.computation())   // 切换到计算线程
  .filter(item -> !item.equals("b"))
  .observeOn(AndroidSchedulers.mainThread()) // 切换到主线程
  .subscribe(item -> {
      System.out.println("消费在: " + Thread.currentThread().getName());
      System.out.println("收到: " + item);
  });

// 可能的输出:
// map在: RxCachedThreadScheduler-1 (IO线程)
// map在: RxCachedThreadScheduler-1
// map在: RxCachedThreadScheduler-1
// 消费在: main (主线程)
// 收到: a
// 消费在: main
// 收到: c

2.5 RxJava的五大基类

类型 说明 背压支持 适用场景
Flowable 发射0到N个数据 ✅ 支持 大数据流、背压敏感场景
Observable 发射0到N个数据 ❌ 不支持 小数据流、UI事件
Single 发射单个数据或错误 ❌ 不支持 网络请求、数据库查询
Completable 不发射数据,只通知完成或错误 ❌ 不支持 只关心操作是否完成
Maybe 发射0或1个数据,或错误 ❌ 不支持 可能返回null的查询

五大基类使用示例

// 1. Observable - 多个数据
Observable<String> observable = Observable.just("A", "B", "C");
observable.subscribe(System.out::println);
// 输出: A B C

// 2. Single - 单个数据
Single<String> single = Single.just("成功");
single.subscribe(
    data -> System.out.println("成功: " + data),
    error -> System.out.println("失败: " + error)
);
// 输出: 成功: 成功

// 3. Maybe - 0或1个数据
Maybe<String> maybe = Maybe.empty();  // 空数据
maybe.subscribe(
    data -> System.out.println("数据: " + data),
    error -> System.out.println("错误"),
    () -> System.out.println("完成(无数据)")
);
// 输出: 完成(无数据)

// 4. Completable - 只关心完成
Completable completable = Completable.fromAction(() -> {
    System.out.println("执行操作");
    // 执行某些操作,不返回数据
});
completable.subscribe(
    () -> System.out.println("操作完成"),
    error -> System.out.println("操作失败: " + error)
);
// 输出: 执行操作
// 输出: 操作完成

// 5. Flowable - 支持背压的数据流
Flowable<Integer> flowable = Flowable.range(1, 1000);
flowable.subscribe(
    data -> System.out.println("数据: " + data),
    error -> System.out.println("错误: " + error),
    () -> System.out.println("完成"),
    subscription -> {
        // 背压控制
        subscription.request(100); // 初始请求100个
    }
);

第三章:环境搭建与基础使用

3.1 添加依赖

// build.gradle (Module: app)
dependencies {
    // RxJava 3
    implementation 'io.reactivex.rxjava3:rxjava:3.1.6'
    
    // RxAndroid (Android特定绑定)
    implementation 'io.reactivex.rxjava3:rxandroid:3.0.2'
    
    // 如果需要与Retrofit配合使用
    implementation 'com.squareup.retrofit2:adapter-rxjava3:2.9.0'
    
    // 如果需要RxBinding (View绑定)
    implementation 'com.jakewharton.rxbinding4:rxbinding:4.0.0'
}

3.2 创建第一个RxJava程序

// 最简单的RxJava示例
public class FirstRxJavaExample {
    public static void main(String[] args) {
        System.out.println("=== 开始RxJava示例 ===");
        
        // 创建Observable
        Observable<String> observable = Observable.create(emitter -> {
            System.out.println("Observable: 开始发射数据");
            emitter.onNext("Hello");
            emitter.onNext("RxJava");
            emitter.onNext("3!");
            emitter.onComplete();
            System.out.println("Observable: 数据发射完成");
        });
        
        // 订阅并消费数据
        observable.subscribe(
            // onNext回调
            item -> {
                System.out.println("Observer: 收到数据 - " + item);
            },
            // onError回调
            error -> {
                System.out.println("Observer: 发生错误 - " + error.getMessage());
            },
            // onComplete回调
            () -> {
                System.out.println("Observer: 所有数据接收完成");
            }
        );
        
        System.out.println("=== 示例结束 ===");
    }
}

输出结果

=== 开始RxJava示例 ===
Observable: 开始发射数据
Observer: 收到数据 - Hello
Observer: 收到数据 - RxJava
Observer: 收到数据 - 3!
Observable: 数据发射完成
Observer: 所有数据接收完成
=== 示例结束 ===

3.3 链式调用示例

// 更实用的链式调用示例
Observable.fromArray("apple", "banana", "cherry", "date", "elderberry")
    .filter(fruit -> fruit.length() > 5)        // 过滤长度大于5的水果
    .map(String::toUpperCase)                   // 转换为大写
    .sorted()                                   // 排序
    .subscribe(
        fruit -> System.out.println("水果: " + fruit),
        error -> System.out.println("错误: " + error),
        () -> System.out.println("所有水果处理完成")
    );

输出结果

水果: BANANA
水果: CHERRY
水果: ELDERBERRY
所有水果处理完成

第四章:核心操作符详解

4.1 实用操作符

4.1.1 observeOnsubscribeOn

System.out.println("主线程: " + Thread.currentThread().getName());

Observable.create(emitter -> {
    // 模拟耗时操作
    System.out.println("create线程: " + Thread.currentThread().getName());
    Thread.sleep(1000);
    emitter.onNext("数据1");
    emitter.onNext("数据2");
    emitter.onComplete();
})
.subscribeOn(Schedulers.io())          // 指定create在IO线程执行
.doOnSubscribe(disposable -> 
    System.out.println("订阅线程: " + Thread.currentThread().getName()))
.observeOn(Schedulers.computation())   // 后续操作在计算线程
.map(data -> {
    System.out.println("map线程: " + Thread.currentThread().getName());
    return data + "_处理";
})
.observeOn(AndroidSchedulers.mainThread()) // 最终在主线程
.subscribe(data -> {
    System.out.println("消费线程: " + Thread.currentThread().getName());
    System.out.println("收到: " + data);
});

// 输出可能(线程名称可能不同):
// 主线程: main
// 订阅线程: main
// create线程: RxCachedThreadScheduler-1
// map线程: RxComputationThreadPool-1
// 消费线程: main
// 收到: 数据1_处理
// map线程: RxComputationThreadPool-1
// 消费线程: main
// 收到: 数据2_处理

重要说明

  • subscribeOn():指定Observable创建和发射数据的线程,多次调用只有第一次有效
  • observeOn():指定下游操作执行的线程,每次调用都会影响后续操作
  • 如果没有指定线程,默认在当前线程执行

4.1.2 生命周期回调操作符

Observable.just("A", "B", "C")
    .doOnSubscribe(disposable -> 
        System.out.println("1. doOnSubscribe: 开始订阅"))
    .doOnNext(item -> 
        System.out.println("2. doOnNext: 将要处理 - " + item))
    .doAfterNext(item -> 
        System.out.println("3. doAfterNext: 处理完成 - " + item))
    .doOnError(error -> 
        System.out.println("4. doOnError: 发生错误 - " + error))
    .doOnComplete(() -> 
        System.out.println("5. doOnComplete: 即将完成"))
    .doAfterTerminate(() -> 
        System.out.println("6. doAfterTerminate: 已经终止"))
    .doFinally(() -> 
        System.out.println("7. doFinally: 最终执行"))
    .doOnDispose(() -> 
        System.out.println("8. doOnDispose: 被取消订阅"))
    .subscribe(
        item -> System.out.println("9. onNext: 消费 - " + item),
        error -> System.out.println("10. onError: 错误"),
        () -> System.out.println("11. onComplete: 完成")
    );

// 正常情况输出:
// 1. doOnSubscribe: 开始订阅
// 2. doOnNext: 将要处理 - A
// 9. onNext: 消费 - A
// 3. doAfterNext: 处理完成 - A
// 2. doOnNext: 将要处理 - B
// 9. onNext: 消费 - B
// 3. doAfterNext: 处理完成 - B
// 2. doOnNext: 将要处理 - C
// 9. onNext: 消费 - C
// 3. doAfterNext: 处理完成 - C
// 5. doOnComplete: 即将完成
// 11. onComplete: 完成
// 6. doAfterTerminate: 已经终止
// 7. doFinally: 最终执行

说明:这些doOnXXX操作符非常有用,可以用于日志记录、状态监控、资源清理等。

4.2 过滤操作符

4.2.1 skip - 跳过元素

System.out.println("=== skip操作符示例 ===");

Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

System.out.print("跳过前4个: ");
source.skip(4).subscribe(item -> System.out.print(item + " "));
System.out.println();

System.out.print("跳过最后4个: ");
source.skipLast(4).subscribe(item -> System.out.print(item + " "));
System.out.println();

System.out.print("跳过直到大于5: ");
source.skipWhile(x -> x <= 5).subscribe(item -> System.out.print(item + " "));
System.out.println();

// 输出:
// === skip操作符示例 ===
// 跳过前4个: 5 6 7 8 9 10
// 跳过最后4个: 1 2 3 4 5 6
// 跳过直到大于5: 6 7 8 9 10

4.2.2 debounce - 防抖动

System.out.println("=== debounce操作符示例 ===");

Observable.create(emitter -> {
    // 模拟用户快速输入
    emitter.onNext("r");
    Thread.sleep(100);
    emitter.onNext("rx");
    Thread.sleep(200);
    emitter.onNext("rxj");
    Thread.sleep(150);
    emitter.onNext("rxja");
    Thread.sleep(50);
    emitter.onNext("rxjav");
    Thread.sleep(80);
    emitter.onNext("rxjava");
    emitter.onComplete();
})
.debounce(200, TimeUnit.MILLISECONDS)  // 200ms内只取最后一次
.subscribe(
    query -> System.out.println("搜索: " + query),
    error -> System.out.println("错误: " + error),
    () -> System.out.println("搜索完成")
);

// 输出:
// === debounce操作符示例 ===
// 搜索: rxjava
// 搜索完成

// 解释:
// 用户快速输入 r -> rx -> rxj -> rxja -> rxjav -> rxjava
// 每次输入间隔都小于200ms,所以只有最后一次"rxjava"被处理

4.2.3 distinct - 去重

System.out.println("=== distinct操作符示例 ===");

Observable.just(2, 3, 4, 4, 2, 1, 3, 5)
    .distinct()
    .subscribe(item -> System.out.print(item + " "));
System.out.println();

Observable.just(1, 1, 2, 1, 2, 3, 3, 4)
    .distinctUntilChanged()  // 只去除相邻重复
    .subscribe(item -> System.out.print(item + " "));
System.out.println();

// 自定义去重规则
Observable.just("apple", "banana", "apricot", "blueberry")
    .distinct(fruit -> fruit.charAt(0))  // 根据首字母去重
    .subscribe(item -> System.out.print(item + " "));
System.out.println();

// 输出:
// === distinct操作符示例 ===
// 2 3 4 1 5
// 1 2 1 2 3 4
// apple banana

4.2.4 filter - 条件过滤

System.out.println("=== filter操作符示例 ===");

// 基础过滤
Observable.range(1, 10)
    .filter(x -> x % 2 == 0)
    .subscribe(x -> System.out.print(x + " "));
System.out.println();

// 复杂过滤条件
Observable.just("Apple", "Banana", "Cherry", "Date", "Elderberry")
    .filter(fruit -> fruit.length() > 5 && fruit.contains("e"))
    .subscribe(fruit -> System.out.print(fruit + " "));
System.out.println();

// 结合其他操作符
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    .filter(x -> x > 3)
    .filter(x -> x < 8)
    .map(x -> x * 10)
    .subscribe(x -> System.out.print(x + " "));
System.out.println();

// 输出:
// === filter操作符示例 ===
// 2 4 6 8 10
// Elderberry
// 40 50 60 70

4.2.5 take - 取前N个

System.out.println("=== take操作符示例 ===");

Observable.interval(500, TimeUnit.MILLISECONDS)
    .take(5)  // 只取前5个
    .subscribe(
        x -> System.out.println("收到: " + x),
        error -> System.out.println("错误: " + error),
        () -> System.out.println("完成(取前5个)")
    );

// 等待3秒让程序执行
Thread.sleep(3000);

System.out.println("=== takeLast示例 ===");

Observable.range(1, 10)
    .takeLast(3)
    .subscribe(x -> System.out.print(x + " "));
System.out.println();

System.out.println("=== takeWhile示例 ===");

Observable.just(2, 4, 6, 8, 10, 11, 12)
    .takeWhile(x -> x % 2 == 0)  // 直到遇到奇数停止
    .subscribe(x -> System.out.print(x + " "));
System.out.println();

// 输出:
// === take操作符示例 ===
// 收到: 0
// 收到: 1
// 收到: 2
// 收到: 3
// 收到: 4
// 完成(取前5个)
// === takeLast示例 ===
// 8 9 10
// === takeWhile示例 ===
// 2 4 6 8 10

4.3 变换操作符

4.3.1 map - 一对一转换

System.out.println("=== map操作符示例 ===");

Observable.just("apple", "banana", "cherry")
    .map(String::toUpperCase)
    .map(fruit -> fruit + "!")
    .subscribe(fruit -> System.out.println("水果: " + fruit));

// 输出:
// === map操作符示例 ===
// 水果: APPLE!
// 水果: BANANA!
// 水果: CHERRY!

// 另一个示例:数据转换
Observable.just(1, 2, 3, 4, 5)
    .map(x -> x * x)  // 平方
    .map(x -> "数值: " + x)
    .subscribe(System.out::println);

// 输出:
// 数值: 1
// 数值: 4
// 数值: 9
// 数值: 16
// 数值: 25

4.3.2 flatMap - 一对多转换(无序)

System.out.println("=== flatMap操作符示例(无序) ===");

Observable.just("A", "B", "C")
    .flatMap(letter -> 
        Observable.intervalRange(1, 3, 0, 100, TimeUnit.MILLISECONDS)
            .map(number -> "(" + letter + ", " + number + ")"))
    .subscribe(item -> System.out.println(item));

// 等待执行完成
Thread.sleep(500);

// 可能的输出(顺序不确定):
// === flatMap操作符示例(无序) ===
// (A, 1)
// (B, 1)
// (C, 1)
// (A, 2)
// (B, 2)
// (C, 2)
// (A, 3)
// (B, 3)
// (C, 3)

4.3.3 concatMap - 一对多转换(有序)

System.out.println("=== concatMap操作符示例(有序) ===");

Observable.just("A", "B", "C")
    .concatMap(letter -> 
        Observable.intervalRange(1, 3, 0, 100, TimeUnit.MILLISECONDS)
            .map(number -> "(" + letter + ", " + number + ")"))
    .subscribe(item -> System.out.println(item));

// 等待执行完成
Thread.sleep(500);

// 输出(保证顺序):
// === concatMap操作符示例(有序) ===
// (A, 1)
// (A, 2)
// (A, 3)
// (B, 1)
// (B, 2)
// (B, 3)
// (C, 1)
// (C, 2)
// (C, 3)

重要区别

  • flatMap:顺序不保证,适合并行处理
  • concatMap:顺序保证,适合需要顺序执行的场景

4.3.4 buffer - 缓冲分组

System.out.println("=== buffer操作符示例 ===");

// 按数量缓冲
System.out.print("按4个一组缓冲: ");
Observable.range(1, 10)
    .buffer(4)
    .subscribe(list -> System.out.print(list + " "));
System.out.println();

// 按时间窗口缓冲
System.out.println("按时间窗口缓冲:");
Observable.interval(250, TimeUnit.MILLISECONDS)
    .buffer(1, TimeUnit.SECONDS)  // 每1秒缓冲一次
    .take(3)  // 取前3个缓冲
    .subscribe(
        list -> System.out.println("收到 " + list.size() + " 个数据: " + list),
        error -> System.out.println("错误: " + error),
        () -> System.out.println("缓冲完成")
    );

// 等待执行
Thread.sleep(3500);

// 输出:
// === buffer操作符示例 ===
// 按4个一组缓冲: [1, 2, 3, 4] [5, 6, 7, 8] [9, 10]
// 按时间窗口缓冲:
// 收到 4 个数据: [0, 1, 2, 3]
// 收到 4 个数据: [4, 5, 6, 7]
// 收到 4 个数据: [8, 9, 10, 11]
// 缓冲完成

4.3.5 scan - 累积计算

System.out.println("=== scan操作符示例 ===");

// 累积求和
Observable.just(5, 3, 8, 1, 7)
    .scan(0, (partialSum, x) -> partialSum + x)
    .subscribe(sum -> System.out.print(sum + " "));
System.out.println();

// 累积求积
Observable.just(1, 2, 3, 4, 5)
    .scan(1, (product, x) -> product * x)
    .subscribe(product -> System.out.print(product + " "));
System.out.println();

// 字符串连接
Observable.just("Hello", " ", "World", "!")
    .scan("", (str, word) -> str + word)
    .subscribe(result -> System.out.println("中间结果: " + result));

// 输出:
// === scan操作符示例 ===
// 0 5 8 16 17 24
// 1 1 2 6 24 120
// 中间结果: Hello
// 中间结果: Hello
// 中间结果: Hello World
// 中间结果: Hello World!

4.4 组合操作符

4.4.1 merge - 合并多个Observable

System.out.println("=== merge操作符示例 ===");

Observable<String> names = Observable.just("Hello", "World");
Observable<String> otherNames = Observable.just("Git", "Code", "8");

Observable.merge(names, otherNames)
    .subscribe(name -> System.out.println("收到: " + name));

// 或者使用mergeWith
names.mergeWith(otherNames)
    .subscribe(name -> System.out.println("收到: " + name));

// 输出:
// === merge操作符示例 ===
// 收到: Hello
// 收到: World
// 收到: Git
// 收到: Code
// 收到: 8
// 收到: Hello
// 收到: World
// 收到: Git
// 收到: Code
// 收到: 8

// mergeDelayError示例:延迟错误处理
System.out.println("=== mergeDelayError示例 ===");
Observable<String> source1 = Observable.just("A", "B");
Observable<String> source2 = Observable.error(new RuntimeException("错误!"));
Observable<String> source3 = Observable.just("C", "D");

Observable.mergeDelayError(source1, source2, source3)
    .subscribe(
        item -> System.out.println("收到: " + item),
        error -> System.out.println("错误: " + error.getMessage())
    );

// 输出:
// === mergeDelayError示例 ===
// 收到: A
// 收到: B
// 收到: C
// 收到: D
// 错误: 错误!

4.4.2 zip - 一对一组合

System.out.println("=== zip操作符示例 ===");

Observable<String> colors = Observable.just("Red", "Green", "Blue");
Observable<String> animals = Observable.just("Fox", "Elephant", "Whale");
Observable<Integer> numbers = Observable.just(1, 2, 3);

Observable.zip(colors, animals, numbers,
        (color, animal, number) -> 
            number + ". " + color + " " + animal)
    .subscribe(item -> System.out.println(item));

// 输出:
// === zip操作符示例 ===
// 1. Red Fox
// 2. Green Elephant
// 3. Blue Whale

// 注意:如果Observable长度不同,以最短的为准
Observable<String> shortObs = Observable.just("A", "B");
Observable<String> longObs = Observable.just("1", "2", "3", "4");

Observable.zip(shortObs, longObs,
        (a, b) -> a + b)
    .subscribe(item -> System.out.print(item + " "));
System.out.println();

// 输出:
// A1 B2

4.4.3 combineLatest - 最新值组合

System.out.println("=== combineLatest操作符示例 ===");

PublishSubject<String> firstName = PublishSubject.create();
PublishSubject<String> lastName = PublishSubject.create();

Observable.combineLatest(firstName, lastName,
        (first, last) -> first + " " + last)
    .subscribe(fullName -> System.out.println("全名: " + fullName));

// 发射数据
firstName.onNext("John");
lastName.onNext("Doe");
firstName.onNext("Jane");
lastName.onNext("Smith");

// 输出:
// === combineLatest操作符示例 ===
// 全名: John Doe
// 全名: Jane Doe
// 全名: Jane Smith

// 实际应用:表单验证
System.out.println("=== 表单验证示例 ===");
PublishSubject<Boolean> usernameValid = PublishSubject.create();
PublishSubject<Boolean> passwordValid = PublishSubject.create();
PublishSubject<Boolean> emailValid = PublishSubject.create();

Observable.combineLatest(usernameValid, passwordValid, emailValid,
        (userOk, passOk, emailOk) -> userOk && passOk && emailOk)
    .subscribe(isValid -> 
        System.out.println("表单是否有效: " + isValid));

// 模拟用户输入
usernameValid.onNext(true);   // 用户名有效
passwordValid.onNext(false);  // 密码无效
emailValid.onNext(true);      // 邮箱有效
passwordValid.onNext(true);   // 密码变为有效

// 输出:
// === 表单验证示例 ===
// 表单是否有效: false (true && false && true)
// 表单是否有效: true  (true && true && true)

4.5 错误处理操作符

4.5.1 onErrorReturn - 错误时返回默认值

System.out.println("=== onErrorReturn操作符示例 ===");

Observable.create(emitter -> {
        emitter.onNext("数据1");
        emitter.onNext("数据2");
        emitter.onError(new RuntimeException("模拟错误"));
        emitter.onNext("数据3"); // 不会发射
    })
    .onErrorReturn(error -> {
        System.out.println("发生错误,返回默认值");
        return "默认值";
    })
    .subscribe(
        item -> System.out.println("收到: " + item),
        error -> System.out.println("错误处理: " + error), // 不会执行
        () -> System.out.println("完成")
    );

// 输出:
// === onErrorReturn操作符示例 ===
// 收到: 数据1
// 收到: 数据2
// 发生错误,返回默认值
// 收到: 默认值
// 完成

// onErrorReturnItem简化版
Observable.error(new RuntimeException("错误"))
    .onErrorReturnItem("默认项")
    .subscribe(
        item -> System.out.println("收到: " + item),
        error -> System.out.println("不会执行错误处理")
    );
// 输出:
// 收到: 默认项

4.5.2 onErrorResumeNext - 错误时切换到备用Observable

System.out.println("=== onErrorResumeNext操作符示例 ===");

Observable.create(emitter -> {
        emitter.onNext("主数据1");
        emitter.onNext("主数据2");
        emitter.onError(new IOException("网络错误"));
    })
    .onErrorResumeNext(throwable -> {
        if (throwable instanceof IOException) {
            System.out.println("网络错误,切换到本地缓存");
            return Observable.just("缓存数据1", "缓存数据2");
        }
        return Observable.error(throwable);
    })
    .subscribe(
        item -> System.out.println("收到: " + item),
        error -> System.out.println("错误: " + error.getMessage()),
        () -> System.out.println("完成")
    );

// 输出:
// === onErrorResumeNext操作符示例 ===
// 收到: 主数据1
// 收到: 主数据2
// 网络错误,切换到本地缓存
// 收到: 缓存数据1
// 收到: 缓存数据2
// 完成

4.5.3 retry - 重试机制

System.out.println("=== retry操作符示例 ===");

int attemptCount = 0;

Observable.create(emitter -> {
        attemptCount++;
        System.out.println("第 " + attemptCount + " 次尝试");
        
        if (attemptCount < 3) {
            emitter.onError(new IOException("网络连接失败"));
        } else {
            emitter.onNext("数据获取成功");
            emitter.onComplete();
        }
    })
    .retry(5)  // 最多重试5次
    .subscribe(
        data -> System.out.println("成功: " + data),
        error -> System.out.println("最终失败: " + error.getMessage()),
        () -> System.out.println("操作完成")
    );

// 输出:
// === retry操作符示例 ===
// 第 1 次尝试
// 第 2 次尝试
// 第 3 次尝试
// 成功: 数据获取成功
// 操作完成

// retryWhen示例:条件重试
System.out.println("=== retryWhen操作符示例 ===");
Observable.create(emitter -> {
        System.out.println("尝试连接服务器...");
        emitter.onError(new IOException("连接失败"));
    })
    .retryWhen(errors -> 
        errors.zipWith(Observable.range(1, 3), (error, retryCount) -> {
            if (retryCount > 3) {
                throw new RuntimeException("超过最大重试次数");
            }
            return retryCount;
        })
        .flatMap(retryCount -> {
            System.out.println("第 " + retryCount + " 次重试,等待 " + retryCount + " 秒");
            return Observable.timer(retryCount, TimeUnit.SECONDS);
        })
    )
    .subscribe(
        item -> System.out.println("成功"),
        error -> System.out.println("最终失败: " + error.getMessage())
    );

// 等待执行
Thread.sleep(10000);
// 输出:
// === retryWhen操作符示例 ===
// 尝试连接服务器...
// 第 1 次重试,等待 1 秒
// 尝试连接服务器...
// 第 2 次重试,等待 2 秒
// 尝试连接服务器...
// 第 3 次重试,等待 3 秒
// 尝试连接服务器...
// 最终失败: 超过最大重试次数

第五章:背压策略与Flowable

5.1 背压问题场景

System.out.println("=== 背压问题演示 ===");

// 创建快速生产数据的Observable
Observable<Long> fastProducer = Observable.interval(1, TimeUnit.MILLISECONDS);

// 慢速消费者
Disposable disposable = fastProducer
    .observeOn(Schedulers.computation())
    .subscribe(
        item -> {
            // 模拟慢速处理
            Thread.sleep(10);
            System.out.println("处理: " + item);
        },
        error -> {
            if (error instanceof MissingBackpressureException) {
                System.out.println("背压异常: " + error.getMessage());
            }
        }
    );

// 运行一段时间后停止
Thread.sleep(2000);
disposable.dispose();
System.out.println("停止订阅");

// 可能输出(具体数字可能不同):
// === 背压问题演示 ===
// 处理: 0
// 处理: 1
// 处理: 2
// ...(可能很快出现MissingBackpressureException)

5.2 Flowable的背压策略

System.out.println("=== Flowable背压策略 ===");

// 1. BUFFER策略 - 缓存所有来不及处理的数据
System.out.println("1. BUFFER策略:");
Flowable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureBuffer(1000)  // 缓冲区大小1000
    .observeOn(Schedulers.computation())
    .take(10)  // 只取10个避免无限执行
    .subscribe(
        item -> {
            Thread.sleep(10);
            System.out.println("BUFFER处理: " + item);
        },
        error -> System.out.println("BUFFER错误: " + error)
    );

Thread.sleep(150);
System.out.println();

// 2. DROP策略 - 丢弃来不及处理的数据
System.out.println("2. DROP策略:");
Flowable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureDrop(dropped -> 
        System.out.println("丢弃: " + dropped))
    .observeOn(Schedulers.computation())
    .take(10)
    .subscribe(
        item -> {
            Thread.sleep(10);
            System.out.println("DROP处理: " + item);
        }
    );

Thread.sleep(150);
System.out.println();

// 3. LATEST策略 - 只保留最新的数据
System.out.println("3. LATEST策略:");
Flowable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureLatest()
    .observeOn(Schedulers.computation())
    .take(10)
    .subscribe(
        item -> {
            Thread.sleep(10);
            System.out.println("LATEST处理: " + item);
        }
    );

Thread.sleep(150);
System.out.println();

// 4. ERROR策略 - 抛出MissingBackpressureException
System.out.println("4. ERROR策略:");
Flowable.interval(1, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.computation())  // 没有背压策略
    .take(10)
    .subscribe(
        item -> {
            Thread.sleep(10);
            System.out.println("ERROR处理: " + item);
        },
        error -> {
            if (error instanceof MissingBackpressureException) {
                System.out.println("背压错误: " + error.getMessage());
            }
        }
    );

Thread.sleep(150);

// 输出(具体数字和顺序可能不同):
// === Flowable背压策略 ===
// 1. BUFFER策略:
// BUFFER处理: 0
// BUFFER处理: 1
// BUFFER处理: 2
// ...
// 
// 2. DROP策略:
// DROP处理: 0
// 丢弃: 1
// 丢弃: 2
// 丢弃: 3
// DROP处理: 4
// ...
// 
// 3. LATEST策略:
// LATEST处理: 0
// LATEST处理: 24
// LATEST处理: 49
// ...
// 
// 4. ERROR策略:
// ERROR处理: 0
// 背压错误: Could not emit value due to lack of requests

5.3 背压策略选择指南

策略 适用场景 优点 缺点
BUFFER 数据重要,不能丢失 不丢失数据 可能内存溢出
DROP 实时数据,可以丢弃旧数据 内存友好 丢失数据
LATEST 只需要最新数据 内存友好,保留最新数据 丢失中间数据
ERROR 需要及时发现问题 快速失败 用户体验差
// 实际应用示例:实时传感器数据
Flowable<SensorData> sensorFlowable = Flowable.create(emitter -> {
    // 模拟传感器每秒产生1000个数据
    for (int i = 0; i < 1000; i++) {
        emitter.onNext(new SensorData(i));
    }
    emitter.onComplete();
}, BackpressureStrategy.BUFFER);  // 直接在创建时指定策略

// 或者使用已有Observable转换
Observable<SensorData> sensorObservable = Observable.create(emitter -> {
    for (int i = 0; i < 1000; i++) {
        emitter.onNext(new SensorData(i));
    }
    emitter.onComplete();
});

Flowable<SensorData> sensorFlowable = sensorObservable
    .toFlowable(BackpressureStrategy.LATEST);

第六章:Android实战应用

6.1 网络请求组合

// 多个网络请求顺序执行
public class NetworkExample {
    
    // 模拟API接口
    interface ApiService {
        Observable<User> getUser(String userId);
        Observable<List<Order>> getOrders(String userId);
        Observable<Address> getAddress(String userId);
    }
    
    public void fetchUserData(String userId) {
        System.out.println("开始获取用户数据...");
        
        Observable<User> userObservable = api.getUser(userId);
        Observable<List<Order>> ordersObservable = api.getOrders(userId);
        
        // flatMap顺序执行
        userObservable
            .doOnNext(user -> System.out.println("获取到用户: " + user.getName()))
            .flatMap(user -> 
                ordersObservable
                    .doOnNext(orders -> 
                        System.out.println("获取到订单: " + orders.size() + " 个"))
                    .map(orders -> {
                        user.setOrders(orders);
                        return user;
                    })
            )
            .flatMap(user -> 
                api.getAddress(userId)
                    .doOnNext(address -> 
                        System.out.println("获取到地址: " + address.getCity()))
                    .map(address -> {
                        user.setAddress(address);
                        return user;
                    })
            )
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                user -> {
                    System.out.println("数据获取完成");
                    updateUI(user);
                },
                error -> {
                    System.out.println("数据获取失败: " + error.getMessage());
                    showError(error);
                }
            );
    }
    
    private void updateUI(User user) {
        // 更新UI
        System.out.println("更新UI: " + user);
    }
    
    private void showError(Throwable error) {
        // 显示错误
        System.out.println("显示错误: " + error.getMessage());
    }
}

// 模拟执行输出:
// 开始获取用户数据...
// 获取到用户: John Doe
// 获取到订单: 5 个
// 获取到地址: New York
// 数据获取完成
// 更新UI: User{name='John Doe', orders=5, address='New York'}

6.2 并行请求与结果合并

public class ParallelRequestsExample {
    
    public void fetchAllData(String userId) {
        System.out.println("开始并行获取数据...");
        
        Observable<UserInfo> userInfoObs = api.getUserInfo(userId)
            .subscribeOn(Schedulers.io())
            .doOnNext(info -> System.out.println("用户信息获取完成"));
            
        Observable<UserProfile> profileObs = api.getUserProfile(userId)
            .subscribeOn(Schedulers.io())
            .doOnNext(profile -> System.out.println("用户资料获取完成"));
            
        Observable<List<Friend>> friendsObs = api.getFriends(userId)
            .subscribeOn(Schedulers.io())
            .doOnNext(friends -> 
                System.out.println("好友列表获取完成: " + friends.size() + " 个好友"));
        
        // 使用zip并行执行,等待所有请求完成
        Observable.zip(userInfoObs, profileObs, friendsObs,
                (userInfo, profile, friends) -> {
                    UserData data = new UserData();
                    data.setUserInfo(userInfo);
                    data.setProfile(profile);
                    data.setFriends(friends);
                    return data;
                })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                userData -> {
                    System.out.println("所有数据获取完成");
                    displayUserData(userData);
                },
                error -> {
                    System.out.println("数据获取失败: " + error.getMessage());
                    handleError(error);
                }
            );
    }
    
    // 或者使用combineLatest(不等待全部完成)
    public void fetchDataIncrementally(String userId) {
        Observable.combineLatest(
                api.getUserInfo(userId),
                api.getUserProfile(userId),
                api.getFriends(userId),
                (userInfo, profile, friends) -> 
                    new UserData(userInfo, profile, friends))
            .subscribe(userData -> {
                System.out.println("有数据更新");
                // 每次任一数据源更新都会触发
            });
    }
}

// 模拟执行输出:
// 开始并行获取数据...
// 用户信息获取完成
// 用户资料获取完成
// 好友列表获取完成: 25 个好友
// 所有数据获取完成
// 显示用户数据...

6.3 表单验证

public class FormValidationExample {
    
    private EditText etUsername;
    private EditText etPassword;
    private EditText etEmail;
    private Button btnSubmit;
    
    private CompositeDisposable disposables = new CompositeDisposable();
    
    public void setupFormValidation() {
        System.out.println("设置表单验证...");
        
        // 使用RxBinding监听EditText变化
        Observable<Boolean> usernameValid = 
            RxTextView.textChanges(etUsername)
                .skip(1)  // 跳过初始值
                .map(CharSequence::toString)
                .map(text -> text.length() >= 3)
                .distinctUntilChanged()
                .doOnNext(valid -> 
                    System.out.println("用户名验证: " + (valid ? "有效" : "无效")));
        
        Observable<Boolean> passwordValid = 
            RxTextView.textChanges(etPassword)
                .skip(1)
                .map(CharSequence::toString)
                .map(text -> text.length() >= 6)
                .distinctUntilChanged()
                .doOnNext(valid -> 
                    System.out.println("密码验证: " + (valid ? "有效" : "无效")));
        
        Observable<Boolean> emailValid = 
            RxTextView.textChanges(etEmail)
                .skip(1)
                .map(CharSequence::toString)
                .map(text -> Patterns.EMAIL_ADDRESS.matcher(text).matches())
                .distinctUntilChanged()
                .doOnNext(valid -> 
                    System.out.println("邮箱验证: " + (valid ? "有效" : "无效")));
        
        // 合并所有验证结果
        disposables.add(
            Observable.combineLatest(usernameValid, passwordValid, emailValid,
                    (usernameOk, passwordOk, emailOk) -> 
                        usernameOk && passwordOk && emailOk)
                .distinctUntilChanged()
                .subscribe(isValid -> {
                    System.out.println("表单整体验证: " + 
                        (isValid ? "有效,启用按钮" : "无效,禁用按钮"));
                    btnSubmit.setEnabled(isValid);
                    btnSubmit.setAlpha(isValid ? 1.0f : 0.5f);
                })
        );
        
        // 监听提交按钮点击
        disposables.add(
            RxView.clicks(btnSubmit)
                .throttleFirst(1, TimeUnit.SECONDS)  // 防止重复点击
                .subscribe(event -> {
                    System.out.println("提交表单...");
                    submitForm();
                })
        );
    }
    
    private void submitForm() {
        String username = etUsername.getText().toString();
        String password = etPassword.getText().toString();
        String email = etEmail.getText().toString();
        
        // 发送注册请求
        api.register(username, password, email)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnSubscribe(disposable -> 
                System.out.println("开始注册..."))
            .subscribe(
                response -> {
                    System.out.println("注册成功: " + response.getMessage());
                    showSuccess();
                },
                error -> {
                    System.out.println("注册失败: " + error.getMessage());
                    showError(error.getMessage());
                }
            );
    }
    
    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposables.clear();  // 防止内存泄漏
    }
}

// 模拟用户操作输出:
// 设置表单验证...
// 用户输入: "jo" (用户名)
// 用户名验证: 无效
// 表单整体验证: 无效,禁用按钮
// 用户输入: "joh" (用户名)
// 用户名验证: 有效
// 用户输入: "12345" (密码)
// 密码验证: 无效
// 表单整体验证: 无效,禁用按钮
// 用户输入: "123456" (密码)
// 密码验证: 有效
// 用户输入: "test@" (邮箱)
// 邮箱验证: 无效
// 表单整体验证: 无效,禁用按钮
// 用户输入: "[email protected]" (邮箱)
// 邮箱验证: 有效
// 表单整体验证: 有效,启用按钮
// 点击提交按钮
// 开始注册...
// 注册成功: 注册成功

6.4 防止内存泄漏

public class LeakFreeActivity extends AppCompatActivity {
    
    private CompositeDisposable disposables = new CompositeDisposable();
    private PublishSubject<String> searchSubject = PublishSubject.create();
    
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        
        setupSearch();
        loadData();
    }
    
    private void setupSearch() {
        System.out.println("设置搜索功能...");
        
        disposables.add(
            searchSubject
                .debounce(300, TimeUnit.MILLISECONDS)  // 防抖
                .distinctUntilChanged()  // 去重
                .switchMap(query -> {
                    if (query.isEmpty()) {
                        return Observable.just(new ArrayList<String>());
                    }
                    return api.search(query)
                        .onErrorResumeNext(Observable.just(new ArrayList<String>()));
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(
                    results -> updateSearchResults(results),
                    error -> showError("搜索失败: " + error.getMessage())
                )
        );
    }
    
    private void loadData() {
        System.out.println("加载数据...");
        
        // 添加订阅到CompositeDisposable
        disposables.add(
            api.getData()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnSubscribe(disposable -> 
                    showLoading(true))
                .doFinally(() -> 
                    showLoading(false))
                .subscribe(
                    data -> {
                        System.out.println("数据加载成功");
                        updateUI(data);
                    },
                    error -> {
                        System.out.println("数据加载失败: " + error.getMessage());
                        showError("加载失败,请重试");
                    }
                )
        );
    }
    
    // 搜索输入
    public void onSearchTextChanged(String text) {
        searchSubject.onNext(text);
    }
    
    @Override
    protected void onDestroy() {
        super.onDestroy();
        System.out.println("Activity销毁,清理订阅");
        
        // 取消所有订阅,防止内存泄漏
        disposables.clear();
        
        // 或者使用disposables.dispose()也可以
    }
}

// 模拟执行输出:
// 设置搜索功能...
// 加载数据...
// 显示加载中...
// 数据加载成功
// 隐藏加载中...
// 更新UI...
// 用户输入搜索: "rx"
// 用户输入搜索: "rxj"
// 开始搜索: rxj
// 搜索完成,更新结果
// Activity销毁,清理订阅

第七章:RxJava 2到RxJava 3的迁移

7.1 主要变化总结

// ========== 1. 包名变更 ==========
// RxJava 2
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

// RxJava 3 (包名中添加了.rxjava3)
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

// ========== 2. API变更示例 ==========
System.out.println("=== RxJava 2 -> RxJava 3 API变更 ===");

// 2.1 as() 改为 to()
// RxJava 2: observable.as(...)
// RxJava 3: observable.to(...)

// 2.2 startWith() 分为 startWithItem() 和 startWithIterable()
Observable.just("B", "C")
    .startWithItem("A")  // RxJava 3新增
    .subscribe(item -> System.out.print(item + " "));
System.out.println();
// 输出: A B C

Observable.just("D", "E")
    .startWithIterable(Arrays.asList("A", "B", "C"))  // RxJava 3新增
    .subscribe(item -> System.out.print(item + " "));
System.out.println();
// 输出: A B C D E

// 2.3 Maybe.defaultIfEmpty() 返回类型变化
// RxJava 2: 返回Maybe<T>
// RxJava 3: 返回Single<T>
Maybe.just("数据")
    .defaultIfEmpty("默认值")
    .subscribe(
        data -> System.out.println("收到: " + data),
        error -> System.out.println("错误")
    );
// 输出: 收到: 数据

Maybe.empty()
    .defaultIfEmpty("默认值")
    .subscribe(
        data -> System.out.println("收到: " + data),
        error -> System.out.println("错误")
    );
// 输出: 收到: 默认值

// 2.4 Supplier代替Callable
// RxJava 2: fromCallable()
// RxJava 3: fromSupplier() (推荐)
Observable.fromSupplier(() -> {
    System.out.println("执行Supplier");
    return "数据";
}).subscribe(System.out::println);
// 输出: 执行Supplier
// 输出: 数据

// ========== 3. 删除的功能 ==========
// 以下在RxJava 3中已被删除:
// - observable.as(...)  // 改用to(...)
// - observable.startWith(T)  // 改用startWithItem(T)
// - observable.startWith(Iterable)  // 改用startWithIterable(Iterable)
// - Maybe.toSingle(T)  // 改用defaultIfEmpty(T).toSingle()
// - Single.toCompletable()
// - Flowable.subscribe(4个参数的重载)
// - Observable.subscribe(4个参数的重载)

7.2 迁移步骤

步骤1:更新依赖

// build.gradle (Module: app)
dependencies {
    // 从RxJava 2迁移到RxJava 3
    implementation 'io.reactivex.rxjava3:rxjava:3.1.6'
    implementation 'io.reactivex.rxjava3:rxandroid:3.0.2'
    
    // 如果有Retrofit,也需要更新
    implementation 'com.squareup.retrofit2:adapter-rxjava3:2.9.0'
    
    // 如果有RxBinding
    implementation 'com.jakewharton.rxbinding4:rxbinding:4.0.0'
}

步骤2:更新导入语句

// 批量替换导入语句
// 将:
import io.reactivex.*;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.*;
import io.reactivex.subjects.*;

// 替换为:
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.subjects.*;

步骤3:更新API调用

// 常见的需要更新的API调用
public class MigrationExamples {
    
    // 1. as() -> to()
    public void example1() {
        // RxJava 2: observable.as(AutoDispose.autoDisposable(scope))
        // RxJava 3: observable.to(AutoDispose.autoDisposable(scope))
    }
    
    // 2. startWith() -> startWithItem() / startWithIterable()
    public void example2() {
        // RxJava 2: observable.startWith("A")
        // RxJava 3: observable.startWithItem("A")
        
        // RxJava 2: observable.startWith(Arrays.asList("A", "B"))
        // RxJava 3: observable.startWithIterable(Arrays.asList("A", "B"))
    }
    
    // 3. Maybe.toSingle(T) 的替代方案
    public void example3() {
        Maybe<String> maybe = Maybe.empty();
        
        // RxJava 2: maybe.toSingle("默认值")
        // RxJava 3: maybe.defaultIfEmpty("默认值").toSingle()
        
        Single<String> single = maybe
            .defaultIfEmpty("默认值")
            .toSingle();
    }
    
    // 4. 删除的subscribe重载
    public void example4() {
        // RxJava 2: observable.subscribe(onNext, onError, onComplete, onSubscribe)
        // RxJava 3: 这个重载已被删除,需要分开处理
        
        // RxJava 3的写法:
        Disposable disposable = observable.subscribe(
            item -> { /* onNext */ },
            error -> { /* onError */ },
            () -> { /* onComplete */ }
        );
        // 如果需要onSubscribe回调,使用doOnSubscribe
        observable.doOnSubscribe(d -> { /* onSubscribe */ })
            .subscribe(...);
    }
    
    // 5. 使用Supplier代替Callable
    public void example5() {
        // RxJava 2: Observable.fromCallable(() -> "数据")
        // RxJava 3: Observable.fromSupplier(() -> "数据") (推荐)
        // 注意: fromCallable()仍然存在,但推荐使用fromSupplier()
    }
}

步骤4:使用迁移工具
对于大型项目,可以使用IDE的查找替换功能:

  1. 使用"Find in Path"查找所有RxJava 2的导入
  2. 使用正则表达式批量替换
  3. 编译并修复API不兼容的错误

第八章:最佳实践与性能优化

8.1 最佳实践

实践1:合理选择Observable类型

public class ObservableTypeSelection {
    
    public void selectRightType() {
        // 场景1:网络请求返回单个结果
        // 正确:使用Single
        Single<User> userSingle = api.getUser(userId);
        
        // 场景2:可能返回null的查询
        // 正确:使用Maybe
        Maybe<User> userMaybe = api.findUserByEmail(email);
        
        // 场景3:只关心操作是否成功
        // 正确:使用Completable
        Completable updateCompletable = api.updateUser(user);
        
        // 场景4:UI事件流(点击、文本变化)
        // 正确:使用Observable(数据量小,不需要背压)
        Observable<ViewClickEvent> clickObservable = RxView.clicks(button);
        
        // 场景5:大量数据流(传感器数据、日志流)
        // 正确:使用Flowable(需要背压控制)
        Flowable<SensorData> sensorFlowable = sensorManager.getSensorData();
    }
}

实践2:合理使用调度器

public class SchedulerBestPractices {
    
    public void properSchedulerUsage() {
        // 好的实践:明确指定每个阶段的线程
        api.getData()
            .subscribeOn(Schedulers.io())          // 第1步:网络请求在IO线程
            .observeOn(Schedulers.computation())   // 第2步:数据处理在计算线程
            .map(data -> {
                // 复杂计算或数据转换
                return processData(data);
            })
            .observeOn(Schedulers.io())            // 第3步:再次IO操作(如保存到数据库)
            .flatMap(processedData -> 
                database.save(processedData))
            .observeOn(AndroidSchedulers.mainThread()) // 第4步:UI更新在主线程
            .subscribe(
                result -> updateUI(result),
                error -> showError(error)
            );
            
        // 避免的实践:不必要的线程切换
        api.getData()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread()) // 切换到主线程
            .map(data -> processData(data))            // 错误:在主线程做复杂计算
            .subscribe(...);
            
        // 改进:将复杂计算放在合适的线程
        api.getData()
            .subscribeOn(Schedulers.io())
            .map(data -> processData(data))            // 在IO线程处理数据
            .observeOn(AndroidSchedulers.mainThread()) // 然后切换到主线程
            .subscribe(...);
    }
}

实践3:避免内存泄漏

public class MemoryLeakPrevention {
    
    // 方法1:使用CompositeDisposable
    private CompositeDisposable disposables = new CompositeDisposable();
    
    public void safeSubscribe() {
        disposables.add(
            api.getData()
                .subscribe(data -> updateUI(data))
        );
        
        disposables.add(
            RxView.clicks(button)
                .subscribe(event -> handleClick())
        );
    }
    
    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposables.clear();  // 清理所有订阅
    }
    
    // 方法2:使用AutoDispose(需要额外依赖)
    public void useAutoDispose() {
        api.getData()
            .as(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(this)))
            .subscribe(data -> updateUI(data));
    }
    
    // 方法3:手动管理Disposable
    private Disposable manualDisposable;
    
    public void manualManagement() {
        manualDisposable = api.getData()
            .subscribe(data -> updateUI(data));
    }
    
    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (manualDisposable != null && !manualDisposable.isDisposed()) {
            manualDisposable.dispose();  // 手动取消订阅
        }
    }
}

8.2 性能优化

优化1:避免不必要的对象创建

public class PerformanceOptimization {
    
    // 不好的实践:每次订阅都创建新对象
    public void badPractice() {
        Observable.range(1, 1000)
            .map(x -> new ExpensiveProcessor().process(x))  // 每次创建新对象
            .subscribe(...);
    }
    
    // 好的实践:重用对象
    public void goodPractice() {
        ExpensiveProcessor processor = new ExpensiveProcessor();  // 创建一次
        
        Observable.range(1, 1000)
            .map(processor::process)  // 重用对象
            .subscribe(...);
    }
    
    // 特别优化:使用原始类型避免装箱拆箱
    public void primitiveOptimization() {
        // 对于大量整数操作,使用IntObservable可以避免装箱
        Observable<Integer> boxed = Observable.range(1, 1000000);  // 产生Integer对象
        
        // 如果使用RxJava扩展库,可以使用原始类型Observable
        // ObservableInt.range(1, 1000000)  // 产生int值,避免装箱
    }
}

优化2:合理使用缓存

public class CacheOptimization {
    
    public void useCache() {
        // 场景:多次订阅相同数据
        Observable<String> networkData = api.getData()
            .subscribeOn(Schedulers.io())
            .cache();  // 缓存结果,后续订阅直接使用缓存
        
        // 第一次订阅:发起网络请求
        networkData.subscribe(data -> System.out.println("订阅1: " + data));
        
        // 第二次订阅:使用缓存,不发起网络请求
        networkData.subscribe(data -> System.out.println("订阅2: " + data));
        
        // 注意:cache()会永久缓存,如果需要限制缓存时间,使用replay()
        Observable<String> timedCache = api.getData()
            .subscribeOn(Schedulers.io())
            .replay(1, TimeUnit.MINUTES)  // 缓存1分钟
            .autoConnect();
    }
    
    // 避免无限缓存导致内存泄漏
    public void avoidMemoryLeak() {
        // 错误的缓存使用
        Observable<BigData> badCache = api.getBigData()
            .cache();  // 如果数据很大,会一直占用内存
            
        // 改进:使用弱引用或限制缓存大小
        Observable<BigData> betterCache = api.getBigData()
            .replay(1)  // 只缓存最近1个数据
            .autoConnect();
    }
}

优化3:使用适当的背压策略

public class BackpressureOptimization {
    
    public void optimizeBackpressure() {
        // 场景:快速生产,慢速消费
        
        // 方案1:使用合适的缓冲区大小
        Flowable.interval(1, TimeUnit.MILLISECONDS)
            .onBackpressureBuffer(1000,  // 缓冲区大小
                () -> System.out.println("缓冲区溢出"),
                BackpressureOverflowStrategy.DROP_OLDEST)  // 溢出策略
            .subscribe(...);
        
        // 方案2:采样,降低数据频率
        Flowable.interval(1, TimeUnit.MILLISECONDS)
            .sample(10, TimeUnit.MILLISECONDS)  // 每10ms采样一次
            .subscribe(...);
        
        // 方案3:批量处理
        Flowable.interval(1, TimeUnit.MILLISECONDS)
            .buffer(100, TimeUnit.MILLISECONDS)  // 每100ms缓冲一次
            .subscribe(batch -> {
                // 批量处理数据
                processBatch(batch);
            });
    }
}

第九章:调试与测试

9.1 调试技巧

public class DebuggingTechniques {
    
    public void basicDebugging() {
        System.out.println("=== RxJava调试技巧 ===");
        
        // 1. 使用doOn操作符添加日志
        Observable.just("A", "B", "C")
            .doOnSubscribe(d -> 
                System.out.println("开始订阅"))
            .doOnNext(item -> 
                System.out.println("发射: " + item))
            .doOnError(error -> 
                System.err.println("错误: " + error))
            .doOnComplete(() -> 
                System.out.println("完成发射"))
            .doOnDispose(() -> 
                System.out.println("取消订阅"))
            .subscribe(...);
        
        // 2. 使用compose操作符统一添加调试逻辑
        Observable.just("Data")
            .compose(addDebugging("测试流"))
            .subscribe(...);
    }
    
    // 统一的调试Transformer
    private <T> ObservableTransformer<T, T> addDebugging(String tag) {
        return upstream -> upstream
            .doOnSubscribe(d -> 
                System.out.println(tag + ": 订阅开始"))
            .doOnNext(item -> 
                System.out.println(tag + ": 发射 " + item))
            .doOnError(error -> 
                System.err.println(tag + ": 错误 " + error))
            .doOnComplete(() -> 
                System.out.println(tag + ": 完成"))
            .doOnDispose(() -> 
                System.out.println(tag + ": 取消订阅"));
    }
    
    // 3. 使用RxJavaPlugins全局监控
    public void setupGlobalDebugging() {
        RxJavaPlugins.setErrorHandler(error -> {
            System.err.println("RxJava全局错误: " + error);
            // 可以发送到错误统计服务器
        });
        
        RxJavaPlugins.setScheduleHandler((runnable) -> {
            System.out.println("调度任务: " + runnable);
            return runnable;
        });
    }
    
    // 4. 使用TestObserver进行调试
    public void testObserverDebugging() {
        TestObserver<String> testObserver = new TestObserver<String>() {
            @Override
            public void onNext(String value) {
                System.out.println("TestObserver收到: " + value);
                super.onNext(value);
            }
            
            @Override
            public void onError(Throwable e) {
                System.err.println("TestObserver错误: " + e);
                super.onError(e);
            }
            
            @Override
            public void onComplete() {
                System.out.println("TestObserver完成");
                super.onComplete();
            }
        };
        
        Observable.just("A", "B", "C")
            .subscribe(testObserver);
        
        // 检查断言
        testObserver.assertValues("A", "B", "C");
        testObserver.assertComplete();
    }
}

9.2 单元测试

public class RxJavaUnitTest {
    
    @Test
    public void testObservable() {
        System.out.println("=== Observable单元测试 ===");
        
        // 创建TestObserver
        TestObserver<String> testObserver = new TestObserver<>();
        
        // 执行被测Observable
        Observable.just("Hello", "World")
            .map(String::toUpperCase)
            .subscribe(testObserver);
        
        // 验证结果
        testObserver.assertValues("HELLO", "WORLD");
        testObserver.assertNoErrors();
        testObserver.assertComplete();
        
        System.out.println("测试通过!");
    }
    
    @Test
    public void testErrorHandling() {
        System.out.println("=== 错误处理测试 ===");
        
        TestObserver<String> testObserver = new TestObserver<>();
        
        Observable.<String>error(new RuntimeException("测试错误"))
            .onErrorReturn(error -> "默认值")
            .subscribe(testObserver);
        
        testObserver.assertValue("默认值");
        testObserver.assertNoErrors();
        testObserver.assertComplete();
        
        System.out.println("错误处理测试通过!");
    }
    
    @Test
    public void testWithTestScheduler() {
        System.out.println("=== 时间相关测试 ===");
        
        TestScheduler testScheduler = new TestScheduler();
        TestObserver<Long> testObserver = new TestObserver<>();
        
        Observable.interval(1, TimeUnit.SECONDS, testScheduler)
            .take(3)
            .subscribe(testObserver);
        
        // 初始状态:没有数据
        testObserver.assertNoValues();
        System.out.println("初始状态: 无数据");
        
        // 快进1秒
        testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        testObserver.assertValues(0L);
        System.out.println("1秒后: 收到 0");
        
        // 快进2秒
        testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
        testObserver.assertValues(0L, 1L, 2L);
        System.out.println("3秒后: 收到 0, 1, 2");
        
        testObserver.assertComplete();
        System.out.println("时间测试通过!");
    }
    
    @Test
    public void testAsyncOperation() {
        System.out.println("=== 异步操作测试 ===");
        
        // 使用TestScheduler控制异步操作
        TestScheduler testScheduler = new TestScheduler();
        
        Observable<String> asyncObservable = Observable
            .fromCallable(() -> {
                System.out.println("执行异步操作");
                return "结果";
            })
            .subscribeOn(Schedulers.io())
            .observeOn(testScheduler);  // 使用TestScheduler
        
        TestObserver<String> testObserver = new TestObserver<>();
        asyncObservable.subscribe(testObserver);
        
        // 触发所有待执行的任务
        testScheduler.triggerActions();
        
        testObserver.assertValue("结果");
        testObserver.assertComplete();
        
        System.out.println("异步测试通过!");
    }
}

9.3 集成测试

public class RxJavaIntegrationTest {
    
    @Mock
    private ApiService apiService;
    
    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
    }
    
    @Test
    public void testNetworkRequest() {
        System.out.println("=== 网络请求集成测试 ===");
        
        // 模拟API响应
        User mockUser = new User("John", "Doe");
        when(apiService.getUser("123"))
            .thenReturn(Observable.just(mockUser));
        
        // 执行测试
        TestObserver<User> testObserver = new TestObserver<>();
        apiService.getUser("123")
            .subscribe(testObserver);
        
        // 验证
        testObserver.assertValue(mockUser);
        testObserver.assertComplete();
        
        verify(apiService).getUser("123");
        
        System.out.println("网络请求测试通过!");
    }
    
    @Test
    public void testErrorScenario() {
        System.out.println("=== 错误场景测试 ===");
        
        // 模拟网络错误
        when(apiService.getUser("999"))
            .thenReturn(Observable.error(new IOException("网络错误")));
        
        TestObserver<User> testObserver = new TestObserver<>();
        apiService.getUser("999")
            .onErrorResumeNext(error -> {
                if (error instanceof IOException) {
                    return Observable.just(new User("默认", "用户"));
                }
                return Observable.error(error);
            })
            .subscribe(testObserver);
        
        testObserver.assertValue(user -> 
            user.getFirstName().equals("默认"));
        testObserver.assertComplete();
        
        System.out.println("错误场景测试通过!");
    }
}

第十章:常见问题与解决方案

10.1 为什么onComplete不执行?

public class CommonIssue1 {
    
    public void whyOnCompleteNotCalled() {
        System.out.println("=== 问题:onComplete不执行 ===");
        
        // 情况1:使用了Single或Maybe
        Single.just("数据")
            .subscribe(
                data -> System.out.println("收到: " + data),
                error -> System.out.println("错误: " + error),
                () -> System.out.println("完成")  // 这里不会执行!
            );
        // 解释:Single没有onComplete,只有onSuccess和onError
        
        // 情况2:发生了未处理的错误
        Observable.create(emitter -> {
                emitter.onNext("数据1");
                emitter.onError(new RuntimeException("错误"));
                emitter.onNext("数据2");  // 不会发射
                emitter.onComplete();     // 不会执行
            })
            .subscribe(
                data -> System.out.println("收到: " + data),
                error -> System.out.println("错误: " + error),
                () -> System.out.println("完成")  // 不会执行,因为发生了错误
            );
        
        // 情况3:使用了永远不会结束的Observable
        Observable.interval(1, TimeUnit.SECONDS)
            .subscribe(
                data -> System.out.println("收到: " + data),
                error -> System.out.println("错误: " + error),
                () -> System.out.println("完成")  // 不会执行,因为interval永不结束
            );
            
        // 解决方案:使用take操作符限制数量
        Observable.interval(1, TimeUnit.SECONDS)
            .take(3)  // 只取3个
            .subscribe(
                data -> System.out.println("收到: " + data),
                error -> System.out.println("错误: " + error),
                () -> System.out.println("完成")  // 现在会执行
            );
    }
}

10.2 如何处理背压异常?

public class CommonIssue2 {
    
    public void handleBackpressureException() {
        System.out.println("=== 问题:MissingBackpressureException ===");
        
        // 问题场景:快速生产,慢速消费
        Observable.interval(1, TimeUnit.MILLISECONDS)
            .observeOn(Schedulers.computation())
            .subscribe(
                item -> {
                    Thread.sleep(100);  // 慢速消费
                    System.out.println("消费: " + item);
                },
                error -> {
                    // 会收到MissingBackpressureException
                    System.out.println("错误: " + error.getClass().getSimpleName());
                }
            );
        
        // 解决方案1:使用Flowable
        Flowable.interval(1, TimeUnit.MILLISECONDS)
            .onBackpressureBuffer(1000)  // 添加缓冲区
            .observeOn(Schedulers.computation())
            .subscribe(
                item -> {
                    Thread.sleep(100);
                    System.out.println("Flowable消费: " + item);
                }
            );
        
        // 解决方案2:使用合适的背压策略
        Observable.interval(1, TimeUnit.MILLISECONDS)
            .toFlowable(BackpressureStrategy.DROP)  // 转换为Flowable并指定策略
            .observeOn(Schedulers.computation())
            .subscribe(...);
        
        // 解决方案3:采样降低频率
        Observable.interval(1, TimeUnit.MILLISECONDS)
            .sample(10, TimeUnit.MILLISECONDS)  // 每10ms采样一次
            .subscribe(...);
            
        // 解决方案4:批量处理
        Observable.interval(1, TimeUnit.MILLISECONDS)
            .buffer(100, TimeUnit.MILLISECONDS)  // 每100ms缓冲一次
            .subscribe(batch -> {
                // 批量处理
                processBatch(batch);
            });
    }
}

10.3 如何取消订阅?

public class CommonIssue3 {
    
    public void howToUnsubscribe() {
        System.out.println("=== 问题:如何正确取消订阅 ===");
        
        // 方法1:使用Disposable
        Disposable disposable1 = Observable.interval(1, TimeUnit.SECONDS)
            .subscribe(item -> System.out.println("流1: " + item));
        
        Disposable disposable2 = Observable.interval(500, TimeUnit.MILLISECONDS)
            .subscribe(item -> System.out.println("流2: " + item));
        
        // 取消单个订阅
        disposable1.dispose();
        System.out.println("流1已取消");
        
        // 方法2:使用CompositeDisposable(推荐)
        CompositeDisposable compositeDisposable = new CompositeDisposable();
        
        compositeDisposable.add(
            Observable.interval(1, TimeUnit.SECONDS)
                .subscribe(item -> System.out.println("复合流1: " + item))
        );
        
        compositeDisposable.add(
            Observable.interval(500, TimeUnit.MILLISECONDS)
                .subscribe(item -> System.out.println("复合流2: " + item))
        );
        
        // 取消所有订阅
        compositeDisposable.clear();
        System.out.println("所有复合流已取消");
        
        // 方法3:使用takeUntil操作符
        PublishSubject<Boolean> stopSignal = PublishSubject.create();
        
        Observable.interval(1, TimeUnit.SECONDS)
            .takeUntil(stopSignal)  // 直到stopSignal发射数据
            .subscribe(item -> System.out.println("条件流: " + item));
        
        // 发送停止信号
        stopSignal.onNext(true);
        System.out.println("条件流已停止");
        
        // 方法4:使用doOnSubscribe获取Disposable
        Observable.interval(1, TimeUnit.SECONDS)
            .doOnSubscribe(d -> {
                // 在这里保存Disposable
                Disposable d2 = d;
                // 可以添加到CompositeDisposable
            })
            .subscribe(...);
            
        // 重要:检查Disposable状态
        if (disposable1 != null && !disposable1.isDisposed()) {
            disposable1.dispose();
        }
    }
}

10.4 如何处理线程安全问题?

public class CommonIssue4 {
    
    private int counter = 0;
    
    public void threadSafetyIssue() {
        System.out.println("=== 问题:线程安全 ===");
        
        // 问题示例:多线程修改共享状态
        Observable.range(1, 1000)
            .subscribeOn(Schedulers.computation())
            .flatMap(i -> 
                Observable.just(i)
                    .subscribeOn(Schedulers.io())
                    .map(j -> {
                        counter++;  // 线程不安全!
                        return j;
                    })
            )
            .subscribe();
            
        // 可能的结果:counter的值不确定,可能小于1000
        
        // 解决方案1:使用原子类
        AtomicInteger safeCounter = new AtomicInteger(0);
        
        Observable.range(1, 1000)
            .subscribeOn(Schedulers.computation())
            .flatMap(i -> 
                Observable.just(i)
                    .subscribeOn(Schedulers.io())
                    .map(j -> {
                        safeCounter.incrementAndGet();  // 线程安全
                        return j;
                    })
            )
            .subscribe();
            
        System.out.println("安全计数器: " + safeCounter.get());  // 保证是1000
        
        // 解决方案2:避免共享状态
        Observable.range(1, 1000)
            .subscribeOn(Schedulers.computation())
            .flatMap(i -> 
                Observable.just(i)
                    .subscribeOn(Schedulers.io())
                    .map(j -> {
                        // 不修改共享状态,只处理自己的数据
                        return processItem(j);
                    })
            )
            .subscribe();
            
        // 解决方案3:使用serialize()操作符
        PublishSubject<Integer> subject = PublishSubject.create();
        
        subject
            .serialize()  // 保证线程安全
            .subscribe(item -> {
                // 现在可以安全地修改状态
                counter++;
            });
            
        // 从多个线程发射数据
        Observable.range(1, 100)
            .subscribeOn(Schedulers.io())
            .subscribe(subject::onNext);
            
        Observable.range(101, 100)
            .subscribeOn(Schedulers.computation())
            .subscribe(subject::onNext);
    }
}

总结

RxJava 3作为响应式编程在Android开发中的强大工具,通过其丰富的操作符和优雅的线程管理机制,极大地简化了异步编程的复杂性。从简单的数据转换到复杂的异步操作组合,RxJava都提供了清晰的解决方案。

关键要点回顾:

  1. 选择合适的Observable类型:根据场景选择ObservableFlowableSingleMaybeCompletable
  2. 合理管理线程:使用subscribeOnobserveOn控制执行线程
  3. 处理背压问题:对于可能产生大量数据的场景,使用Flowable和适当的背压策略
  4. 防止内存泄漏:使用CompositeDisposable管理订阅生命周期
  5. 优雅的错误处理:利用onErrorReturnonErrorResumeNextretry等操作符

学习建议:

  1. 从简单开始:先掌握Observable和常用操作符
  2. 实践出真知:在实际项目中应用RxJava,从简单场景开始
  3. 理解原理:不仅仅是使用API,要理解响应式编程的思想
  4. 查阅官方文档:RxJava的操作符非常丰富,遇到问题时查阅官方文档

性能优化建议:

  1. 避免不必要的对象创建:重用对象,使用原始类型
  2. 合理使用缓存:对于重复订阅的数据,使用cache()replay()
  3. 优化线程使用:避免不必要的线程切换,在主线程做轻量操作
  4. 监控内存使用:注意cache()可能导致的内存泄漏

调试建议:

  1. 使用doOn操作符添加日志:监控数据流的每个阶段
  2. 使用TestObserver进行单元测试:确保逻辑正确
  3. 使用TestScheduler测试时间相关操作:避免等待真实时间
  4. 设置全局错误处理器:捕获未处理的错误

RxJava虽然学习曲线较陡,但一旦掌握,将极大提升代码的可读性和可维护性。随着实践的深入,你会越来越体会到响应式编程的魅力所在。


延伸学习资源

记住:响应式编程是一种思维方式,而RxJava是实现这种思维方式的工具。Happy Coding!