RXJava 实例解析

要点

  • 响应式编程是一种处理异步数据流的规范
  • 响应式为数据流的转换和聚合以及数据流的控制管理提供了工具支持
  • 弹珠交互图(Marble Diagram)以可交互的方式可视化响应式的结构
  • 响应式编程风格看起来跟 Java Streams API 有点相似,不过本质上是不一样的
  • 如何连接到动态流处理异步数据源 在高并发编程范式的发展过程中,我们使用过很多工具,比如 java.util.concurrent 包、Akka Streams 框架、CompletableFuture 类以及 Netty 框架。响应式编程近来大受欢迎,这要得益于它强大的功能和健壮的工具包。

响应式编程是一种处理异步数据流的规范,它为数据流的转换和聚合以及数据流的控制管理提供了工具支持,它让考量程序整体设计的工作变得简单。

但它使用起来并不简单,它的学习曲线也并不平坦。对于我们当中的那些数学家来说,学习响应式就好比当初他们从学习标准代数的无向量过渡到学习线性代数的向量、矩阵和张量,它们实际上是被单元化的数据流。传统的编程模式以对象为基础,而响应式以事件流为基础。事件可能以多种形式出现,比如对象、数据源、鼠标移动信息或者异常。在传统的编程范式里,“异常”这个词描述的是对意外情况的处理,因为在这个背景下,没有按照预想发生的情况都算异常。而在响应式编程范式里,异常却是一等公民。因为数据流一般是异步的,所以抛出异常是没有意义的,任何一个异常都会被当成数据流里的一个事件。

在这篇文章里,我们会探讨响应式编程的基本原理,以一种教与学的方式来强化一些重要的概念。

首先要记住的是,响应式里所有的东西都是流。Observable 封装了流,是最基本的单元。流可以包含零个或多个事件,有未完成和已完成两种状态,可以正常结束也可以发生错误。如果一个流正常完成或者发生错误,说明处理结束了,虽然有些工具可以对错误进行重试或者使用不同的流替换发生错误的流。

在运行我们给出的例子之前,需要把 RxJava 的依赖加入到项目里。可以在 Maven 里加入这个依赖:

1
2
3
4
5
<dependency>
<groupId>io.reactivex.rxjava</groupId>
<artifactId>rxjava</artifactId>
<version>1.1.10</version>
</dependency>

Observable 类有几个静态工厂方法和实例方法,它们被用来生成各种新的 Observable 对象,或者把 Observable 对象添加到感兴趣的处理流程里。Observable 是可变的,所以针对它们的操作总是会生成新的 Observable 对象。为了更好地理解我们的例子,我们先来温习一下 Observable 的基本操作,因为在后面的例子里会用到它们。

Observable.just 方法生成一个简单对象,然后返回。例如:

1
Observable.just("Howdy!")

这行代码生成一个新的 Observable 对象,在结束之前触发一个单独的事件,生成字符串 Howdy!

可以把新生成的 Observable 对象赋给一个 Observable 变量:

1
Observable<String> hello = Observable.just("Howdy!");

不过知道这个还远远不够。就像那个著名的哲学问题一样,森林里的一颗树倒下来,如果周围没有人听见,那么就等于说树的倒下是无声无息的。一个 Observable 对象必须要有一个订阅者来处理它所生成的事件。所幸的是,现在 Java 支持 Lambda 表达式,我们就可以使用简洁的声明式风格来表示订阅操作:

1
2
Observable<String> howdy = Observable.just("Howdy!");
howdy.subscribe(System.out::println);

这段代码仍然会生成字符串Howdy!

跟 Observable 的其它方法一样,just 方法可以被重载:

1
Observable.just("Hello", "World").subscribe(System.out::println);

这行代码会输出

1
2
Hello
World

just 方法可以被重载,最多可以接收 10 个参数。这里要注意,输出的结果分成两行显示,说明它们是两个独立的事件。

让我们来看看如果使用列表会发生什么情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
List<String> words = Arrays.asList(
"the",
"quick",
"brown",
"fox",
"jumped",
"over",
"the",
"lazy",
"dog"
);

Observable.just(words).subscribe(word->System.out.println(word));

这段代码输出一个很平常的结果:

1
[the, quick, brown, fox, jumped, over, the, lazy, dog]

我们本以为每个单词会是一个单独的事件,但实际上整个列表被当成了一个事件。为了达到我们想要的结果,我们引入 from 方法:

1
Observable.from(words).subscribe(System.out::println);

这行代码把数组或者列表转换成一系列事件,每个元素就是一个事件。

执行这行代码会得到我们想要的多行输出:

