HTML5テクニカルノート
RxJS入門 05: Subject
- ID: FN1803001
- Technique: HTML5 / JavaScript
- Library: RxJS 5.5.6
Subject
は、Observable
であり、なおかつObserver
です。Observable
と違って、複数のObserver
に値をマルチキャストできます。機能が加えられたサブクラスもいくつかあります。公式「Manual」の「Subject」を下じきに、サンプルコードや解説は改めました。
01 Subjectでマルチキャストする
RxJSのSubject
はObservable
を継承するサブクラスで、複数のObserver
に値がマルチキャストできます。親のObservable
は単一キャストで、実行したObserver
が単独でオブジェクトを保持するのと異なる点です。Subject
は、Observer
をイベントリスナーのように複数登録して扱います。
Subject
はObservable
です。Subject
を実行(サブスクライブ)してObserver
が与えられると、Observable
の場合と同じように値が送られます。Observer
からは、実行されたのが単一キャストのObservable
なのかマルチキャストのSubject
なのかはわかりません。Subject
のサブスクリプトは、内部的には実行して値を送るわけではありません。Observer
を実行のリストに加えるだけです。イベントリスナーに似た仕組みといえます。
Subject
はObserver
でもあります。next()
とerror()
およびcomplete()
の3つのメソッドを備えたオブジェクトです。next()
に値を渡して呼び出せば、Subject
のリストに登録されたObserver
すべてに値はマルチキャストで送られます。つぎのコードは、Observable.subscribe()
メソッドでSubject
にふたつのObserver
を加えたうえで、next()
により値を送ります。
const subject = new Rx.Subject(); subject.subscribe({ next(v) {console.log('observerA: ' + v);} }); subject.subscribe({ next(v) {console.log('observerB: ' + v);} }); console.log('calling next()'); subject.next(1); subject.next(2); // コンソール出力 calling next() observerA: 1 observerB: 1 observerA: 2 observerB: 2
Subject
はObserver
です。したがって、Subject
をObservable.subscribe()
メソッドの引数にして呼び出すこともできます。すると、単一キャストのObservable
をマルチキャストにすることができるのです。つぎのコードは、Observable
にSubject
を渡して実行することにより、登録された複数のObserver
に値を送ります。なお、静的メソッドObservable.from()
は、配列からつくったObservable
に要素を値として送ります。
const observable = Rx.Observable.from([1, 2, 3]); const subject = new Rx.Subject(); subject.subscribe({ next(v) {console.log('observerA: ' + v);} }); subject.subscribe({ next(v) {console.log('observerB: ' + v);} }); observable.subscribe(subject); // コンソール出力 observerA: 1 observerB: 1 observerA: 2 observerB: 2 observerA: 3 observerB: 3
Subject
を継承するサブクラスには、あとに述べるBehaviorSubject
やReplaySubject
、あるいはAsyncSubject
などがあります。
02 Observableをマルチキャストにする
Observable
そのものは単一キャストで、ひとつのObserver
にしか値を送れません。けれども、Subject
を介すれば、複数の通知先をもつマルチキャストのObservable
にできます。Subject
により、内部的に複数のObserver
に同じObservable
の実行を扱わせることができるのです。multicast()
オペレータを使えば、複数のObserver
をSubject
に登録し、Observable
から受け取ったデータが送れます。前掲のコードは、multicast()
オペレータを使って以下のように書き替えられます。
multicast()
が返すのは、Observable
を継承するConnectableObservable
のオブジェクトです(第2引数がない場合。「RxJS: multicast’s Secret」参照)。引数に渡したSubject
によりマルチキャストの機能が備わります(なお、ConnectableObservable
については「RxJS を学ぼう #4 - COLD と HOT について学ぶ / ConnectableObservable」参照)。ConnectableObservable.connect()
メソッドが、登録されたObservable
の実行を始めます。つぎのコードでメソッドの呼び出しは、内部的にobservable.subscribe(subject)
と同じ役割を果たすのです。メソッドはSubscription
を返しますので、Subscription.unsubscribe()
で実行は取り消せます。
const observable = Rx.Observable.from([1, 2, 3]); const subject = new Rx.Subject(); const multicasted = observable.multicast(subject); // subject.subscribe({ multicasted.subscribe({ next(v) {console.log('observerA: ' + v);} }); // subject.subscribe({ multicasted.subscribe({ next(v) {console.log('observerB: ' + v);} }); // observable.subscribe(subject); multicasted.connect();
03 参照を調べる
マルチキャストにしたObservable
に、複数のObserver
で時間差の実行をしてみましょう。最初の実行でObserver
が加えられたらconnect()
を呼び出し、最後の実行を止めたときunsubscribe()
で取り消します。そのために、connect()
の戻り値は変数にとっておかなければなりません。
const observable = Rx.Observable.interval(500); const subject = new Rx.Subject(); const multicasted = observable.multicast(subject); let subscription1, subscription2, subscriptionConnect; console.log('observerA subscribed'); subscription1 = multicasted.subscribe({ next(v) {console.log('observerA: ' + v);} }); subscriptionConnect = multicasted.connect(); // 実行開始 setTimeout(() => { console.log('observerB subscribed'); subscription2 = multicasted.subscribe({ next(v) {console.log('observerB: ' + v);} }); }, 600); setTimeout(() => { console.log('observerA unsubscribed'); subscription1.unsubscribe(); }, 1200); setTimeout(() => { console.log('observerB unsubscribed'); subscription2.unsubscribe(); subscriptionConnect.unsubscribe(); // 実行終了 }, 2000); // コンソール出力 observerA subscribed observerA: 0 observerB subscribed observerA: 1 observerB: 1 observerA unsubscribed observerB: 2 observerB unsubscribed
マルチキャストのObservable
にsubscribe()
でObserver
が加わったら値の受け取りを始め、すべてunsubscribe()
されてObserver
がなくなったら止めてしまえると便利でしょう。登録されている参照数を確かめて実行・停止するのがConnectableObservable.refCount()
です。Observable
を返すので(新たなConnectableObservable
をつくるのではありません)、そのオブジェクトにsubscribe()
とunsubscribe()
を行います。参照数が0より増えれば、内部的にconnect()
が呼び出されます。そして、0になれば実行を止めるのです。
const observable = Rx.Observable.interval(500); const subject = new Rx.Subject(); // const multicasted = observable.multicast(subject); const multicasted = observable.multicast(subject).refCount(); let subscription1, subscription2; // , subscriptionConnect; console.log('observerA subscribed'); subscription1 = multicasted.subscribe({ // 実行開始 next(v) {console.log('observerA: ' + v);} }); // subscriptionConnect = multicasted.connect(); setTimeout(() => { console.log('observerB subscribed'); subscription2 = multicasted.subscribe({ next(v) {console.log('observerB: ' + v);} }); }, 600); setTimeout(() => { console.log('observerA unsubscribed'); subscription1.unsubscribe(); }, 1200); setTimeout(() => { console.log('observerB unsubscribed'); subscription2.unsubscribe(); // 実行終了 // subscriptionConnect.unsubscribe(); }, 2000);
04 BehaviorSubject
Subject
のサブクラスのひとつBehaviorSubject
は、オブジェクトが現在値をもちます。直近に送られた値を納め、Observer
が実行されると同時にBehaviorSubject
からその値を受け取るのです。つぎのコードでは、BehaviorSubject
に初期値0を与えて、オブジェクトがつくられます。すると、subscribe()
で実行されたとき、ただちにObserver
にはその値が送られるのです。つぎの実行でも、直近の値(2)を受け取ってから、next()
で値が送られることになります。
const subject = new Rx.BehaviorSubject(0); // 初期値 subject.subscribe({ next(v) {console.log('observerA: ' + v);} }); subject.next(1); subject.next(2); subject.subscribe({ next(v) {console.log('observerB: ' + v);} }); subject.next(3); // コンソール出力 observerA: 0 observerA: 1 observerA: 2 observerB: 2 observerA: 3 observerB: 3
05 ReplaySubject
ReplaySubject
は、送信済みの値がつぎの実行に送れるということはBehaviorSubject
と似ています。違いは、Observable
に送った値を複数さかのぼって記録できることです。つぎのコードのように、ReplaySubject
はさかのぼる値の数(バッファーサイズ)を渡してつくります。
const subject = new Rx.ReplaySubject(3); // さかのぼる値の数 subject.subscribe({ next(v) {console.log('observerA: ' + v);} }); subject.next(1); subject.next(2); subject.next(3); subject.next(4); subject.subscribe({ next(v) {console.log('observerB: ' + v);} }); subject.next(5); // コンソール出力 observerA: 1 observerA: 2 observerA: 3 observerA: 4 observerB: 2 observerB: 3 observerB: 4 observerA: 5 observerB: 5
ReplaySubject
に記録する値の数を定めるのに、第1引数のバッファーサイズに加え、第2引数でさかのぼるウィンドウの時間をミリ秒で与えることができます。つぎのコードでは、第1引数(100)はあえて大きくしたうえで、第2引数のミリ秒(500)を狭めました。すると、ふたつ目の実行でObserver
は、直近のその時間に送られた値だけさかのぼって受け取るのです。
const subject = new Rx.ReplaySubject(100, 500); // 第2引数はさかのぼるミリ秒 subject.subscribe({ next(v) {console.log('observerA: ' + v);} }); let i = 1; setInterval(() => subject.next(i++), 200); setTimeout(() => { subject.subscribe({ next(v) {console.log('observerB: ' + v);} }); }, 1000); setTimeout(() => { subject.complete(); }, 1500); // コンソール出力 observerA: 1 observerA: 2 observerA: 3 observerA: 4 observerA: 5 observerB: 3 observerB: 4 observerB: 5 observerA: 6 observerB: 6 observerA: 7 observerB: 7
06 AsyncSubject
もうひとつご紹介するSubject
のサブクラスAsyncSubject
は、Observable
を実行して最後の値だけがObserver
に送られます。値を受け取るのは、complete()
で実行が終わったときです。
const subject = new Rx.AsyncSubject(); subject.subscribe({ next(v) {console.log('observerA: ' + v);} }); subject.next(1); subject.next(2); subject.next(3); subject.subscribe({ next(v) {console.log('observerB: ' + v);} }); subject.next(4); subject.next(5); subject.complete(); // コンソール出力 observerA: 5 observerB: 5
AsyncSubject
の働きは、last()
オペレータと似ています。last()
は実行が終わるのを待って、最後の値だけを送ります。
RxJS入門
- RxJS入門 01: RxJSを使ってみる
- RxJS入門 02: Observable
- RxJS入門 03: Observer
- RxJS入門 04: Subscription
- RxJS入門 06: オペレータ
- RxJS入門 07: Scheduler
作成者: 野中文雄
作成日: 2018年3月3日
Copyright © 2001-2018 Fumio Nonaka. All rights reserved.