HTML5テクニカルノート
RxJS 6入門 06: Observableをつくる関数とオペレータ
- ID: FN1806004
- Technique: HTML5 / JavaScript
- Library: RxJS 6.2.1
RxJSの基礎となるのはObservableです。オペレータはObservableを操作するのにとても役立ちます。複雑な非同期の処理を、わかりやすいコードで組み立てられるのです。このRxJS 6入門シリーズは公式「Manual」を下じきに、サンプルコードや解説は書き改めるかたちで進めています。けれど、「Operators」についてはRxJS 6で内容が大きく変わりました。そのため、コードも解説も大幅に手直ししています。
01 Observableをつくる関数
Observableオブジェクトは、Observable.create()メソッドでつくれます。そのほかに、一定のふるまいのObservableを生成できる関数があります。たとえば、interval()関数です。戻り値のObservableは、引数に渡されたミリ秒の間隔で、0から始まる連番整数を送ります。
const {interval} = rxjs; const observable = interval(400); const subscription = observable.subscribe((x) => console.log(x)); setTimeout(() => subscription.unsubscribe() , 1000);// コンソール出力 0 1
range()関数は連番整数のObservableオブジェクトをつくります。ふたつの引数は、はじまりと終わりの整数です。値を送るにはサブスクライブしなければなりません。
さらに、const {range} = rxjs; const observable = range(2, 3) const subscription = observable.subscribe((x) => console.log(x));// コンソール出力 2 3 4
from()関数は配列からObservableオブジェクトがつくれます。引数に渡せるのは配列のほか、Promiseや反復可能(iterable)なオブジェクトです(「from」参照)。
const {from} = rxjs; const observable = from([1, 2, 3, 5]) const subscription = observable.subscribe((x) => console.log(x));// コンソール出力 1 2 3 5
複数の入力Observableを引数にしてひとつの出力にまとめるには、merge()関数を用います。
const {interval, merge} = rxjs; const observable1 = interval(700); const observable2 = interval(300); const observables = merge(observable1, observable2) const subscription = observables.subscribe((x) => console.log(x)); setTimeout(() => subscription.unsubscribe() , 2000);// コンソール出力 0 1 0 <- observable1 2 3 1 <- observable1 4 5
複数のObservableをひとつにまとめる関数には、ほかにcombineLatest()あるいはconcat()などがあります。
02 オペレータを使う
オペレータはObservableオブジェクトに値の処理を加えて返す関数です。filter()やmap()あるいはscan()などが備わっています。オペレータを呼び出しても参照したインスタンスは変えません。Subscriptionの定めを受け継いだ新たなObservableが返されるのです。オペレータは、もとのObservableはそのままに、新たなObservableをつくる純粋な関数といえます。
オペレータを読み込む名前空間はrxjs.operatorsです。filter()は、引数のコールバックがtrueを返す値だけ送ります。オペレータはpipe()メソッドに引数として渡してください。
const {range} = rxjs; const {filter} = rxjs.operators; const observable = range(1, 5); observable.pipe( filter((x) => x % 2 == 1) ) .subscribe((x) => console.log(x));// コンソール出力 1 3 5
pipe()メソッドには、オペレータをいくつでも引数に加えられます。Observableは引数の順にオペレータに入出力されるのです。map()オペレータは、受け取った値を引数のコールバックに渡し、その戻り値が送られます。
const {range} = rxjs; const {filter, map} = rxjs.operators; const observable = range(1, 5); observable.pipe( filter((x) => x % 2 == 1), map((x) => x ** 2) ) .subscribe((x) => console.log(x));// コンソール出力 1 9 25
scan()オペレータは、引数のコールバックの戻り値を、つぎの値のコールバックが第1引数として受け取ります。Observableから送られる値は第2引数になるのです。scan()オペレータの第2引数には、最初のコールバックに渡す初期値を与えます。
const {range} = rxjs; const {filter, map, scan} = rxjs.operators; const observable = range(1, 5); observable.pipe( filter((x) => x % 2 == 1), map((x) => x ** 2), scan((acc, x) => acc + x, 0) ) .subscribe((x) => console.log(x));// コンソール出力 1 10 35
もうひとつ、take()オペレータをご紹介しましょう。Observableから送られる値の数を引数値に制限します。interval()関数でつくられたObservableでも、送る値がその数で止まるのです。
const {interval} = rxjs; const {take} = rxjs.operators; const observable = interval(1000) .{pipe( {take(5) ) .subscribe((x) => console.log(x));// コンソール出力 0 1 2 3 4
03 オペレータをつくる
オペレータは、基本的にひとつのObservableを入力として受け取り、もうひとつの新たなObservableをつくって返す関数です。出力Observableをサブスクライブすると、入力Observableもサブスクライブされます。つぎのコードで定めたカスタムオペレータ関数は、入力Observableから受け取る値をそれぞれ2乗します。出力オブジェクトに対してObservable.subscribe()メソッドを呼び出したとき、入力Observableもサブスクライブされることにご注目ください。 これを「オペレータサブスクリプションチェーン」と呼びます。
const {Observable, from} = rxjs; function square(input) { return Observable.create((observer) => { input.subscribe({ next(value) {observer.next(value ** 2);}, error(err) {observer.error(err);}, complete() {observer.complete();} }); }); } const input = from([1, 2, 3, 4]); square(input) .subscribe((x) => console.log(x));// コンソール出力 1 4 9 16
前掲カスタムオペレータは、組み込み済みのオペレータと同じようには使えません。RxJS 6のオペレータは、Observableに対して呼び出すpipe()メソッドの引数に与えなければならないからです。カスタムオペレータをpipe()に渡すには、引数にObservableオブジェクトを受け取るコールバック関数の中から呼び出すようにします。
const input = from([1, 2, 3, 4]); // square(input) input.pipe( (input) => square(input) ) .subscribe((x) => console.log(x));
つまり、つぎのようにカスタムオペレータから、引数にObservableを受け取るコールバックが返されれば、組み込み済みオペレータと同じようにpipe()メソッドの引数に渡せるのです。
const {Observable, from} = rxjs; // function square(input) { function square() { return (input) => // return Observable.create((observer) => { input.subscribe({ next(value) {observer.next(value ** 2);}, error(err) {observer.error(err);}, complete() {observer.complete();} }); }); } const input = from([1, 2, 3, 4]); input.pipe( // (input) => square(input) square() ) .subscribe((x) => console.log(x));// コンソール出力 1 4 9 16
04 マーブルダイヤグラム
オペレータの働きを、文章だけで説明しようとするとかぎりがあります。オペレータの多くは時間に関わり、それぞれ異なったやり方で値を送ります。それらは文章より、図で説明するとわかりやすくなることが少なくありません。マーブルダイヤグラム(Marble Diagram)は、オペレータの働きを図で示したものです。その中には、入力Observableやオペレータとその引数、さらに出力Observableが含まれます。
つぎの図001が、マーブルダイヤグラムの例です。時間は左から右に流れ、マーブルで示された値がObservableの実行によりどのように送られるのかを表しています。RxJS 5.5公式Referenceでは、マーブルダイヤグラムでオペレータの仕組みを説明しています(たとえばmap()オペレータ)。ところが、本校執筆時RxJS 6公式Referenceにはマーブルダイヤグラムが掲げられていません。おそらく、サイトがまだベータだからと考えられ、各オペレータの解説ページ「Description」にリンク切れで示されている図に加えられるものと思われます(たとえばmap()オペレータ)。
図001■マーブルダイヤグラムとその説明
RxJS 6入門
- RxJS 6入門 01: RxJSを使ってみる
- RxJS 6入門 02: Observable
- RxJS 6入門 03: Observer
- RxJS 6入門 04: Subscription
- RxJS 6入門 05: Subject
- RxJS 6入門 07: Scheduler
作成者: 野中文雄
作成日: 2018年6月23日
Copyright © 2001-2018 Fumio Nonaka. All rights reserved.