1
2
3
4
5
6
7
8
9
the
quick
brown
fox
jumped
over
the
lazy
dog

为了能从中获取编号,我们要在 Observable 上多做一些工作。

不过在写代码之前,我们先来看看另外两个操作,rangeziprange(i,n) 会创建一个包含 n 个数的流,它的第一个数是从 i 开始的。如果我们有办法把这种区间流跟上面的单词流组合在一起,就可以解决编号的问题。

RX Marbles 这个网站对我们学习响应式编程很有帮助。这个网站使用 JavaScript 渲染大部分响应式操作,而且是可交互的。每个响应式操作使用“弹珠”来描述一个或多个源流(source stream)以及由操作生成的结果流(result stream)。时间从左到右,事件用弹珠表示。单击或者拖动弹珠,可以看到它们是如何影响结果的。

执行一个 zip 操作就跟遵照医嘱一样简单。让我们用弹珠交互图来解释一下这个过程:

zip 操作通过成对的zip映射转换把源流的元素跟另一个给定流的元素组合起来,其中的映射可以使用 Lambda 表达式来表示。只要其中的一个流完成操作,整个 zip 操作也跟着停止,另一个未完成的流剩下的事件就会被忽略。zip 可以支持最多 9 个源流的 zip 操作。zipWith 操作可以把一个指定流合并到一个已存在的流里。

现在回到我们的例子上,我们可以使用 rangezipWith 操作加入编号,并用 String.format 做映射转换:

1
2
3
4
Observable.from(words)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count)->String.format("%2d. %s", count, string))
.subscribe(System.out::println);

这段代码会输出:

1
2
3
4
5
6
7
8
9
1. the
2. quick
3. brown
4. fox
5. jumped
6. over
7. the
8. lazy
9. dog

看起来很不错!现在假设我们要列出单词里的字母而不是单词本身,这个时候要用到 flatMap,flatMap 会从 Observable 里获取事件源(对象、集合或数组),并把这些元素分别映射成 Observable,然后把这些 Observable 扁平化成一个单独的 Observable。

对于我们的例子来说,我们会先用 split 方法把每个单词拆分成一个字母数组,然后用 flatMap 创建一个新的 Observable 对象,这个 Observable 对象包含了组成这些单词的所有字母:

1
2
3
4
5
Observable.from(words)
.flatMap(word -> Observable.from(word.split("")))
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);

这段代码会输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
 1. t
2. h
3. e
4. q
5. u
6. i
7. c
8. k
...
30. l
31. a
32. z
33. y
34. d
35. o
36. g

所有单词的字母都出现在这里。不过这样太繁琐了,我们希望相同的字母只出现一次:

1
2
3
4
5
6
Observable.from(words)
.flatMap(word -> Observable.from(word.split("")))
.distinct()
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);

这段代码输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
 1. t
2. h
3. e
4. q
5. u
6. i
7. c
8. k
9. b
10. r
11. o
12. w
13. n
14. f
15. x
16. j
17. m
18. p
19. d
20. v
21. l
22. a
23. z
24. y
25. g

我们从小被告知quick brown fox这个全字母短句包含了英语里所有的字母,不过在这里我们只看到 25 个,而不是 26 个。现在让我们对这些字母进行排序,找出丢失的那个字母:

1
2
3
4
5
6
7
Observable.from(words)
.flatMap(word -> Observable.from(word.split("")))
.distinct()
.sorted()
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);

这段代码输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
 1. a
2. b
3. c
...
17. q
18. r
19. t
20. u
21. v
22. w
23. x
24. y
25. z

看样子是字母s丢掉了。为了得到我们期望的结果,需要对数组做一点修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
List<String> words = Arrays.asList(
"the",
"quick",
"brown",
"fox",
"jumped",
"over",
"the",
"lazy",
"dogs"
);

Observable.from(words)
.flatMap(word -> Observable.from(word.split("")))
.distinct()
.sorted()
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);

输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
 1. a
2. b
3. c
4. d
5. e
6. f
7. g
8. h
9. i
10. j
11. k
12. l
13. m
14. n
15. o
16. p
17. q
18. r
19. s
20. t
21. u
22. v
23. w
24. x
25. y
26. z

现在好了!

到目前为止,所有的代码都跟 Java 8 里引入的 Streams API 很相似,不过这种相似只是一种巧合,因为响应式包含的内容远不止这些。

