サイトトップ

Director Flash 書籍 業務内容 プロフィール

HTML5テクニカルノート

RxJS 6入門 07: Scheduler


Schedulerは、オペーレータがサブスクリプションをいつ開始し、通知はいつ送るのかコントロールします。サブスクリプションや通知が、どのコンテキストで行われるのかをスケジュールできるのです。GitHubのReactiveX/rxjsScheduler」を下じきに、サンプルコードや解説は改めました。

01 Schedulerの機能

Schedulerは、サブスクリプションをいつ開始し、通知はいつ送るのかコントロールします。構成要素となるのはつぎの3つです。

Schedulerを用いることによって、どの実行コンテキストでObservableObserverに通知を送るが決まるのです。まず、Schedulerは使わないコードの実行結果をみておきましょう。Observableは値を同期的に送ります。


const {Observable} = rxjs;
const observable = Observable.create((observer) => {
	observer.next(1);
	observer.next(2);
	observer.next(3);
	observer.complete();
});
console.log('just before subscribe');
observable.subscribe({
	next(x) {console.log('got value ' + x);},
	error(err) {console.error('something wrong occurred: ' + err);},
	complete() {console.log('done');}
});
console.log('just after subscribe');

// コンソール出力
just before subscribe
got value 1
got value 2
got value 3
done
just after subscribe

ObservableobserveOn()オペレータasyncSchedulerを定めると、値が非同期で送られるようになります。具体的には、observeOn()に渡したasyncSchedulerは、内部的にschedule()メソッドが呼び出され、Observerに実行の遅れ(delay)を与えます。デフォルト値は0です。けれど、待ち時間0はsetTimeout()setInterval()の仕組みで扱われ、イペントループのつぎの繰り返しで実行されるため、同期処理には遅れる結果となります。


const {Observable, asyncScheduler} = rxjs;
const {observeOn} = rxjs.operators;
const observable = Observable.create((observer) => {
	observer.next(1);
	observer.next(2);
	observer.next(3);
	observer.complete();
}).pipe(
	observeOn(asyncScheduler)
);
console.log('just before subscribe');
observable.subscribe({
	next(x) {console.log('got value ' + x);},
	error(err) {console.error('something wrong occurred: ' + err);},
	complete() {console.log('done');}
});
console.log('just after subscribe');

// コンソール出力
just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done

observeOn(asyncScheduler)は、Observableと最終Observerとの間に、つぎのような構造のプロキシObserverをつくるのです。


const proxyObserver = {
	next(val) {
		asyncScheduler.schedule(
		(x) => finalObserver.next(x),
		0,  // 遅れ
		val  // 送られる値
		);
	},

}

Schedulerschedule()メソッドの引数となる遅れ(delay)は、自身の内部クロックが測る時間にもとづきます。現実の時計の時間が過ぎるのとは別です。時間に関わるオペレータは、遅れなど時間の経過をSchedulerのクロックにもとづいて扱います。同期的なタスクを実行する場合のテストでも、内部クロックにより時間が組み替えられるので便利です。

02 Schedulerの種類と使い方

02-01 Schedulerの種類

asyncSchedulerは、RxJSが提供する組み込みスケジューラのひとつです。こうしたスケジューラは、Schedulerオブジェクトの静的プロパティがつくって返します。

The async Scheduler is one of the built-in schedulers provided by RxJS. Each of these can be created and returned by using static properties of the Scheduler object.

表001■Schedulerオブジェクトの静的プロパティとその用途

静的プロパティ 用途
null Schedulerを渡さないと、通知が同期的かつ再帰的に送られる。 決まった時間間隔あるいは末尾再帰の操作に用いられる。
queueScheduler 現在のイベントフレーム(trampoline scheduler)のキューにスケジュールを加える(「RxJava の ImmediateScheduler と TrampolineScheduler の違い」参照)。繰り返し操作に用いられる。
asapScheduler マイクロタスクキューにスケジュールを加える。Node.jsのprocess.nextTick()またはWeb WorkerのMessageChannelあるいはsetTimeout()など利用できる中からもっとも速い配信の仕組みが用いられる。
asyncScheduler setInterval()によるスケジュールに加える。 時間軸にもとづく操作に用いられる。
animationFrameScheduler ブラウザコンテンツが再描画される直前にスケジュールされる。スムーズなブラウザアニメーションに用いられる。

02-02 Schedulerを使う

RxJSコードでSchedulerをとくに定めなくても、実際には使われていることがあります。並行処理を扱うObservableのオペレータは、すべてSchedulerが選べるからです。Schedulerが与えてられていない場合、RxJSは最小並行性の原則にもとづいてデフォルトを決めます。 オペレータの求める並行処理をもっとも少なくするSchedulerが選ばれるということです。たとえば、メッセージの数が少ないObservableのオペレータには、RxJSはSchedulerを使いません。つまり、nullまたはundefinedです。逆に、大量あるいは回数のかぎられないメッセージを返すオペレータなら、queueSchedulerが用いられます。タイマーを使うオペレータが選ぶのはasyncSchedulerです。

RxJSは、並行性がもっとも少ないSchedulerを用います。パフォーマンスを考えて、並行処理を取り入れるために、別のSchedulerを選ぶことも可能です。Observableをつくる関数には、引数にSchedulerを指定できるものがあります。たとえば、from()関数は、第2引数にSchedulerが与えられます。

Schedulerを引数にとる関数

たとえば、from()関数は、つぎのような構文で、配列から変換された通知を送るときのSchedulerが定められます。通常、Schedulerは、関数の最後の引数とされます。以下に掲げたのは、Schedulerを引数にとる関数です。


from(array, scheduler)

subscribeOn()オペレータ

subscribeOn()オペレータは、subscribe()がどのコンテキストで呼び出されるのかをスケジュールするために用います。デフォルトでは、subscribe()の呼び出しは、同期的でただちに行われます。subscribeOn(()の引数にSchedulerを渡せば、実際のサブスクリプションは遅らせたり、スケジュールすることもできるのです。

observeOn()オペレータ

observeOn()オペレータは、前述01「Schedulerの機能」の例のように、通知がどのコンテキストで送られるのかをスケジュールするために用います。observeOn()は、もとのObservableと最終Observerとの中継のObserverを組み入れます。最終的なObserverは、引数のSchedulerにもとづいて呼び出されるのです。

Schedulerを引数にとるオペレータ

以下の時間に関わるオペレータは、最後の引数がSchedulerです。省くと、デフォルトのasyncSchedulerで処理を行います。

他にSchedulerを引数にとる関数やオペレータは、以下のとおりです。

publishReplay()Schedulerが与えられるのは、ReplaySubjectクラスを利用するためです。このクラスのコンストラクタには、最後の引数としてSchedulerが与えられます。ReplaySubjectは時間を扱うので、Schedulerのコンテキストにもとづくことになるのです。デフォルトでは、ReplaySubjectクラスがクロックを定めるのに用いるのはqueueSchedulerです。

GitHub「Scheduler」の「Using Schedulers」の項では、cache()オペレータを採り上げて説明しています。けれど、cache()はバージョン5.0.0-rc.1で除かれています

RxJS 6入門


作成者: 野中文雄
作成日: 2018年6月24日


Copyright © 2001-2018 Fumio Nonaka.  All rights reserved.