SIer だけど技術やりたいブログ

RxJava: ConnectableFlowable (Hot Stream) Operators

Java reactive

Verified with the following version.

pom.xml

        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
            <version>2.1.14</version>
        </dependency>

ConnectableFlowable

ConnectableFlowable is a hot stream. Use it when multiple Subscribers should receive items from the same shared stream.
Reference: ConnectableFlowable Javadoc

publish(), connect()

publish() converts a cold stream into a hot one. A ConnectableFlowable starts emitting data only once connect() is called.

    @Test
    public void publish() throws InterruptedException {
        ConnectableFlowable flowable = Flowable
                .interval(100, TimeUnit.MILLISECONDS)
                .take(2)
                .publish();

        flowable.subscribe(new DebugSubscriber());
        flowable.subscribe(new DebugSubscriber());

        flowable.connect();
        TimeUnit.MILLISECONDS.sleep(10000);
    }
09:11:31.433 [main] INFO DebugSubscriber - onSubscribe
09:11:31.438 [main] INFO DebugSubscriber - onSubscribe
09:11:31.550 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
09:11:31.550 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
09:11:31.649 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
09:11:31.649 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
09:11:31.651 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete 
09:11:31.651 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete 
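
The tests in this post use a DebugSubscriber class whose source is not shown. The following is a minimal, hypothetical stand-in, assuming it simply logs every signal and requests an unbounded number of items. The `events` list and the use of `System.out` instead of a logging framework are assumptions made so that the sketch has no dependencies beyond RxJava.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Hypothetical stand-in for the DebugSubscriber used in this post.
class DebugSubscriber<T> implements Subscriber<T> {

    // Recorded signals, kept so that callers can inspect them (an assumption, not in the post).
    final List<String> events = new CopyOnWriteArrayList<>();

    @Override
    public void onSubscribe(Subscription subscription) {
        log("onSubscribe");
        // Request everything up front; this subscriber applies no backpressure.
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T item) {
        log("onNext " + item);
    }

    @Override
    public void onError(Throwable error) {
        log("onError " + error);
    }

    @Override
    public void onComplete() {
        log("onComplete");
    }

    private void log(String event) {
        events.add(event);
        // Print the thread name so that emissions on RxComputationThreadPool threads are visible.
        System.out.println("[" + Thread.currentThread().getName() + "] DebugSubscriber - " + event);
    }
}
```

Because the class is generic, `new DebugSubscriber()` as written in the tests still compiles (as a raw type), and `new DebugSubscriber<Long>()` would avoid the unchecked warning.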

Without connect(), no data flows.

    @Test
    public void publish() throws InterruptedException {
        ConnectableFlowable flowable = Flowable
                .interval(100, TimeUnit.MILLISECONDS)
                .take(2)
                .publish();

        flowable.subscribe(new DebugSubscriber());

        // flowable.connect();
        TimeUnit.MILLISECONDS.sleep(10000);
    }
16:40:27.981 [main] INFO DebugSubscriber - onSubscribe

If you subscribe() to a stream that connect() has already started, you receive data from the middle of the stream.

    @Test
    public void publish() throws InterruptedException {
        ConnectableFlowable flowable = Flowable
                .interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .publish();

        flowable.connect();

        TimeUnit.MILLISECONDS.sleep(200);
        flowable.subscribe(new DebugSubscriber());

        TimeUnit.MILLISECONDS.sleep(10000);
    }
16:42:15.367 [main] INFO DebugSubscriber - onSubscribe
16:42:15.466 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
16:42:15.467 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete 

If you subscribe() to a stream that has already completed, no data is delivered.

    @Test
    public void publish() throws InterruptedException {
        ConnectableFlowable flowable = Flowable
                .interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .publish();

        flowable.connect();

        flowable.subscribe(new DebugSubscriber());
        TimeUnit.MILLISECONDS.sleep(400);
        flowable.subscribe(new DebugSubscriber());

        TimeUnit.MILLISECONDS.sleep(10000);
    }
17:13:12.205 [main] INFO DebugSubscriber - onSubscribe
17:13:12.304 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
17:13:12.403 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
17:13:12.508 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
17:13:12.510 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete 
17:13:12.609 [main] INFO DebugSubscriber - onSubscribe

Calling connect() again restarts the stream from the beginning.

    @Test
    public void publish() throws InterruptedException {
        ConnectableFlowable flowable = Flowable
                .interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .publish();

        flowable.connect();

        flowable.subscribe(new DebugSubscriber());
        TimeUnit.MILLISECONDS.sleep(400);

        flowable.connect();
        flowable.subscribe(new DebugSubscriber());

        TimeUnit.MILLISECONDS.sleep(10000);
    }
18:20:07.100 [main] INFO DebugSubscriber - onSubscribe
18:20:07.198 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
18:20:07.297 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
18:20:07.397 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
18:20:07.398 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete 
18:20:07.503 [main] INFO DebugSubscriber - onSubscribe
18:20:07.604 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 0
18:20:07.704 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 1
18:20:07.805 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 2
18:20:07.805 [RxComputationThreadPool-2] INFO DebugSubscriber - onComplete 

Calling dispose() on the Disposable returned by connect() tears down the stream.

    @Test
    public void publish() throws InterruptedException {
        ConnectableFlowable flowable = Flowable
                .interval(100, TimeUnit.MILLISECONDS)
                .publish();

        flowable.subscribe(new DebugSubscriber());
        Disposable disposable = flowable.connect();

        TimeUnit.MILLISECONDS.sleep(200);
        disposable.dispose();
        flowable.connect();

        flowable.subscribe(new DebugSubscriber());

        TimeUnit.MILLISECONDS.sleep(10000);
    }
17:07:51.631 [main] INFO DebugSubscriber - onSubscribe
17:07:51.758 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
17:07:51.858 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
17:07:51.858 [main] INFO DebugSubscriber - onSubscribe
17:07:51.959 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 0
17:07:52.060 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 1
17:07:52.168 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 2

replay()

A stream created with publish() can deliver only the items emitted after a Subscriber subscribes. To buffer the stream's items and also deliver the buffered ones to a Subscriber that subscribes later, use replay().

    @Test
    public void publish() throws InterruptedException {
        ConnectableFlowable flowable = Flowable
                .interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .replay();

        flowable.connect();

        flowable.subscribe(new DebugSubscriber());
        TimeUnit.MILLISECONDS.sleep(200);
        flowable.subscribe(new DebugSubscriber());

        TimeUnit.MILLISECONDS.sleep(10000);
    }
09:51:37.879 [main] INFO DebugSubscriber - onSubscribe
09:51:37.977 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
09:51:38.077 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
09:51:38.083 [main] INFO DebugSubscriber - onSubscribe
09:51:38.083 [main] INFO DebugSubscriber - onNext 0
09:51:38.083 [main] INFO DebugSubscriber - onNext 1
09:51:38.176 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
09:51:38.176 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
09:51:38.177 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete 
09:51:38.177 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete 

The buffering policy can be specified by buffer size or by time.

By default the buffer appears to be unbounded, so you need to choose a buffering policy that fits your use case.
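
As a sketch of a size-bounded policy, `replay(int bufferSize)` keeps only the most recent items for late subscribers. The class name `ReplayBufferSketch` and the `run()` helper are hypothetical, and a PublishProcessor is used as the source so that the result does not depend on timing. A time-bounded buffer would use the `replay(long time, TimeUnit unit)` overload instead.

```java
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.processors.PublishProcessor;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: replay() with a buffer limited to the last two items.
class ReplayBufferSketch {

    // Returns the items seen by the early subscriber and by the late subscriber.
    static List<List<Integer>> run() {
        PublishProcessor<Integer> source = PublishProcessor.create();
        ConnectableFlowable<Integer> replayed = source.replay(2);

        // The early subscriber subscribes before any item is emitted.
        List<Integer> early = new ArrayList<>();
        replayed.subscribe(early::add);
        replayed.connect();

        source.onNext(1);
        source.onNext(2);
        source.onNext(3);

        // The late subscriber first receives the buffered items (at most two here).
        List<Integer> late = new ArrayList<>();
        replayed.subscribe(late::add);

        // Items emitted from now on go to both subscribers.
        source.onNext(4);
        source.onComplete();
        return Arrays.asList(early, late);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The early subscriber should see 1, 2, 3, 4, while the late one should see only 2, 3, 4, because item 1 had already been evicted from the two-item buffer.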

ConnectableFlowable -> Flowable

A ConnectableFlowable does not emit data until connect() is called. Several methods are provided that convert it into a Flowable that calls connect() automatically.

refCount()

Creates a Flowable that calls connect() when a Subscriber subscribes.

    @Test
    public void refCount() throws InterruptedException {
        Flowable flowable = Flowable
                .interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .publish().refCount();

        flowable.subscribe(new DebugSubscriber());
        TimeUnit.MILLISECONDS.sleep(200);
        flowable.subscribe(new DebugSubscriber());

        TimeUnit.MILLISECONDS.sleep(10000);
    }
10:14:43.912 [main] INFO DebugSubscriber - onSubscribe
10:14:44.027 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
10:14:44.125 [main] INFO DebugSubscriber - onSubscribe
10:14:44.126 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
10:14:44.126 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
10:14:44.227 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
10:14:44.227 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
10:14:44.228 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete 
10:14:44.229 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete 

You can adjust how many Subscribers must subscribe before connect() is called.

    @Test
    public void refCount() throws InterruptedException {
        Flowable flowable = Flowable
                .interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .publish().refCount(2);

        flowable.subscribe(new DebugSubscriber());
        TimeUnit.MILLISECONDS.sleep(400);
        flowable.subscribe(new DebugSubscriber());

        TimeUnit.MILLISECONDS.sleep(10000);
    }
10:26:58.425 [main] INFO DebugSubscriber - onSubscribe
10:26:58.829 [main] INFO DebugSubscriber - onSubscribe
10:26:58.945 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
10:26:58.945 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
10:26:59.044 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
10:26:59.044 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
10:26:59.144 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
10:26:59.144 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
10:26:59.146 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete 
10:26:59.146 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete 

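Subscribing to a stream that has already completed restarts the data from the beginning. (When the number of Subscribers drops to 0, it disposes the connection. When the number of Subscribers reaches the threshold, it connects.)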

    @Test
    public void refCount() throws InterruptedException {
        Flowable flowable = Flowable
                .interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .publish().refCount();

        flowable.subscribe(new DebugSubscriber());
        TimeUnit.MILLISECONDS.sleep(400);
        flowable.subscribe(new DebugSubscriber());

        TimeUnit.MILLISECONDS.sleep(10000);
    }
10:22:59.397 [main] INFO DebugSubscriber - onSubscribe
10:22:59.511 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
10:22:59.610 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
10:22:59.712 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
10:22:59.714 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete 
10:22:59.812 [main] INFO DebugSubscriber - onSubscribe
10:22:59.913 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 0
10:23:00.014 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 1
10:23:00.113 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 2
10:23:00.113 [RxComputationThreadPool-2] INFO DebugSubscriber - onComplete 
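
The disconnect and reconnect described in the parenthetical note can also be observed directly by counting upstream subscriptions and cancellations. This is a sketch only: the class name `RefCountDisconnectSketch`, the `run()` helper, and the use of `doOnSubscribe`/`doOnCancel` as probes are assumptions added for illustration.

```java
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.reactivex.processors.PublishProcessor;

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: counts how often refCount() connects to and disconnects from the upstream.
class RefCountDisconnectSketch {

    // Returns { upstream subscriptions, upstream cancellations }.
    static int[] run() {
        AtomicInteger subscriptions = new AtomicInteger();
        AtomicInteger cancellations = new AtomicInteger();

        PublishProcessor<Integer> source = PublishProcessor.create();
        Flowable<Integer> shared = source
                .doOnSubscribe(s -> subscriptions.incrementAndGet())
                .doOnCancel(cancellations::incrementAndGet)
                .publish()
                .refCount();

        // Subscriber count 0 -> 1: refCount() connects to the upstream.
        Disposable first = shared.subscribe(v -> { });
        // Subscriber count 1 -> 0: refCount() disposes the connection.
        first.dispose();

        // Subscriber count 0 -> 1 again: a fresh connection is made.
        Disposable second = shared.subscribe(v -> { });
        second.dispose();

        return new int[] { subscriptions.get(), cancellations.get() };
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("subscriptions=" + counts[0] + ", cancellations=" + counts[1]);
    }
}
```

Both counters should end at 2: each time the last Subscriber leaves, the upstream is cancelled, and the next Subscriber triggers a new upstream subscription.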

autoConnect()

Almost the same as refCount().

However, subscribing to a stream that has already completed does not restart the data. (It does not connect again.)

    @Test
    public void autoConnect() throws InterruptedException {
        Flowable flowable = Flowable
                .interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .publish().autoConnect();

        flowable.subscribe(new DebugSubscriber());
        TimeUnit.MILLISECONDS.sleep(400);
        flowable.subscribe(new DebugSubscriber());

        TimeUnit.MILLISECONDS.sleep(10000);
    }
10:52:27.423 [main] INFO DebugSubscriber - onSubscribe
10:52:27.534 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
10:52:27.634 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
10:52:27.740 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
10:52:27.742 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete 
10:52:27.835 [main] INFO DebugSubscriber - onSubscribe
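
Like refCount(), autoConnect() has an overload that takes the number of Subscribers to wait for. The following sketch assumes a synchronous `Flowable.range` source so that the order of events is easy to follow. The class name `AutoConnectSketch` and the `run()` helper are hypothetical.

```java
import io.reactivex.Flowable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: autoConnect(2) connects only once the second Subscriber arrives.
class AutoConnectSketch {

    static List<String> run() {
        List<String> events = new ArrayList<>();

        Flowable<Integer> flowable = Flowable
                .range(1, 3)
                .publish()
                .autoConnect(2);

        // First Subscriber: the threshold is not reached, so nothing is emitted yet.
        flowable.subscribe(v -> events.add("A:" + v));
        events.add("after first subscribe");

        // Second Subscriber: the threshold is reached, connect() is called,
        // and both Subscribers receive every item.
        flowable.subscribe(v -> events.add("B:" + v));
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The "after first subscribe" marker should come first in the list, followed by three items for each Subscriber.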

share()

An alias for Flowable.publish().refCount().
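
A minimal sketch of share(), assuming a PublishProcessor source and a `doOnSubscribe` counter as a probe (both added for illustration, as are the class name `ShareSketch` and the `run()` helper). It shows that two Subscribers share a single upstream subscription.

```java
import io.reactivex.Flowable;
import io.reactivex.processors.PublishProcessor;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: two Subscribers on a share()d Flowable use one upstream subscription.
class ShareSketch {

    static String run() {
        AtomicInteger subscriptions = new AtomicInteger();
        PublishProcessor<Integer> source = PublishProcessor.create();

        Flowable<Integer> shared = source
                .doOnSubscribe(s -> subscriptions.incrementAndGet())
                .share();

        List<Integer> a = new ArrayList<>();
        List<Integer> b = new ArrayList<>();
        // The first subscribe() connects; the second joins the existing connection.
        shared.subscribe(a::add);
        shared.subscribe(b::add);

        source.onNext(1);
        source.onNext(2);

        return "subscriptions=" + subscriptions.get() + " a=" + a + " b=" + b;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Without share(), each subscribe() call would create its own upstream subscription, so the counter would read 2 instead of 1.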

References