Java StreamsLambda 表达式为编程语言带来很大的价值,不过归根结底,它们只是提供了一种方式来遍历集合和生成集合。它们的作用很有限,而且缺乏可扩展性和可重用性。尽管 Streamparallel 操作可以并行执行任务,但在返回结果前程序无法对整个过程进行干预。相反,响应式引入了执行时间、节流、流量控制等概念,而且它们可以被连接到永不停止的处理流程里。响应式产生的结果虽然不是集合,但你可以用任何期望的方式来处理这些结果。

让我们通过弹珠交互图更好地理解这些概念。

merge 操作可以把最多 9 个源流合并到一个结果里,而且可以保留它们的顺序。无需担心这里会出现竞赛条件,因为所有的事件都被扁平化到一个单独的线程里,包括异常事件和结束事件。

debounce 操作会把在一个时间段内紧挨在一起的几个事件看成一个单独事件,这几个事件里只有最后一个会被触发:

可以看到,上下两个图中的1之间有一个指定的时间间隔,而 2、3、4、5 之间的时间间隔都小于这个间隔,所以它们被看成单个事件。如果把“5”往右挪一点,结果就不一样了:

另一个有趣的操作是 amb,它是一种不确定性的操作。

amb 操作会从所有的输入流中选择第一个出现的流,然后忽略其它剩下的流。如下图,第二个流是最先出现的,所以 amb 操作选择了这个流。

如果把第一个流里的20往左移动,超过第二个流的第一个元素,那么生成的结果又会不一样:

如果你有一个需要接入到某个数据源的处理流程,比如从消息主题上获取数据,可能是 Bloomberg 或者 Reuters,你并不关心接入的到底是哪一个,只要从中选择一个就可以了。在这种情况下,amb 操作就会很有用。

Tick Tock

现在,我们可以使用这些工具基于流生成各种有意义的结果。在接下来的这个例子里,我们有一个数据源,它会每秒钟生成一个事件。不过为了节省 CPU,我们让它在周末时每三秒生成一次。我们使用混合型的节奏器按照一定的节奏生成数据。

首先,我们要创建一个返回 boolean 的方法,它会检查当前时间是否是周末,如果是就返回 true,否则就返回 false:

1
2
3
4
private static boolean isSlowTickTime() {
return LocalDate.now().getDayOfWeek() == DayOfWeek.SATURDAY ||
LocalDate.now().getDayOfWeek() == DayOfWeek.SUNDAY;
}

对于边读这篇文章边在 IDE 里执行这段代码的读者来说,他们可能不想等到下个周末才来验证这个方法是否可行,所以可以使用下面的替代实现,这个实现会在一个 15 秒钟内返回 true,在另一个 15 秒钟内返回 false:

1
2
3
4
private static long start = System.currentTimeMillis();
public static Boolean isSlowTime() {
return (System.currentTimeMillis() - start) % 30_000 >= 15_000;
}

接下来我们创建两个 Observable 对象,fast 和 slow,然后使用过滤器对它们进行调度,并把它们合并起来。

我们使用 Observable.interval 操作来安排调度,它会在每个指定的时间间隔内产生一次数据(从 0 开始计算)。

1
2
Observable<Long> fast = Observable.interval(1, TimeUnit.SECONDS);
Observable<Long> slow = Observable.interval(3, TimeUnit.SECONDS);

fast 每秒生成一个事件,slow 每三秒生成一个事件(我们会忽略事件的值,因为我们只对执行时间感兴趣)。

现在我们把这两个 Observable 合并到一起,通过使用过滤器让 fast 流在工作日生成数据(或者在 15 秒内),slow 流在周末生成数据(或者在另一个 15 秒内)。

1
2
3
4
Observable<Long> clock = Observable.merge(
slow.filter(tick-> isSlowTickTime()),
fast.filter(tick-> !isSlowTickTime())
);

最后,我们要添加一个打印时间的订阅动作。在执行这些代码时,它会根据我们的调度安排打印出系统时间。

1
clock.subscribe(tick-> System.out.println(new Date()));

为了防止程序中途退出,需要在方法的末尾添加一行代码(注意要处理 InterruptedException 异常)。

1
Thread.sleep(60_000);

运行代码的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Fri Sep 16 03:08:18 BST 2016
Fri Sep 16 03:08:19 BST 2016
Fri Sep 16 03:08:20 BST 2016
Fri Sep 16 03:08:21 BST 2016
Fri Sep 16 03:08:22 BST 2016
Fri Sep 16 03:08:23 BST 2016
Fri Sep 16 03:08:24 BST 2016
Fri Sep 16 03:08:25 BST 2016
Fri Sep 16 03:08:26 BST 2016
Fri Sep 16 03:08:27 BST 2016
Fri Sep 16 03:08:28 BST 2016
Fri Sep 16 03:08:29 BST 2016
Fri Sep 16 03:08:30 BST 2016
Fri Sep 16 03:08:31 BST 2016
Fri Sep 16 03:08:32 BST 2016
Fri Sep 16 03:08:35 BST 2016
Fri Sep 16 03:08:38 BST 2016
Fri Sep 16 03:08:41 BST 2016
Fri Sep 16 03:08:44 BST 2016
. . .

