HTML5テクニカルノート
RxJS入門 07: Scheduler
- ID: FN1804002
- Technique: HTML5 / ECMAScript 2015
- Library: RxJS 5.5.8
Scheduler
は、オペーレータがサブスクリプションをいつ開始し、通知はいつ送るのかコントロールします。サブスクリプションや通知が、どのコンテキストで行われるのかをスケジュールできるのです。公式「Manual」の「Scheduler」を下じきに、サンプルコードや解説は改めました。
01 Schedulerの機能
Scheduler
は、サブスクリプションをいつ開始し、通知はいつ送るのかコントロールします。構成要素となるのはつぎの3つです。
- データ構造: タスクをどのように保持し、キューに入れるのか、優先度やその他の基準にもとづいて定める。
- 実行コンテキスト: タスクが実行される場所とタイミングを決める。
- たとえば、直ちに実行するか、
setTimeout()
やprocess.nextTick()
などのコールバックの仕組みを使うか、あるいはアニメーションフレームにもとづくか。
- たとえば、直ちに実行するか、
- (仮想)クロック:
Scheduler
のgetterメソッドnow()
により、時間の概念が与えられる。Scheduler
にスケジューリングされたタスクは、そのクロックの示す時間にのみしたがう。
Scheduler
を用いることによって、Observable
がObserver
に通知を送る実行コンテキストが定められるのです。まず、Scheduler
は使わないコードの実行結果をみておきましょう。Observable
は、値を同期的に送ります。
const observable = Rx.Observable.create((observer) => { observer.next(1); observer.next(2); observer.next(3); observer.complete(); }) console.log('just before subscribe'); observable.subscribe({ next(x) {console.log('got value ' + x);}, error(err) {console.error('something wrong occurred: ' + err);}, complete() {console.log('done');} }); console.log('just after subscribe'); /* コンソール出力 just before subscribe got value 1 got value 2 got value 3 done just after subscribe */
Observable
にobserveOn()
オペレータでScheduler.async
を定めると、値が非同期で送られるようになります。具体的には、Scheduler.async
のもとでsubscribe()
に渡されたObserver
には、内部的に用いられるschedule()
により実行の遅れ(delay)が与えられます。デフォルト値は0です。けれど、待ち時間0はsetTimeout()
やsetInterval()
と同じく、イペントループのつぎの繰り返しで実行されるため、同期の処理には遅れる結果となります。
const observable = Rx.Observable.create((observer) => { observer.next(1); observer.next(2); observer.next(3); observer.complete(); }) .observeOn(Rx.Scheduler.async); console.log('just before subscribe'); observable.subscribe({ next(x) {console.log('got value ' + x);}, error(err) {console.error('something wrong occurred: ' + err);}, complete() {console.log('done');} }); console.log('just after subscribe'); /* コンソール出力 just before subscribe just after subscribe got value 1 got value 2 got value 3 done */
Scheduler
のschedule()
メソッドの引数となる遅れ(delay)は、自身のクロックが測る時間にもとづきます。現実の時計の時間が過ぎるのとは別です。同期的なタスクを実行する場合のテストでも、内部クロックにより時間が組み替えられるので便利です。
02 Schedulerの種類と使い方
02-01 Schedulerの種類
Scheduler.async
は、RxJSが提供する組み込みスケジューラのひとつです。こうしたスケジューラは、Scheduler
オブジェクトの静的プロパティがつくって返します。
表001■Schedulerオブジェクトの静的プロパティとその用途
静的プロパティ | 用途 |
---|---|
null
|
Scheduler をまったくとおさず、通知が同期的かつ再帰的に送られる。 決まった時間間隔あるいは末尾再帰の操作に用いられる。 |
Scheduler.queue
|
現在のイベントフレームのキューにスケジュールを加える(trampoline scheduler)。繰り返し操作に用いられる。 |
Scheduler.asap
|
マイクロタスクキューにスケジュールを加える。Node.jsのprocess.nextTick() あるいはWeb WorkerのMessageChannelまたはsetTimeout() など利用できる中からもっとも速い配信の仕組みが用いられる。
|
Scheduler.async
|
setInterval() によるスケジュールに加える。 時間軸にもとづく操作に用いられる。
|
02-02 Schedulerを使う
RxJSコードでScheduler
をとくに定めなくても、実際には使われていることがあります。並列処理を扱うObservable
のオペレータは、すべてScheduler
が選べるからです。Scheduler
が与えてられていない場合、RxJSは最小並列性の原則にもとづいてデフォルトを決めます。 オペレータの求める並列処理をもっとも少なくするScheduler
が選ばれるということです。たとえば、メッセージの数が少ないObservable
のオペレータには、RxJSはScheduler
を使いません。つまり、null
またはundefined
です。逆に、量が多く回数もかぎられないメッセージを返すオペレータなら、Scheduler.queue
が用いられます。タイマーを使うオペレータが選ぶのはScheduler.async
です。
RxJSは、並列性がもっとも少ないScheduler
を用います。パフォーマンスを考えて、並列処理を取り入れるために、別のScheduler
を選ぶことも可能です。Scheduler
を指定したいとき、オペレータメソッドにはそれができるものもあります。たとえば、Observable.from()
には、第2引数にScheduler
が与えられます。
Schedulerを引数にとる静的生成オペレータ
静的生成オペレータは、基本的にScheduler
を引数にとります。たとえば、Observable.from()
オペレータは、つぎのような構文で、配列から変換された通知を送るときのScheduler
が定められるのです。通常、Scheduler
は、オペレータの最後の引数とされます。以下に掲げたのは、Scheduler
を引数にとる静的生成オペレータです。
Rx.Observable.from(array, scheduler)
Observable.bindCallback()
Observable.bindNodeCallback()
Observable.combineLatest
Observable.concat
Observable.empty
Observable.from
Observable.fromPromise
Observable.interval
Observable.merge
Observable.of
Observable.range
Observable.throw
Observable.timer
subscribeOn()オペレータ
subscribeOn()
オペレータは、subscribe()
がどのコンテキストで呼び出されるのかをスケジュールするために用います。デフォルトでは、subscribe()
の呼び出しは、同期的でただちに行われます。インスタンスオペレータsubscribeOn(()
の引数にScheduler
を渡せば、実際のサブスクリプションは遅らせたり、スケジュールすることもできるのです。
observeOn()オペレータ
observeOn()
オペレータは、前述01「Schedulerの機能」の例のように、通知がどのコンテキストで送られるのかをスケジュールするために用います。インスタンスオペレータobserveOn()
は、引数のScheduler
にもとづいて中継のObserver
を組み入れます。最終的なObserver
は、そのスケジュールにしたがって呼び出されるのです。
Schedulerを引数にとるインスタンスオペレータ
Scheduler
引数にとるインスタンスオペレータもあります。以下の時間に関わるオペレータは、最後の引数がScheduler
です。あとの時間関連オペレータは、デフォルトのScheduler.async
で処理を行います。
bufferTime()
debounceTime()
delay()
auditTime()
sampleTime()
throttleTime()
timeInterval()
timeout()
timeoutWith()
windowTime()
そのほかに、Scheduler
を引数にとるインスタンスオペレータはつぎのとおりです。
cache()
とpublishReplay()
にScheduler
が与えられるのは、ReplaySubject
クラスを利用するためです。このクラスのコンストラクタには、最後の引数としてScheduler
が与えられます。ReplaySubject
は時間を扱うので、Scheduler
のコンテキストにもとづくことになるのです。デフォルトでは、ReplaySubject
クラスは、Scheduler.queue
のクロックを用います。
RxJS入門
- RxJS入門 01: RxJSを使ってみる
- RxJS入門 02: Observable
- RxJS入門 03: Observer
- RxJS入門 04: Subscription
- RxJS入門 05: Subject
- RxJS入門 06: オペレータ
作成者: 野中文雄
作成日: 2018年4月8日
Copyright © 2001-2018 Fumio Nonaka. All rights reserved.