HTML5テクニカルノート
RxJS入門 02: Observable
- ID: FN1802005
- Technique: HTML5 / JavaScript
- Library: RxJS 5.5.6
Observable
はRxJSの基本となるオブジェクトです。値をいくつでもまとめて、時間の制約なく扱うことができます。公式「Manual」の「Observable」を下じきに、サンプルコードや解説は改めました。
01 Observableを用いたデータの受け渡し
データの受け渡し方には、プル(pull)とプッシュ(push)があります。データを受け手から取り出すのはプル、つくり手から送り出すのがプッシュです。関数はデータを求めるときに呼び出して、値がひとつ得られます。ECMAScript 2015 (ECMAScript 6)には、複数のデータを順に取り出せる反復処理(イテレータ)が備わりました。さらに、同じく新たに加わったPromise
クラスはデータを送り出して、非同期で解決された値はコールバックで扱えます。ただし、ひとつのオブジェクトに送れるデータはひとつです。RxJSのObservable
なら、複数のデータをプッシュして操作できるのです(表001)。
表001■データの受け渡し方
受け渡し\データ数 | 単数 | 複数 |
---|---|---|
プル | 関数 | イテレータ |
プッシュ | Promise | Observable |
以下のコードは、新たなObservable
をつくります。引数の関数に定めるのが値の処理です。関数が受け取るObserver
により、1から3までの整数はただちに(同期的に)プッシュします。さらに時間を(1秒)開けて整数4を加えたうえで、処理は終えます。
Observable.create()
: 新たなObservable
をつくり、値の処理は引数に関数で定めるObserver.next()
:Observable
を実行して、決められた処理を行うObserver.complete()
:Observable
の処理を終える
const observable = Rx.Observable.create((observer) => { observer.next(1); observer.next(2); observer.next(3); setTimeout(() => { observer.next(4); observer.complete(); }, 1000); });
前掲のコードだけでは何も起こらず、結果も表れません。実行するには、Observable.subscribe()
を呼び出さなければならないのです。引数の関数は、Observer.next()
が行う処理を定めます。以下のコードを加えると、コンソールには結果がつぎのように示されます。
just before subscribe got value 1 got value 2 got value 3 just after subscribe got value 4console.log('just before subscribe'); observable.subscribe( (x) => console.log('got value ' + x) ); console.log('just after subscribe');
前掲コードが呼び出したObserver.complete()
については、結果が示されませんでした。この処理は、Observable.subscribe()
の第3引数に関数で渡すのです。なお、第2引数は、エラーの扱いになります。以下のコードのように第3引数の関数を加えれば、コンソールにテキスト(done)がつぎのように出力されます。
just before subscribe got value 1 got value 2 got value 3 just after subscribe got value 4 doneobservable.subscribe( (x) => console.log('got value ' + x), undefined, () => console.log('done') );
Observable.subscribe()
には引数をひとつだけ、オブジェクト(Observer
インタフェース)で渡すこともできます。その場合には、next()
とerror()
およびcomplete()
のメソッドがつぎのように定められます。3つのメソッドすべてを与えなくても構いません。
observable.subscribe({ next(x) {console.log('got value ' + x)}, error(err) {console.error('something wrong occurred: ' + err)}, complete() {console.log('done')}, });
02 プルとプッシュの仕組み
プルとプッシュは異なるプロトコルで、データのつくり手(producer)が使い手(consumer)とどのようにやり取りするかを定めます(表002)。
表002■プルとプッシュの違い
受け渡し\主体 | つくり手 | 使い手 |
---|---|---|
プル | [受動] 求められたらデータをつくる | [能動] データをいつ求めるか決める |
プッシュ | [能動] データをいつつくるか決める | [受動] データを受け取ったら処理する |
プル(pull)
プルの仕組みでは、データの使い手がつくり手からいつ受け取るのかを決めます。つくる側は、いつデータを使う側に送るのかは知りません。JavaScriptの関数はすべてプルの仕組みです。データは関数がつくります。そして、コードから関数を呼び出すことにより、ひとつの戻り値をプルして使うのです。ECMAScript 2015には、プルのまた別の仕組みとして、ジェネレータ関数function*
とGenerator
オブジェクト(イテレータ)が備わりました(「イテレーターとジェネレーター」参照)。iterator.next()
を呼び出すコードが使い手で、つくり手のイテレータから複数の値がプルで取り出せるのです。
プッシュ(push)
プッシュの仕組みでは、データのつくり手が使い手にいつ送るのかを決めます。使う側は、いつデータを受け取るのかはわかりません。今のJavaScriptでは、Promise
オブジェクトがプシュの仕組みとしてもっとも一般的です。Promise
がつくり手となり、使い手であるコールバックに解決された値を送ります。関数とは違って、値をコールバックにいつプッシュするかは、Promise
が決めるのです。RxJSは新たなプッシュの仕組みとしてObservable
を採り入れました。Observable
は複数の値をつくって、受け手であるObserver
にプッシュできるのです。
- 関数: 呼び出したとき同期的にひとつの値を返す
Generator
: 遅延評価され、呼び出すと同期的に複数の値を反復処理により返す(「javascriptのジェネレータ関数と遅延評価」参照)Promise
非同期処理が解決されたときひとつの値を返す(「ES6: Promiseオブジェクトを使う」参照)Observable
遅延評価され、呼び出しに対して同期あるいは非同期で複数の値を返す
03 関数をより一般化したObservable
Observable
はEventEmitter
と似たものだとか、複数の値を扱うPromise
だとか捉えられがちです(EventEmitter
については「[Javascript] イベント駆動型の設計ができるEventEmitterに入門」参照)。たしかに、RxJSのSubject
を用いることにより、EventEmitter
のようにマルチキャストはできます。けれど、Observable
の基本は引数のない関数です。ただし、より一般化されて複数の値が扱えます。たとえば、以下のコードを試すと、コンソールにはつぎのように示されるはずです。
hello 1 hello 1標準JavaScriptfunction test() { console.log('hello'); return 1; } const x = test.call(); // test()の呼び出しと同じ console.log(x); const y = test.call(); // test()の呼び出しと同じ console.log(y);
Observable
を使ったつぎのコードでも、まったく同じ結果が得られます。なお、前掲のコードでFunction.call()
メソッドを用いたのは、Observable
の場合とかたちを揃えるためです。
RxJSconst test = Rx.Observable.create((observer) => { console.log('hello'); observer.next(1); }); test.subscribe( (x) => console.log(x) ); test.subscribe( (y) => console.log(y) );
関数には定義と呼び出し、Observable
には生成(Observable.create()
)と実行(Observable.subscribe()
)がそれぞれあります。関数を定義しただけ、あるいはObservable
もつくっただけでは何も起こりません。呼び出しや実行をしなければならないのです。さらに、呼び出しも実行も、ともに処理は独立しています。上のふたつのコードサンプルで、Function.call()
メソッドの呼び出しとObservable.subscribe()
の実行は2回行い、コンソールにふたつの結果が出力されました。Observable.subscribe()
は関数の呼び出しに当たるわけです。
また、Observable
は必ず非同期的な処理するともかぎりません。前掲の標準JavaScriptのコードをつぎのように試したとしましょう。
標準JavaScriptconsole.log('before'); console.log(test.call()); console.log('after');
コンソールの出力はつぎのようになるはずです。
before hello 1 after
この結果は、Observable
をつぎのように実行しても変わりません。つまり、関数と同じく同期的に処理されます。Observable
は値を同期でも非同期でも送れるのです。
RxJSconsole.log('before'); test.subscribe((x) => console.log(x) ); console.log('after');
Observable
ができて関数にできないのは、値をいくつも返すことです。関数のreturn
文は処理を終えるため、そのあとのコードは実行されません。
標準JavaScriptfunction test() { console.log('hello'); return 1; return 100; // エラーにはならずに実行されない }
Observable
であれば、Observer.next()
でいくつでも値が送れます。Observable.subscribe()
を呼び出して、それらすべてが取り出せるのです。
before hello 1 100 200 afterRxJSconst test = Rx.Observable.create((observer) => { console.log('hello'); observer.next(1); observer.next(100); observer.next(200); }); console.log('before'); test.subscribe((x) => console.log(x) ); console.log('after');
非同期で送られる値が含まれても構いません。値が解決すれば受け取れます。関数が同期的にひとつの値を返すのに対して、Observable
は同期、非同期にかかわらず、送った値をいくつでも受け取れるのです。
before hello 1 100 200 after 300RxJSconst test = Rx.Observable.create((observer) => { console.log('hello'); observer.next(1); observer.next(100); observer.next(200); setTimeout(() => { observer.next(300); }, 1000); });
04 Observableの4つの処理段階
Observable
インスタンスは4つの段階を経て処理されます。第1に、Observable.create()
や生成のオペレータによりオブジェクトがつくられます。第2に、Observable.subscribe()
などで、データの処理を登録(サブスクライブ)します。第3に、データの受け取り結果に応じてObservable
に登録したnext()
やerror()
あるいはcomplete()
を実行します。そして第4に、実行は取り消されることがあります。この4つの段階はObservable
だけでなく、Observer
やSubscription
などにも関わります。
04-01 Observableをつくる
Observable
は、Observable.create()
でつくれます。引数に渡した関数が定めるのは値の処理です。つぎのコードはObserver.next()
で整数値をふたつ送っています。データを取り出すには、Observable.subscribe()
で実行しなければなりません。引数の関数が、解決された値を処理します。
const observable = Rx.Observable.create((observer) => { observer.next(1); observer.next(2); }); observable.subscribe((x) => console.log(x)); // コンソール出力 1 2
Observable
オブジェクトを簡単に生成するオペレータもあります。たとえば、Observable.from()
は、送りたい値を配列にして引数に渡せばオブジェクトがつくれるのです。
const observable = /* Rx.Observable.create((observer) => { observer.next(1); observer.next(2); }); */ Rx.Observable.from([1, 2]);
04-02 Observableに処理を登録する
Observable.create()
の引数には、データをどのように処理するか関数で与えます。そして、実行するObservable.subscribe()
に渡す関数の仕事も、送られてくるデータの処理です。公式ドキュメントでは、ともにサブスクライブ(subscribe)と表現されています。どちらもObservable
にデータの処理を登録するということでは同じだからです。ただし、新たにつくるObservable
に登録した処理(Observable.create()
の引数)は、オブジェクトと一体になっています。これに対して、Observable.subscribe()
で登録する引数の関数はオブジェクトとは別です。すると、たとえば同じObservable
オブジェクトに異なった処理が加えられます。
const observable = Rx.Observable.create((observer) => { observer.next(1); observer.next(2); }); console.log('original value'); observable.subscribe((x) => console.log(x)); console.log('squared value'); observable.subscribe((y) => console.log(y ** 2)); // コンソール出力 original value 1 2 squared value 1 4
Observable
をサブスクライブするというのは、関数を呼び出して、データが送られるコールバックを渡すのと似ています。Observable
を実行して、送られる値やデータの処理をはじめるのです。
04-03 Observableを実行する
Observable.create()
の引数で登録した関数は、そのままでは処理されません。Observable.subscribe()
で実行してはじめて、値がObservable
につぎつぎと送られて関数により同期的あるいは非同期的に処理されるのです。Observable.create()
に渡した関数は、引数にObserver
を受け取ります。そのオブジェクトには、Observable.subscribe()
の引数で定められるのは、つぎの3つの処理です。
next()
: 数値や文字列あるいはオブジェクトなどの値を送るerror()
: JavaScriptのエラーまたは式を送るcomplete()
: 値は送らず終了を知らせる
Observer.next()
は、渡されたデータの基本の処理を担います。Observable.subscribe()
の第1引数の関数で、コールバックが定められます。
const observable = Rx.Observable.create((observer) => { observer.next(1); observer.next(2); observer.next(3); }); observable.subscribe((x) => console.log(x)); // コンソール出力 1 2 3
error()
はエラー、complete()
は処理が終わったことを示し、そのあとコードは実行されません。それぞれObservable.subscribe()
の第2引数と第3引数の関数で、コールバックが定められます。
const observable = Rx.Observable.create((observer) => { observer.next(1); observer.next(2); observer.complete(); observer.next(3); // 実行されない }); observable.subscribe( (x) => console.log(x), undefined, () => console.log('done') ); // コンソール出力 1 2 done
Observable.subscribe()
には引数をひとつだけオブジェクト(Observer
インタフェース)で渡し、next()
とerror()
およびcomplete()
をメソッドとして与えることもできます。error()
を送る場合には、try...catch
構文を用いるとよいでしょう。
const observable = Rx.Observable.create((observer) => { try { observer.next(1); observer.next(2); const n = 1; n.notExists(); observer.complete(); // 実行されない } catch(err) { observer.error(err); } }); observable.subscribe({ next(x) {console.log(x)}, error(err) {console.error('error: ' + err)}, complete() {console.log('done')} }); // コンソール出力 1 2 error: TypeError: n.notExists is not a function
04-04 Observableの実行を取り消す
Observable
を実行すると、いつまでも値を送り続けることができます。無駄な負荷を増やさないためには、必要がなくなったら止めなければなりません。実行するときは、Observable.subscribe()
をオブジェクトに対して呼び出します。したがって、止めるのもオブジェクトごとです。Observable.subscribe()
を呼び出すと、実行中であることを示すオブジェクトSubscription
が返されます。取り消すために呼び出すのは、Subscription.unsubscribe()
です。
const observable = Rx.Observable.create((observer) => { observer.next(1); observer.next(2); setTimeout(() => { observer.next(3); // 取り消される }, 1000); }); const subscription = observable.subscribe(x => console.log(x)); subscription.unsubscribe(); // コンソール出力 1 2
Subscription.unsubscribe()
はObservable
の実行を止めるだけです。たとえば、つぎのようにコードを書き替えると、setInterval()
メソッドの処理は止まらずに繰り返されます。
const observable = Rx.Observable.create((observer) => { observer.next(1); observer.next(2); // setTimeout(() => { setInterval(() => { console.log('interval'); // 出力されつづける observer.next(3); // 実行は止まる }, 1000); });
Subscription.unsubscribe()
が呼び出されたときに行いたい処理は、関数にしてObservable.create()
に渡すコールバックからつぎのように戻り値として定めます。
const observable = Rx.Observable.create((observer) => { observer.next(1); observer.next(2); const intervalID = setInterval(() => { console.log('interval'); // 出力されない observer.next(3); }, 1000); return () => { console.log('unsubscribed'); clearInterval(intervalID); }; }); const subscription = observable.subscribe(x => console.log(x)); subscription.unsubscribe(); // コンソール出力 1 2 unsubscribed
RxJS入門
- RxJS入門 01: RxJSを使ってみる
- RxJS入門 03: Observer
- RxJS入門 04: Subscription
- RxJS入門 05: Subject
- RxJS入門 06: オペレータ
- RxJS入門 07: Scheduler
作成者: 野中文雄
作成日: 2018年2月20日
Copyright © 2001-2018 Fumio Nonaka. All rights reserved.