可以看到,前面 15 个事件之间的时间间隔都是 1 秒,后面 15 秒内的事件之间的时间间隔是 3 秒,就像我们所期望的那样。

连接到已存在的数据源

以上方法用于创建能够生成静态数据的 Observable 是没有问题的。但如何把 Observable 连接到已有的数据源上,并享受响应式的流量控制和流操作策略为我们带来的好处呢?

静态 Observable 和动态 Observable

首先我们先岔开话题,来介绍一下静态 Observable 和动态 Observable 之间的区别。

到目前为止我们讨论的都是静态 Observable,它们提供静态的数据,尽管我们可以在执行时间上做一些调节,不过这远远 不够。静态 Observable 只在有订阅者的情况下才会生成事件,而且订阅者收到的是历史数据,不管它们是从何时开始订阅的。相反,动态 Observable 不管有多少个订阅者都会生成数据,而且只生成最新的数据(除非使用了缓存)。可以通过两个步骤把静态 Observable 转化成动态 Observable:

  1. 调用 Observable 的 publish 方法,生成一个新的 ConnectableObservable
  2. 调用 ConnectableObservable 的 connect 方法,开始生成数据

要连接到一个已有的数据源上,可以在这个数据源上添加监听器(如果你喜欢这么做),监听器会把事件传播给订阅者,然后在每个事件发生时调用订阅者的 onNext 方法。在实现监听器的时候要确保每个订阅者仍然处于订阅状态,否则就要停止把事件传播给它,同时要注意回压信号。所幸的是,这些工作可以由 RxJavaAsyncEmitter 来处理。假设我们有一个叫做 SomeFeed 的数据服务,它会生成报价事件,同时有一个 SomeListener 监听这些报价事件以及其它生命周期事件。

我们的数据源监听器有两个方法:

1
2
public void priceTick(PriceTick event);
public void error(Throwable throwable);

PriceTick 类包含了 date、instrument 和 price 字段,还有一个 isLast 方法用来判断它是否是最后一个事件:

让我们来看看如何使用 AsyncEmitter 把 Observable 连接到一个实时的数据源上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
SomeFeed<PriceTick> feed = new SomeFeed<>(); 
Observable<PriceTick> obs =
Observable.fromEmitter((AsyncEmitter<PriceTick> emitter) ->
{
SomeListener listener = new SomeListener() {
@Override
public void priceTick(PriceTick event) {
emitter.onNext(event);
if (event.isLast()) {
emitter.onCompleted();
}
}

@Override
public void error(Throwable e) {
emitter.onError(e);
}
};
feed.register(listener);
}, AsyncEmitter.BackpressureMode.BUFFER);

这段代码几乎是逐字逐句地从 Observable 类的 Javadoc 里摘抄出来的。AsyncEmitter 封装了监听器(第 5 行)的创建过程,并把它注册到数据源上(第 19 行)。Observable 直接让订阅者对自己进行了订阅。数据源生成的事件被委托给了 AsyncEmitter(第 8 行)。第 20 行告诉观察者要缓冲所有的事件通知,直到它们被订阅者消费。除了缓冲,还有其它几种回压策略:

  • BackpressureMode.NONE 不使用回压。如果流的速度无法保持同步,可能会抛出 MissingBackpressureException 或 IllegalStateException。

  • BackpressureMode.ERROR 会在下游跟不上速度时抛出 MissingBackpressureException。

  • BackpressureMode.DROP 会在下游跟不上速度时把 onNext 的值丢弃。

  • BackpressureMode.LATEST 会一直保留最新的 onNext 的值,直到被下游消费掉。

这样生成的是静态 Observable。静态 Observable 在没有订阅者的时候不会生成数据,而且所有订阅者收到的是同样的历史数据,而这不是我们想要的。

为了把它转化成动态 Observable,让所有订阅者可以实时地接收事件通知,我们必须调用 publishconnect 方法,就像之前提到的那样:

1
2
ConnectableObservable<PriceTick> hotObservable = obs.publish(); 
hotObservable.connect();

最后,我们可以对它进行订阅并显示报价:

1
hotObservable.subscribe((priceTick) -> System.out.printf("%s %4s %6.2f%n", priceTick.getDate(), priceTick.getInstrument(), priceTick.getPrice()));