HTML5テクニカルノート
RxJS 6入門 07: Scheduler
- ID: FN1806005
- Technique: HTML5 / ECMAScript 2015
- Library: RxJS 6.2.1
Scheduler
は、オペーレータがサブスクリプションをいつ開始し、通知はいつ送るのかコントロールします。サブスクリプションや通知が、どのコンテキストで行われるのかをスケジュールできるのです。GitHubのReactiveX/rxjs「Scheduler」を下じきに、サンプルコードや解説は改めました。
01 Schedulerの機能
Scheduler
は、サブスクリプションをいつ開始し、通知はいつ送るのかコントロールします。構成要素となるのはつぎの3つです。
- データ構造: タスクをどのように保持し、キューに入れるのか、優先度やその他の基準にもとづいて定める。
- 実行コンテキスト: タスクが実行される場所と時期を決める。
- たとえば、直ちに実行するか、
setTimeout()
やprocess.nextTick()
などのコールバックの仕組みを使うか、あるいはアニメーションフレームにもとづくか。
- たとえば、直ちに実行するか、
- (仮想)クロック:
Scheduler
のgetterメソッドnow()
により、時間の概念が与えられる。Scheduler
にスケジューリングされたタスクは、そのクロックの示す時間にのみしたがう。
Scheduler
を用いることによって、どの実行コンテキストでObservable
がObserver
に通知を送るが決まるのです。まず、Scheduler
は使わないコードの実行結果をみておきましょう。Observable
は値を同期的に送ります。
const {Observable} = rxjs; const observable = Observable.create((observer) => { observer.next(1); observer.next(2); observer.next(3); observer.complete(); }); console.log('just before subscribe'); observable.subscribe({ next(x) {console.log('got value ' + x);}, error(err) {console.error('something wrong occurred: ' + err);}, complete() {console.log('done');} }); console.log('just after subscribe');
// コンソール出力 just before subscribe got value 1 got value 2 got value 3 done just after subscribe
Observable
にobserveOn()
オペレータでasyncScheduler
を定めると、値が非同期で送られるようになります。具体的には、observeOn()
に渡したasyncScheduler
は、内部的にschedule()
メソッドが呼び出され、Observer
に実行の遅れ(delay)を与えます。デフォルト値は0です。けれど、待ち時間0はsetTimeout()
やsetInterval()
の仕組みで扱われ、イペントループのつぎの繰り返しで実行されるため、同期処理には遅れる結果となります。
const {Observable, asyncScheduler} = rxjs; const {observeOn} = rxjs.operators; const observable = Observable.create((observer) => { observer.next(1); observer.next(2); observer.next(3); observer.complete(); }).pipe( observeOn(asyncScheduler) ); console.log('just before subscribe'); observable.subscribe({ next(x) {console.log('got value ' + x);}, error(err) {console.error('something wrong occurred: ' + err);}, complete() {console.log('done');} }); console.log('just after subscribe');
// コンソール出力 just before subscribe just after subscribe got value 1 got value 2 got value 3 done
observeOn(asyncScheduler)
は、Observable
と最終Observer
との間に、つぎのような構造のプロキシObserver
をつくるのです。
const proxyObserver = { next(val) { asyncScheduler.schedule( (x) => finalObserver.next(x), 0, // 遅れ val // 送られる値 ); }, }
Scheduler
のschedule()
メソッドの引数となる遅れ(delay)は、自身の内部クロックが測る時間にもとづきます。現実の時計の時間が過ぎるのとは別です。時間に関わるオペレータは、遅れなど時間の経過をScheduler
のクロックにもとづいて扱います。同期的なタスクを実行する場合のテストでも、内部クロックにより時間が組み替えられるので便利です。
02 Schedulerの種類と使い方
02-01 Schedulerの種類
asyncScheduler
は、RxJSが提供する組み込みスケジューラのひとつです。こうしたスケジューラは、Scheduler
オブジェクトの静的プロパティがつくって返します。
The async Scheduler is one of the built-in schedulers provided by RxJS. Each of these can be created and returned by using static properties of the Scheduler object.
表001■Schedulerオブジェクトの静的プロパティとその用途
静的プロパティ | 用途 |
---|---|
null
|
Scheduler を渡さないと、通知が同期的かつ再帰的に送られる。 決まった時間間隔あるいは末尾再帰の操作に用いられる。 |
queueScheduler
|
現在のイベントフレーム(trampoline scheduler)のキューにスケジュールを加える(「RxJava の ImmediateScheduler と TrampolineScheduler の違い」参照)。繰り返し操作に用いられる。 |
asapScheduler
|
マイクロタスクキューにスケジュールを加える。Node.jsのprocess.nextTick() またはWeb WorkerのMessageChannelあるいはsetTimeout() など利用できる中からもっとも速い配信の仕組みが用いられる。
|
asyncScheduler
|
setInterval() によるスケジュールに加える。 時間軸にもとづく操作に用いられる。
|
animationFrameScheduler
|
ブラウザコンテンツが再描画される直前にスケジュールされる。スムーズなブラウザアニメーションに用いられる。 |
02-02 Schedulerを使う
RxJSコードでScheduler
をとくに定めなくても、実際には使われていることがあります。並行処理を扱うObservable
のオペレータは、すべてScheduler
が選べるからです。Scheduler
が与えてられていない場合、RxJSは最小並行性の原則にもとづいてデフォルトを決めます。 オペレータの求める並行処理をもっとも少なくするScheduler
が選ばれるということです。たとえば、メッセージの数が少ないObservable
のオペレータには、RxJSはScheduler
を使いません。つまり、null
またはundefined
です。逆に、大量あるいは回数のかぎられないメッセージを返すオペレータなら、queueScheduler
が用いられます。タイマーを使うオペレータが選ぶのはasyncScheduler
です。
RxJSは、並行性がもっとも少ないScheduler
を用います。パフォーマンスを考えて、並行処理を取り入れるために、別のScheduler
を選ぶことも可能です。Observable
をつくる関数には、引数にScheduler
を指定できるものがあります。たとえば、from()
関数は、第2引数にScheduler
が与えられます。
Schedulerを引数にとる関数
たとえば、from()
関数は、つぎのような構文で、配列から変換された通知を送るときのScheduler
が定められます。通常、Scheduler
は、関数の最後の引数とされます。以下に掲げたのは、Scheduler
を引数にとる関数です。
from(array, scheduler)
bindCallback()
bindNodeCallback()
combineLatest()
concat()
empty()
from()
fromPromise()
interval()
merge()
of()
range()
throw()
timer()
subscribeOn()オペレータ
subscribeOn()
オペレータは、subscribe()
がどのコンテキストで呼び出されるのかをスケジュールするために用います。デフォルトでは、subscribe()
の呼び出しは、同期的でただちに行われます。subscribeOn(()
の引数にScheduler
を渡せば、実際のサブスクリプションは遅らせたり、スケジュールすることもできるのです。
observeOn()オペレータ
observeOn()
オペレータは、前述01「Schedulerの機能」の例のように、通知がどのコンテキストで送られるのかをスケジュールするために用います。observeOn()
は、もとのObservable
と最終Observer
との中継のObserver
を組み入れます。最終的なObserver
は、引数のScheduler
にもとづいて呼び出されるのです。
Schedulerを引数にとるオペレータ
以下の時間に関わるオペレータは、最後の引数がScheduler
です。省くと、デフォルトのasyncScheduler
で処理を行います。
bufferTime()
debounceTime()
delay()
auditTime()
sampleTime()
throttleTime()
timeInterval()
timeout()
timeoutWith()
windowTime()
他にScheduler
を引数にとる関数やオペレータは、以下のとおりです。
publishReplay()
にScheduler
が与えられるのは、ReplaySubject
クラスを利用するためです。このクラスのコンストラクタには、最後の引数としてScheduler
が与えられます。ReplaySubject
は時間を扱うので、Scheduler
のコンテキストにもとづくことになるのです。デフォルトでは、ReplaySubject
クラスがクロックを定めるのに用いるのはqueueScheduler
です。
GitHub「Scheduler」の「Using Schedulers」の項では、cache()
オペレータを採り上げて説明しています。けれど、cache()
はバージョン5.0.0-rc.1で除かれています。
RxJS 6入門
- RxJS 6入門 01: RxJSを使ってみる
- RxJS 6入門 02: Observable
- RxJS 6入門 03: Observer
- RxJS 6入門 04: Subscription
- RxJS 6入門 05: Subject
- RxJS 6入門 06: Observableをつくる関数とオペレータ
作成者: 野中文雄
作成日: 2018年6月24日
Copyright © 2001-2018 Fumio Nonaka. All rights reserved.