サイトトップ

Director Flash 書籍 業務内容 プロフィール

HTML5テクニカルノート

RxJS 6入門 05: Subject


Subjectは、Observableであり、なおかつObserverです。Observableと違って、複数のObserverに値をマルチキャストできます。機能が加えられたサブクラスもいくつかあります。RxJS 5公式「Manual」の「Subject」を下じきに、バージョン6に改め、コード例や解説も書き替えました。

01 Subjectでマルチキャストする

RxJSのSubjectObservableを継承するサブクラスで、複数のObserverに値がマルチキャストできます。親のObservableは単一キャストで、実行したObserverが単独でオブジェクトを保持するのと異なる点です。Subjectは、Observerをイベントリスナーのように複数登録して扱います。

SubjectObservableです。Subjectを実行(サブスクライブ)してObserverが与えられると、Observableの場合と同じように値が送られます。Observerからは、実行されたのが単一キャストのObservableなのかマルチキャストのSubjectなのかはわかりません。Subjectのサブスクリプトは、内部的には実行して値を送るわけではありません。Observerを実行のリストに加えるだけです。イベントリスナーに似た仕組みといえます。

SubjectObserverでもあります。next()error()およびcomplete()の3つのメソッドを備えたオブジェクトです。next()に値を渡して呼び出せば、Subjectのリストに登録されたObserverすべてに値はマルチキャストで送られます。つぎのコードは、Observable.subscribe()メソッドでSubjectにふたつのObserverを加えたうえで、next()により値を送ります。


const {Subject} = rxjs;
const subject = new Subject();
subject.subscribe({
	next(v) {console.log('observerA: ' + v);}
});
subject.subscribe({
	next(v) {console.log('observerB: ' + v);}
});
console.log('calling next()');
subject.next(1);
subject.next(2);

// コンソール出力
calling next()
observerA: 1
observerB: 1
observerA: 2
observerB: 2

SubjectObserverです。したがって、SubjectObservable.subscribe()メソッドの引数にして呼び出すこともできます。すると、単一キャストのObservableをマルチキャストにすることができるのです。つぎのコードは、ObservableSubjectを渡して実行することにより、登録された複数のObserverに値を送ります。なお、from()関数は、配列からつくったObservableに要素を値として送ります(「RxJS 6入門 02」04-01「Observableをつくる」参照)。


const {Subject, from} = rxjs;
const observable = from([1, 2, 3]);
const subject = new Subject();
subject.subscribe({
	next(v) {console.log('observerA: ' + v);}
});
subject.subscribe({
	next(v) {console.log('observerB: ' + v);}
});
observable.subscribe(subject);

// コンソール出力
observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

Subjectを継承するサブクラスには、あとに述べるBehaviorSubjectReplaySubject、あるいはAsyncSubjectなどがあります。

02 Observableをマルチキャストにする

Observableそのものは単一キャストで、ひとつのObserverにしか値を送れません。けれども、Subjectを介すれば、複数の通知先をもつマルチキャストのObservableにできます。Subjectにより、内部的に複数のObserverに同じObservableの実行を扱わせることができるのです。multicast()オペレータを使えば、複数のObserverSubjectに登録し、Observableから受け取ったデータが送れます。前掲のコードは、multicast()オペレータを使って以下のように書き替えられます。オペレータはpipe()メソッドの引数に渡してください。

Observerは取りまとめ役のSubjectにサブスクライブします。そして、SubjectがマルチキャストにしたいObservableにサブスクライブするのです。こうして、ObservableSubjectの力を借りてマルチキャストすることになります。

