I verified the behavior described here with the following version.
pom.xml
    <dependency>
        <groupId>io.reactivex.rxjava2</groupId>
        <artifactId>rxjava</artifactId>
        <version>2.1.14</version>
    </dependency>
ConnectableFlowable
ConnectableFlowable is a hot stream. Use it when you want multiple Subscribers to subscribe to the same stream.
Reference: ConnectableFlowable Javadoc
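To make the difference between a cold and a hot stream concrete, here is a minimal sketch that counts how often the upstream source is subscribed. The class name ColdVersusHotSketch and the helper upstreamSubscriptions are made up for this illustration. It uses the publish() and connect() methods that the next section walks through, and a synchronous Flowable.range so that no sleeping is needed.

```java
import io.reactivex.Flowable;
import io.reactivex.flowables.ConnectableFlowable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class, not part of the original article.
final class ColdVersusHotSketch {

    /** Returns how many times the upstream source was subscribed. */
    static int upstreamSubscriptions(boolean usePublish) {
        AtomicInteger subscriptions = new AtomicInteger();
        Flowable<Integer> cold = Flowable.range(0, 3)
                .doOnSubscribe(s -> subscriptions.incrementAndGet());

        if (usePublish) {
            // Hot: both Subscribers share one upstream subscription.
            ConnectableFlowable<Integer> hot = cold.publish();
            hot.subscribe(v -> { });
            hot.subscribe(v -> { });
            hot.connect();
        } else {
            // Cold: every subscribe() starts the source again.
            cold.subscribe(v -> { });
            cold.subscribe(v -> { });
        }
        return subscriptions.get();
    }

    public static void main(String[] args) {
        System.out.println("cold: " + upstreamSubscriptions(false)); // cold: 2
        System.out.println("hot: " + upstreamSubscriptions(true));   // hot: 1
    }
}
```

The counter is the only thing observed here; which items each Subscriber receives is covered by the examples that follow.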
publish(), connect()
publish() converts a cold stream into a hot one.
The resulting ConnectableFlowable starts emitting data only once connect() is called.
    @Test
    public void publish() throws InterruptedException {
        ConnectableFlowable<Long> flowable = Flowable
                .interval(100, TimeUnit.MILLISECONDS)
                .take(2)
                .publish();

        // Both Subscribers are registered before the stream starts.
        flowable.subscribe(new DebugSubscriber());
        flowable.subscribe(new DebugSubscriber());
        // connect() starts the emission for every registered Subscriber.
        flowable.connect();

        // Keep the test thread alive while the computation thread emits.
        TimeUnit.MILLISECONDS.sleep(10000);
    }
Output:
    09:11:31.433 [main] INFO DebugSubscriber - onSubscribe
    09:11:31.438 [main] INFO DebugSubscriber - onSubscribe
    09:11:31.550 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
    09:11:31.550 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
    09:11:31.649 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
    09:11:31.649 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
    09:11:31.651 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
    09:11:31.651 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
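The DebugSubscriber class used in these examples is not shown in this article. The following is only a guess at what it might look like, assuming it logs every signal and requests an unbounded number of items. The real class presumably writes through a logging framework; this hypothetical version prints to System.out and also records the events in a list (the events field is an addition for inspection, not something the article mentions).

```java
import io.reactivex.subscribers.DisposableSubscriber;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the article's DebugSubscriber (assumption).
final class DebugSubscriber<T> extends DisposableSubscriber<T> {

    // Recorded signals, in arrival order (added for illustration only).
    final List<String> events = Collections.synchronizedList(new ArrayList<>());

    private void log(String event) {
        events.add(event);
        System.out.println("[" + Thread.currentThread().getName() + "] " + event);
    }

    @Override
    protected void onStart() {
        log("onSubscribe");
        // The default onStart() requests Long.MAX_VALUE items.
        super.onStart();
    }

    @Override
    public void onNext(T item) {
        log("onNext " + item);
    }

    @Override
    public void onError(Throwable error) {
        log("onError " + error);
    }

    @Override
    public void onComplete() {
        log("onComplete");
    }
}
```

Because DisposableSubscriber makes onSubscribe() final, the sketch logs the subscription from onStart(), which is invoked once when the Subscription arrives.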
Without a call to connect(), no data flows. The Subscriber receives only onSubscribe.
    @Test
    public void publish() throws InterruptedException {
        ConnectableFlowable<Long> flowable = Flowable
                .interval(100, TimeUnit.MILLISECONDS)
                .take(2)
                .publish();

        flowable.subscribe(new DebugSubscriber());
        // connect() is intentionally left out:
        // flowable.connect();

        TimeUnit.MILLISECONDS.sleep(10000);
    }
Output:
    16:40:27.981 [main] INFO DebugSubscriber - onSubscribe
If you subscribe() to a stream that connect() has already started, you receive data from the middle of the stream.
    @Test
    public void publish() throws InterruptedException {
        ConnectableFlowable<Long> flowable = Flowable
                .interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .publish();

        // Start the stream before anyone has subscribed.
        flowable.connect();

        // Items emitted during this wait are not delivered to the late Subscriber.
        TimeUnit.MILLISECONDS.sleep(200);
        flowable.subscribe(new DebugSubscriber());

        TimeUnit.MILLISECONDS.sleep(10000);
    }
Output:
    16:42:15.367 [main] INFO DebugSubscriber - onSubscribe
    16:42:15.466 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
    16:42:15.467 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
If you subscribe() to a stream that has already completed, no data flows. The second Subscriber receives only onSubscribe.
    @Test
    public void publish() throws InterruptedException {
        ConnectableFlowable<Long> flowable = Flowable
                .interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .publish();

        flowable.connect();
        flowable.subscribe(new DebugSubscriber());

        // Wait until the three items and onComplete have been delivered.
        TimeUnit.MILLISECONDS.sleep(400);
        flowable.subscribe(new DebugSubscriber());

        TimeUnit.MILLISECONDS.sleep(10000);
    }
Output:
    17:13:12.205 [main] INFO DebugSubscriber - onSubscribe
    17:13:12.304 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
    17:13:12.403 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
    17:13:12.508 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
    17:13:12.510 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
    17:13:12.609 [main] INFO DebugSubscriber - onSubscribe
Calling connect() again makes the data flow from the beginning.
    @Test
    public void publish() throws InterruptedException {
        ConnectableFlowable<Long> flowable = Flowable
                .interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .publish();

        flowable.connect();
        flowable.subscribe(new DebugSubscriber());

        // Wait until the first run has completed.
        TimeUnit.MILLISECONDS.sleep(400);

        // A second connect() starts a fresh run of the source.
        flowable.connect();
        flowable.subscribe(new DebugSubscriber());

        TimeUnit.MILLISECONDS.sleep(10000);
    }
Output:
    18:20:07.100 [main] INFO DebugSubscriber - onSubscribe
    18:20:07.198 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
    18:20:07.297 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
    18:20:07.397 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
    18:20:07.398 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
    18:20:07.503 [main] INFO DebugSubscriber - onSubscribe
    18:20:07.604 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 0
    18:20:07.704 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 1
    18:20:07.805 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 2
    18:20:07.805 [RxComputationThreadPool-2] INFO DebugSubscriber - onComplete
Calling dispose() on the Disposable returned by connect() tears down the stream.
    @Test
    public void publish() throws InterruptedException {
        ConnectableFlowable<Long> flowable = Flowable
                .interval(100, TimeUnit.MILLISECONDS)
                .publish();

        flowable.subscribe(new DebugSubscriber());
        Disposable disposable = flowable.connect();

        TimeUnit.MILLISECONDS.sleep(200);
        // Stops the running connection; the first Subscriber gets no more items.
        disposable.dispose();

        // Connecting again starts over from 0 for the new Subscriber.
        flowable.connect();
        flowable.subscribe(new DebugSubscriber());

        TimeUnit.MILLISECONDS.sleep(10000);
    }
Output:
    17:07:51.631 [main] INFO DebugSubscriber - onSubscribe
    17:07:51.758 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
    17:07:51.858 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
    17:07:51.858 [main] INFO DebugSubscriber - onSubscribe
    17:07:51.959 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 0
    17:07:52.060 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 1
    17:07:52.168 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 2
replay()
A stream created with publish() can deliver to a Subscriber only the data emitted after that Subscriber called subscribe(). To buffer the stream's data and also deliver the buffered items at subscribe() time, use replay().
    @Test
    public void publish() throws InterruptedException {
        ConnectableFlowable<Long> flowable = Flowable
                .interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .replay();

        flowable.connect();
        flowable.subscribe(new DebugSubscriber());

        TimeUnit.MILLISECONDS.sleep(200);
        // The late Subscriber first receives the buffered items 0 and 1.
        flowable.subscribe(new DebugSubscriber());

        TimeUnit.MILLISECONDS.sleep(10000);
    }
Output:
    09:51:37.879 [main] INFO DebugSubscriber - onSubscribe
    09:51:37.977 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
    09:51:38.077 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
    09:51:38.083 [main] INFO DebugSubscriber - onSubscribe
    09:51:38.083 [main] INFO DebugSubscriber - onNext 0
    09:51:38.083 [main] INFO DebugSubscriber - onNext 1
    09:51:38.176 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
    09:51:38.176 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
    09:51:38.177 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
    09:51:38.177 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
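The buffer policy can be specified by buffer size or by time.
With no arguments, replay() appears to buffer without limit, so you need to choose a buffer policy that fits your use case.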
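As an illustration of a size-based policy, the sketch below uses the replay(int bufferSize) overload with a buffer of 1. It assumes a PublishProcessor as a manually driven source so that the result does not depend on timing; the class name ReplayBufferSizeSketch and the method lateSubscriberItems are hypothetical. An early Subscriber is registered first because the Flowable version of replay() only requests from upstream what its Subscribers have requested. The time-based overloads such as replay(long time, TimeUnit unit) follow the same pattern.

```java
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.processors.PublishProcessor;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, not part of the original article.
final class ReplayBufferSizeSketch {

    /** Returns the items seen by a Subscriber that joins after two emissions. */
    static List<Integer> lateSubscriberItems() {
        PublishProcessor<Integer> source = PublishProcessor.create();
        // Keep only the most recent item for late Subscribers.
        ConnectableFlowable<Integer> replayed = source.replay(1);

        // Early Subscriber: its unbounded request drives the upstream demand.
        replayed.subscribe(v -> { });
        replayed.connect();

        source.onNext(0);
        source.onNext(1);

        // Late Subscriber: receives the buffered item, then the live ones.
        List<Integer> late = new ArrayList<>();
        replayed.subscribe(late::add);

        source.onNext(2);
        source.onComplete();
        return late;
    }

    public static void main(String[] args) {
        System.out.println(lateSubscriberItems()); // [1, 2]
    }
}
```

With the unbounded replay() the late Subscriber would have received 0 as well; the size limit trades completeness for bounded memory.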
ConnectableFlowable -> Flowable
A ConnectableFlowable emits no data until connect() is called. Several methods are provided that convert it into a Flowable which calls connect() automatically.
refCount()
Creates a Flowable that calls connect() when a Subscriber subscribes.
    @Test
    public void refCount() throws InterruptedException {
        Flowable<Long> flowable = Flowable
                .interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .publish().refCount();

        // The first subscribe() triggers connect().
        flowable.subscribe(new DebugSubscriber());

        TimeUnit.MILLISECONDS.sleep(200);
        // The second Subscriber joins the running stream.
        flowable.subscribe(new DebugSubscriber());

        TimeUnit.MILLISECONDS.sleep(10000);
    }
Output:
    10:14:43.912 [main] INFO DebugSubscriber - onSubscribe
    10:14:44.027 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
    10:14:44.125 [main] INFO DebugSubscriber - onSubscribe
    10:14:44.126 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
    10:14:44.126 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
    10:14:44.227 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
    10:14:44.227 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
    10:14:44.228 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
    10:14:44.229 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
You can also set how many Subscribers must subscribe before connect() is called.
    @Test
    public void refCount() throws InterruptedException {
        Flowable<Long> flowable = Flowable
                .interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .publish().refCount(2);

        // Only one Subscriber so far: nothing is emitted yet.
        flowable.subscribe(new DebugSubscriber());

        TimeUnit.MILLISECONDS.sleep(400);
        // The second Subscriber reaches the threshold and triggers connect().
        flowable.subscribe(new DebugSubscriber());

        TimeUnit.MILLISECONDS.sleep(10000);
    }
Output:
    10:26:58.425 [main] INFO DebugSubscriber - onSubscribe
    10:26:58.829 [main] INFO DebugSubscriber - onSubscribe
    10:26:58.945 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
    10:26:58.945 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
    10:26:59.044 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
    10:26:59.044 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
    10:26:59.144 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
    10:26:59.144 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
    10:26:59.146 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
    10:26:59.146 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
If you subscribe() to a stream that has already completed, the data flows again from the beginning. (When the number of Subscribers drops to 0, the connection is disposed; when it reaches the threshold again, connect() is called.)
    @Test
    public void refCount() throws InterruptedException {
        Flowable<Long> flowable = Flowable
                .interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .publish().refCount();

        flowable.subscribe(new DebugSubscriber());

        // Wait until the first run has completed.
        TimeUnit.MILLISECONDS.sleep(400);
        // Subscribing again reconnects and restarts from 0.
        flowable.subscribe(new DebugSubscriber());

        TimeUnit.MILLISECONDS.sleep(10000);
    }
Output:
    10:22:59.397 [main] INFO DebugSubscriber - onSubscribe
    10:22:59.511 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
    10:22:59.610 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
    10:22:59.712 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
    10:22:59.714 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
    10:22:59.812 [main] INFO DebugSubscriber - onSubscribe
    10:22:59.913 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 0
    10:23:00.014 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 1
    10:23:00.113 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 2
    10:23:00.113 [RxComputationThreadPool-2] INFO DebugSubscriber - onComplete
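The disposal that happens when the Subscriber count drops to 0 can also be observed directly, without waiting for completion. The sketch below assumes a PublishProcessor as the source, whose hasSubscribers() method reports whether the publish() connection is currently attached to it. The class name RefCountDisconnectSketch and the method observe are hypothetical.

```java
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.reactivex.processors.PublishProcessor;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class, not part of the original article.
final class RefCountDisconnectSketch {

    /** Returns whether the source is connected before, during and after a subscription. */
    static List<Boolean> observe() {
        PublishProcessor<Integer> source = PublishProcessor.create();
        Flowable<Integer> shared = source.publish().refCount();

        // No Subscriber yet, so refCount() has not connected.
        boolean before = source.hasSubscribers();

        // The first Subscriber triggers connect().
        Disposable subscription = shared.subscribe(v -> { });
        boolean during = source.hasSubscribers();

        // The last Subscriber leaves, so refCount() disposes the connection.
        subscription.dispose();
        boolean after = source.hasSubscribers();

        return Arrays.asList(before, during, after);
    }

    public static void main(String[] args) {
        System.out.println(observe()); // [false, true, false]
    }
}
```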
autoConnect()
Almost the same as refCount().
However, subscribing to a stream that has already completed does not make the data flow again (it does not connect again).
    @Test
    public void autoConnect() throws InterruptedException {
        Flowable<Long> flowable = Flowable
                .interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .publish().autoConnect();

        flowable.subscribe(new DebugSubscriber());

        // Wait until the stream has completed.
        TimeUnit.MILLISECONDS.sleep(400);
        // This Subscriber receives only onSubscribe.
        flowable.subscribe(new DebugSubscriber());

        TimeUnit.MILLISECONDS.sleep(10000);
    }
Output:
    10:52:27.423 [main] INFO DebugSubscriber - onSubscribe
    10:52:27.534 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
    10:52:27.634 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
    10:52:27.740 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
    10:52:27.742 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
    10:52:27.835 [main] INFO DebugSubscriber - onSubscribe
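Another way to see that autoConnect() does not manage the connection after the initial connect() is to dispose the only Subscriber and check whether the source is still attached. The sketch below assumes a PublishProcessor as the source and uses its hasSubscribers() method as the probe. The class name AutoConnectStaysConnectedSketch and the method observe are hypothetical.

```java
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.reactivex.processors.PublishProcessor;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class, not part of the original article.
final class AutoConnectStaysConnectedSketch {

    /** Returns whether the source is connected before, during and after a subscription. */
    static List<Boolean> observe() {
        PublishProcessor<Integer> source = PublishProcessor.create();
        Flowable<Integer> auto = source.publish().autoConnect();

        // Not connected until the first Subscriber arrives.
        boolean before = source.hasSubscribers();

        Disposable subscription = auto.subscribe(v -> { });
        boolean during = source.hasSubscribers();

        // Unlike refCount(), the connection is kept when the Subscriber leaves.
        subscription.dispose();
        boolean after = source.hasSubscribers();

        return Arrays.asList(before, during, after);
    }

    public static void main(String[] args) {
        System.out.println(observe()); // [false, true, true]
    }
}
```

Since the connection outlives its Subscribers, autoConnect() suits sources that should keep running once started, while refCount() suits sources that should stop when nobody is listening.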
share()
An alias for Flowable.publish().refCount().
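Since share() has no example of its own here, the following minimal sketch shows it behaving like the refCount() examples: the first Subscriber triggers the connection and a later Subscriber joins the running stream. It assumes a PublishProcessor as a manually driven source so that the order of events is deterministic; the class name ShareSketch, the method run and the "A"/"B" labels are hypothetical.

```java
import io.reactivex.Flowable;
import io.reactivex.processors.PublishProcessor;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, not part of the original article.
final class ShareSketch {

    /** Returns the items received by two Subscribers, tagged with their name. */
    static List<String> run() {
        PublishProcessor<Integer> source = PublishProcessor.create();
        Flowable<Integer> shared = source.share();
        List<String> log = new ArrayList<>();

        // Subscriber A triggers the connection to the source.
        shared.subscribe(v -> log.add("A:" + v));
        source.onNext(0);

        // Subscriber B joins late and misses item 0.
        shared.subscribe(v -> log.add("B:" + v));
        source.onNext(1);

        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [A:0, A:1, B:1]
    }
}
```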