前提知識
RxJavaにはバックプレッシャと呼ばれる、流量制御の仕組みがある。
参考 詳解RxJava2:Backpressureで流速制御
onBackpressureBuffer
PublisherがSubscriberの消費スピードよりも早くデータを作った場合、バッファに生成されたデータを溜めておきたい。
このために、onBackpressureBufferメソッドが存在する。
このとき、引数でバッファの上限を設定できる。 onBackpressureBuffer(int capacity)
observeOn
Subscriberの実行スレッドを切り替えたい。
このためにobserveOnメソッドが存在する。
第3引数にbufferSizeが指定でき、request(bufferSize)が裏で実行される。
observeOn(Scheduler scheduler, boolean delayError, int bufferSize)
サンプル
Publisherが100ms毎にデータを生成し、Subscriberが1000msかけて消費する。
@Test public void バッファが溢れる() throws InterruptedException { Flowable.interval(100, TimeUnit.MILLISECONDS) //.doOnNext(x -> logger.info("1 doOnNext " + x)) //.doOnRequest(t -> logger.info("1 doOnRequest " + t)) //.doOnError(e -> logger.info("1 doOnError " + e)) .onBackpressureBuffer(5) //.doOnNext(x -> logger.info("2 doOnNext " + x)) //.doOnRequest(t -> logger.info("2 doOnRequest " + t)) //.doOnError(e -> logger.info("2 doOnError " + e)) .observeOn(Schedulers.computation(), false, 3) //.doOnNext(x -> logger.info("3 doOnNext " + x)) //.doOnRequest(t -> logger.info("3 doOnRequest " + t)) //.doOnError(e -> logger.info("3 doOnError " + e)) .subscribe(e -> { TimeUnit.SECONDS.sleep(1); }, Throwable::printStackTrace); TimeUnit.MINUTES.sleep(10); }
図にすると以下のようになる。
これを実行すると、以下のようなログが出力される。
22:11:12.850 [main] INFO TestRxJava - 3 doOnRequest 9223372036854775807 22:11:12.856 [main] INFO TestRxJava - 2 doOnRequest 3 22:11:12.856 [main] INFO TestRxJava - 1 doOnRequest 9223372036854775807 22:11:12.958 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 0 22:11:12.959 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnNext 0 22:11:12.959 [RxComputationThreadPool-1] INFO TestRxJava - 3 doOnNext 0 22:11:13.058 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 1 22:11:13.058 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnNext 1 22:11:13.158 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 2 22:11:13.158 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnNext 2 22:11:13.258 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 3 22:11:13.359 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 4 22:11:13.460 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 5 22:11:13.558 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 6 22:11:13.659 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 7 22:11:13.759 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 8 22:11:13.859 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 9 22:11:13.959 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 10 22:11:13.960 [RxComputationThreadPool-1] INFO TestRxJava - 3 doOnNext 1 22:11:14.059 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 11 22:11:45.094 [RxComputationThreadPool-1] INFO TestRxJava - 3 doOnNext 2 io.reactivex.exceptions.MissingBackpressureException: Buffer is full at io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer$BackpressureBufferSubscriber.onNext(FlowableOnBackpressureBuffer.java:99) at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onNext(FlowableDoOnEach.java:92) at io.reactivex.internal.operators.flowable.FlowableDoOnLifecycle$SubscriptionLambdaSubscriber.onNext(FlowableDoOnLifecycle.java:79) at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onNext(FlowableDoOnEach.java:92) at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:93) at io.reactivex.internal.schedulers.ScheduledDirectPeriodicTask.run(ScheduledDirectPeriodicTask.java:38) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 22:11:47.421 [RxComputationThreadPool-1] INFO TestRxJava - 2 doOnRequest 3 22:11:47.422 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnError io.reactivex.exceptions.MissingBackpressureException: Buffer is full 22:11:47.422 [RxComputationThreadPool-1] INFO TestRxJava - 3 doOnError io.reactivex.exceptions.MissingBackpressureException: Buffer is full
ログから、11のデータが生成されたときにBuffer is fullになっている。
しかし、Subscriberは2のデータまでしか消費していない。
22:11:13.158 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnNext 2
したがって、onBackpressureBufferに9のデータを溜めようとしたときにエラーになっている。
設定したcapacityは5なのに・・・。
onBackpressureBuffer(int capacity)
バッファが溢れるタイミング
onBackpressureBufferメソッドは裏でFlowableOnBackpressureBufferを作る。
このとき、内部でバッファ管理するために以下のようにQueueが作られていた。
FlowableOnBackpressureBuffer.javaのGithubソースコード
で、このQueue実装がこんなかんじになっていて、キューサイズの指定が2のx乗(1,2,4,8,16,32...)に切り上げた値になっていた。なので今回の場合は、4 < 5 < 8 で実際のキャパシティが8。
別パターンで試す。onBackpressureBuffer(10)
にすると、 8 < 10 < 16のため、16に切り上げられるはず。
@Test public void バッファが溢れる() throws InterruptedException { Flowable.interval(100, TimeUnit.MILLISECONDS) .doOnNext(x -> logger.info("1 doOnNext " + x)) .doOnRequest(t -> logger.info("1 doOnRequest " + t)) .doOnError(e -> logger.info("1 doOnError " + e)) .onBackpressureBuffer(10) .doOnNext(x -> logger.info("2 doOnNext " + x)) .doOnRequest(t -> logger.info("2 doOnRequest " + t)) .doOnError(e -> logger.info("2 doOnError " + e)) .observeOn(Schedulers.computation(), false, 3) .doOnNext(x -> logger.info("3 doOnNext " + x)) .doOnRequest(t -> logger.info("3 doOnRequest " + t)) .doOnError(e -> logger.info("3 doOnError " + e)) .subscribe(e -> { TimeUnit.SECONDS.sleep(1); }, Throwable::printStackTrace); TimeUnit.MINUTES.sleep(10); }
ログを見ると、17こめのデータをバッファに溜めるときに例外が出ているのがわかる。
07:55:03.742 [main] INFO TestRxJava - 3 doOnRequest 9223372036854775807 07:55:03.742 [main] INFO TestRxJava - 2 doOnRequest 3 07:55:03.742 [main] INFO TestRxJava - 1 doOnRequest 9223372036854775807 07:55:03.852 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 0 07:55:03.852 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnNext 0 07:55:03.852 [RxComputationThreadPool-1] INFO TestRxJava - 3 doOnNext 0 07:55:03.977 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 1 07:55:03.977 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnNext 1 07:55:04.071 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 2 07:55:04.071 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnNext 2 07:55:04.180 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 3 07:55:04.258 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 4 07:55:04.352 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 5 07:55:04.461 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 6 07:55:04.555 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 7 07:55:04.658 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 8 07:55:04.752 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 9 07:55:04.861 [RxComputationThreadPool-1] INFO TestRxJava - 3 doOnNext 1 07:55:04.861 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 10 07:55:04.950 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 11 07:55:05.054 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 12 07:55:05.151 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 13 07:55:05.264 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 14 07:55:05.357 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 15 07:55:05.451 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 16 07:55:05.561 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 17 07:55:05.657 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 18 07:55:05.751 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 19 07:55:05.751 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnError io.reactivex.exceptions.MissingBackpressureException: Buffer is full io.reactivex.exceptions.MissingBackpressureException: Buffer is full 07:55:05.876 [RxComputationThreadPool-1] INFO TestRxJava - 3 doOnError io.reactivex.exceptions.MissingBackpressureException: Buffer is full at io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer$BackpressureBufferSubscriber.onNext(FlowableOnBackpressureBuffer.java:99) at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onNext(FlowableDoOnEach.java:92) at io.reactivex.internal.operators.flowable.FlowableDoOnLifecycle$SubscriptionLambdaSubscriber.onNext(FlowableDoOnLifecycle.java:79) at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onNext(FlowableDoOnEach.java:92) at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:93) at io.reactivex.internal.schedulers.ScheduledDirectPeriodicTask.run(ScheduledDirectPeriodicTask.java:38) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
結論
onBackpressureBuffer の capacity は2のx乗(1,2,4,8,16,32...)に切り上げるため、厳密な値ではないらしい。