multicast()が返すのは、Observableを継承するConnectableObservableのオブジェクトです(第2引数がない場合。「RxJS: multicast’s Secret」参照)。引数に渡したSubjectによりマルチキャストの機能が備わります(なお、ConnectableObservableについては「RxJS を学ぼう #4 - COLD と HOT について学ぶ / ConnectableObservable」参照)。ConnectableObservable.connect()メソッドが、登録されたObservableの実行を始めます。つぎのコードでメソッドの呼び出しは、内部的にobservable.subscribe(subject)と同じ役割を果たすのです。メソッドはSubscriptionを返しますので、Subscription.unsubscribe()で実行は取り消せます。


const {Subject, from} = rxjs;
const {multicast} = rxjs.operators;
// const observable = from([1, 2, 3]);
const subject = new Subject();
const connected = from([1, 2, 3])
.pipe(
	multicast(subject)
)
subject.subscribe({
	next(v) {console.log('observerA: ' + v);}
});
subject.subscribe({
	next(v) {console.log('observerB: ' + v);}
});
// observable.subscribe(subject);
connected.connect();

03 参照を調べる

マルチキャストにしたObservableに、複数のObserverで時間差の実行をしてみましょう。最初の実行でObserverが加えられたらconnect()を呼び出し、最後の実行を止めたときunsubscribe()で取り消します。そのために、connect()の戻り値は変数にとっておかなければなりません。


const {Subject, interval} = rxjs;
const {multicast} = rxjs.operators;
const subject = new Subject();
const connected = interval(400)
.pipe(
	multicast(subject)
);
let subscription1, subscription2, subscriptionConnect;
console.log('observerA subscribed');
subscription1 = connected.subscribe({
	next(v) {console.log('observerA: ' + v);}
});
subscriptionConnect = connected.connect();  // 実行開始
setTimeout(() => {
	console.log('observerB subscribed');
	subscription2 = connected.subscribe({
		next(v) {console.log('observerB: ' + v);}
	});
}, 600);
setTimeout(() => {
	console.log('observerA unsubscribed');
	subscription1.unsubscribe();
}, 900);
setTimeout(() => {
	console.log('observerB unsubscribed');
	subscription2.unsubscribe();
	subscriptionConnect.unsubscribe(); // 実行終了
}, 1500);

// コンソール出力
observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed

マルチキャストのObservablesubscribe()Observerが加わったら値の受け取りを始め、すべてunsubscribe()されてObserverがなくなったら止めてしまえると便利でしょう。登録されている参照数を確かめて実行・停止するのがオベレータrefCount()です。Observableを返すので(新たなConnectableObservableをつくるのではありません)、そのオブジェクトにsubscribe()unsubscribe()を行います。参照数が0より増えれば、内部的にconnect()が呼び出されます。そして、0になれば実行を止めるのです。


const {Subject, interval} = rxjs;
const {multicast, refCount} = rxjs.operators;
const subject = new Subject();
const connected = interval(400)
.pipe(
	multicast(subject),
	refCount()
);
let subscription1, subscription2;  // , subscriptionConnect;
console.log('observerA subscribed');
subscription1 = connected.subscribe({  // 実行開始
	next(v) {console.log('observerA: ' + v);}
});
// subscriptionConnect = connected.connect();
setTimeout(() => {
	console.log('observerB subscribed');
	subscription2 = connected.subscribe({
		next(v) {console.log('observerB: ' + v);}
	});
}, 600);
setTimeout(() => {
	console.log('observerA unsubscribed');
	subscription1.unsubscribe();
}, 900);
setTimeout(() => {
	console.log('observerB unsubscribed');
	subscription2.unsubscribe();  // 実行終了
	// subscriptionConnect.unsubscribe();
}, 1500);

04 BehaviorSubject

SubjectのサブクラスのひとつBehaviorSubjectは、オブジェクトが現在値をもちます。直近に送られた値を納め、Observerが実行されると同時にBehaviorSubjectからその値を受け取るのです。つぎのコードでは、BehaviorSubjectに初期値0を与えて、オブジェクトがつくられます。すると、subscribe()で実行されたとき、ただちにObserverにはその値が送られるのです。つぎの実行でも、直近の値(2)を受け取ってから、next()で値が送られることになります。


const {BehaviorSubject} = rxjs;
const subject = new BehaviorSubject(0);  // 初期値
subject.subscribe({
	next(v) {console.log('observerA: ' + v);}
});
subject.next(1);
subject.next(2);
subject.subscribe({
	next(v) {console.log('observerB: ' + v);}
});
subject.next(3);

// コンソール出力
observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

05 ReplaySubject

ReplaySubjectは、送信済みの値がつぎの実行に送れるということはBehaviorSubjectと似ています。違いは、Observableに送った値を複数さかのぼって記録できることです。つぎのコードのように、ReplaySubjectはさかのぼる値の数(バッファーサイズ)を渡してつくります。


const {ReplaySubject} = rxjs;
const subject = new ReplaySubject(3);  // さかのぼる値の数
subject.subscribe({
	next(v) {console.log('observerA: ' + v);}
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
	next(v) {console.log('observerB: ' + v);}
});
subject.next(5);

// コンソール出力
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5

ReplaySubjectに記録する値の数を定めるのに、第1引数のバッファーサイズに加え、第2引数でさかのぼるウィンドウの時間をミリ秒で与えることができます。つぎのコードでは、第1引数(100)はあえて大きくしたうえで、第2引数のミリ秒(500)を狭めました。すると、ふたつ目の実行でObserverは、直近のその時間に送られた値だけさかのぼって受け取るのです。


const {ReplaySubject} = rxjs;
const subject = new ReplaySubject(100, 500);  // 第2引数はさかのぼるミリ秒
subject.subscribe({
	next(v) {console.log('observerA: ' + v);}
});
let i = 1;
setInterval(() => subject.next(i++), 200);
setTimeout(() => {
	subject.subscribe({
		next(v) {console.log('observerB: ' + v);}
	});
}, 1000);
setTimeout(() => {
	subject.complete();
}, 1500);

// コンソール出力
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
observerA: 7
observerB: 7

06 AsyncSubject

もうひとつご紹介するSubjectのサブクラスAsyncSubjectは、Observableを実行して最後の値だけがObserverに送られます。値を受け取るのは、complete()で実行が終わったときです。


const {AsyncSubject} = rxjs;
const subject = new AsyncSubject();
subject.subscribe({
	next(v) {console.log('observerA: ' + v);}
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.subscribe({
	next(v) {console.log('observerB: ' + v);}
});
subject.next(4);
subject.next(5);
subject.complete();

// コンソール出力
observerA: 5
observerB: 5

AsyncSubjectの働きは、last()オペレータと似ています。last()は実行が終わるのを待って、最後の値だけを送ります。

RxJS 6入門

 


作成者: 野中文雄
作成日: 2018年6月11日


Copyright © 2001-2018 Fumio Nonaka.  All rights reserved.