HTML5テクニカルノート
RxJS 6入門 06: Observableをつくる関数とオペレータ
- ID: FN1806004
- Technique: HTML5 / JavaScript
- Library: RxJS 6.2.1
RxJSの基礎となるのはObservable
です。オペレータはObservable
を操作するのにとても役立ちます。複雑な非同期の処理を、わかりやすいコードで組み立てられるのです。このRxJS 6入門シリーズは公式「Manual」を下じきに、サンプルコードや解説は書き改めるかたちで進めています。けれど、「Operators」についてはRxJS 6で内容が大きく変わりました。そのため、コードも解説も大幅に手直ししています。
01 Observableをつくる関数
Observable
オブジェクトは、Observable.create()
メソッドでつくれます。そのほかに、一定のふるまいのObservable
を生成できる関数があります。たとえば、interval()
関数です。戻り値のObservable
は、引数に渡されたミリ秒の間隔で、0から始まる連番整数を送ります。
const {interval} = rxjs; const observable = interval(400); const subscription = observable.subscribe((x) => console.log(x)); setTimeout(() => subscription.unsubscribe() , 1000);
// コンソール出力 0 1
range()
関数は連番整数のObservable
オブジェクトをつくります。ふたつの引数は、はじまりと終わりの整数です。値を送るにはサブスクライブしなければなりません。
さらに、const {range} = rxjs; const observable = range(2, 3) const subscription = observable.subscribe((x) => console.log(x));
// コンソール出力 2 3 4
from()
関数は配列からObservable
オブジェクトがつくれます。引数に渡せるのは配列のほか、Promise
や反復可能(iterable)なオブジェクトです(「from」参照)。
const {from} = rxjs; const observable = from([1, 2, 3, 5]) const subscription = observable.subscribe((x) => console.log(x));
// コンソール出力 1 2 3 5
複数の入力Observable
を引数にしてひとつの出力にまとめるには、merge()
関数を用います。
const {interval, merge} = rxjs; const observable1 = interval(700); const observable2 = interval(300); const observables = merge(observable1, observable2) const subscription = observables.subscribe((x) => console.log(x)); setTimeout(() => subscription.unsubscribe() , 2000);
// コンソール出力 0 1 0 <- observable1 2 3 1 <- observable1 4 5
複数のObservable
をひとつにまとめる関数には、ほかにcombineLatest()
あるいはconcat()
などがあります。
02 オペレータを使う
オペレータはObservable
オブジェクトに値の処理を加えて返す関数です。filter()
やmap()
あるいはscan()
などが備わっています。オペレータを呼び出しても参照したインスタンスは変えません。Subscription
の定めを受け継いだ新たなObservable
が返されるのです。オペレータは、もとのObservable
はそのままに、新たなObservable
をつくる純粋な関数といえます。
オペレータを読み込む名前空間はrxjs.operatorsです。filter()
は、引数のコールバックがtrue
を返す値だけ送ります。オペレータはpipe()
メソッドに引数として渡してください。
const {range} = rxjs; const {filter} = rxjs.operators; const observable = range(1, 5); observable.pipe( filter((x) => x % 2 == 1) ) .subscribe((x) => console.log(x));
// コンソール出力 1 3 5
pipe()
メソッドには、オペレータをいくつでも引数に加えられます。Observable
は引数の順にオペレータに入出力されるのです。map()
オペレータは、受け取った値を引数のコールバックに渡し、その戻り値が送られます。
const {range} = rxjs; const {filter, map} = rxjs.operators; const observable = range(1, 5); observable.pipe( filter((x) => x % 2 == 1), map((x) => x ** 2) ) .subscribe((x) => console.log(x));
// コンソール出力 1 9 25
scan()
オペレータは、引数のコールバックの戻り値を、つぎの値のコールバックが第1引数として受け取ります。Observable
から送られる値は第2引数になるのです。scan()
オペレータの第2引数には、最初のコールバックに渡す初期値を与えます。
const {range} = rxjs; const {filter, map, scan} = rxjs.operators; const observable = range(1, 5); observable.pipe( filter((x) => x % 2 == 1), map((x) => x ** 2), scan((acc, x) => acc + x, 0) ) .subscribe((x) => console.log(x));
// コンソール出力 1 10 35
もうひとつ、take()
オペレータをご紹介しましょう。Observable
から送られる値の数を引数値に制限します。interval()
関数でつくられたObservable
でも、送る値がその数で止まるのです。
const {interval} = rxjs; const {take} = rxjs.operators; const observable = interval(1000) .{pipe( {take(5) ) .subscribe((x) => console.log(x));
// コンソール出力 0 1 2 3 4
03 オペレータをつくる
オペレータは、基本的にひとつのObservable
を入力として受け取り、もうひとつの新たなObservable
をつくって返す関数です。出力Observable
をサブスクライブすると、入力Observable
もサブスクライブされます。つぎのコードで定めたカスタムオペレータ関数は、入力Observable
から受け取る値をそれぞれ2乗します。出力オブジェクトに対してObservable.subscribe()
メソッドを呼び出したとき、入力Observable
もサブスクライブされることにご注目ください。 これを「オペレータサブスクリプションチェーン」と呼びます。
const {Observable, from} = rxjs; function square(input) { return Observable.create((observer) => { input.subscribe({ next(value) {observer.next(value ** 2);}, error(err) {observer.error(err);}, complete() {observer.complete();} }); }); } const input = from([1, 2, 3, 4]); square(input) .subscribe((x) => console.log(x));
// コンソール出力 1 4 9 16
前掲カスタムオペレータは、組み込み済みのオペレータと同じようには使えません。RxJS 6のオペレータは、Observable
に対して呼び出すpipe()
メソッドの引数に与えなければならないからです。カスタムオペレータをpipe()
に渡すには、引数にObservable
オブジェクトを受け取るコールバック関数の中から呼び出すようにします。
const input = from([1, 2, 3, 4]); // square(input) input.pipe( (input) => square(input) ) .subscribe((x) => console.log(x));
つまり、つぎのようにカスタムオペレータから、引数にObservable
を受け取るコールバックが返されれば、組み込み済みオペレータと同じようにpipe()
メソッドの引数に渡せるのです。
const {Observable, from} = rxjs; // function square(input) { function square() { return (input) => // return Observable.create((observer) => { input.subscribe({ next(value) {observer.next(value ** 2);}, error(err) {observer.error(err);}, complete() {observer.complete();} }); }); } const input = from([1, 2, 3, 4]); input.pipe( // (input) => square(input) square() ) .subscribe((x) => console.log(x));
// コンソール出力 1 4 9 16
04 マーブルダイヤグラム
オペレータの働きを、文章だけで説明しようとするとかぎりがあります。オペレータの多くは時間に関わり、それぞれ異なったやり方で値を送ります。それらは文章より、図で説明するとわかりやすくなることが少なくありません。マーブルダイヤグラム(Marble Diagram)は、オペレータの働きを図で示したものです。その中には、入力Observable
やオペレータとその引数、さらに出力Observable
が含まれます。
つぎの図001が、マーブルダイヤグラムの例です。時間は左から右に流れ、マーブルで示された値がObservable
の実行によりどのように送られるのかを表しています。RxJS 5.5公式Referenceでは、マーブルダイヤグラムでオペレータの仕組みを説明しています(たとえばmap()
オペレータ)。ところが、本校執筆時RxJS 6公式Referenceにはマーブルダイヤグラムが掲げられていません。おそらく、サイトがまだベータだからと考えられ、各オペレータの解説ページ「Description」にリンク切れで示されている図に加えられるものと思われます(たとえばmap()
オペレータ)。
図001■マーブルダイヤグラムとその説明
RxJS 6入門
- RxJS 6入門 01: RxJSを使ってみる
- RxJS 6入門 02: Observable
- RxJS 6入門 03: Observer
- RxJS 6入門 04: Subscription
- RxJS 6入門 05: Subject
- RxJS 6入門 07: Scheduler
作成者: 野中文雄
作成日: 2018年6月23日
Copyright © 2001-2018 Fumio Nonaka. All rights reserved.