サイトトップ

Director Flash 書籍 業務内容 プロフィール

HTML5テクニカルノート

RxJS入門 02: Observable


ObservableはRxJSの基本となるオブジェクトです。値をいくつでもまとめて、時間の制約なく扱うことができます。公式「Manual」の「Observable」を下じきに、サンプルコードや解説は改めました。

01 Observableを用いたデータの受け渡し

データの受け渡し方には、プル(pull)とプッシュ(push)があります。データを受け手から取り出すのはプル、つくり手から送り出すのがプッシュです。関数はデータを求めるときに呼び出して、値がひとつ得られます。ECMAScript 2015 (ECMAScript 6)には、複数のデータを順に取り出せる反復処理(イテレータ)が備わりました。さらに、同じく新たに加わったPromiseクラスはデータを送り出して、非同期で解決された値はコールバックで扱えます。ただし、ひとつのオブジェクトに送れるデータはひとつです。RxJSのObservableなら、複数のデータをプッシュして操作できるのです(表001)。

表001■データの受け渡し方

受け渡し\データ数 単数 複数
プル 関数 イテレータ
プッシュ Promise Observable

以下のコードは、新たなObservableをつくります。引数の関数に定めるのが値の処理です。関数が受け取るObserverにより、1から3までの整数はただちに(同期的に)プッシュします。さらに時間を(1秒)開けて整数4を加えたうえで、処理は終えます。


const observable = Rx.Observable.create((observer) => {
	observer.next(1);
	observer.next(2);
	observer.next(3);
	setTimeout(() => {
		observer.next(4);
		observer.complete();
	}, 1000);
});

前掲のコードだけでは何も起こらず、結果も表れません。実行するには、Observable.subscribe()を呼び出さなければならないのです。引数の関数は、Observer.next()が行う処理を定めます。以下のコードを加えると、コンソールには結果がつぎのように示されます。

just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4

console.log('just before subscribe');
observable.subscribe(
	(x) => console.log('got value ' + x)
);
console.log('just after subscribe');

前掲コードが呼び出したObserver.complete()については、結果が示されませんでした。この処理は、Observable.subscribe()の第3引数に関数で渡すのです。なお、第2引数は、エラーの扱いになります。以下のコードのように第3引数の関数を加えれば、コンソールにテキスト(done)がつぎのように出力されます。

just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done

observable.subscribe(
	(x) => console.log('got value ' + x),
	undefined,
	() => console.log('done')
);

Observable.subscribe()には引数をひとつだけ、オブジェクト(Observerインタフェース)で渡すこともできます。その場合には、next()error()およびcomplete()のメソッドがつぎのように定められます。3つのメソッドすべてを与えなくても構いません。


observable.subscribe({
	next(x) {console.log('got value ' + x)},
	error(err) {console.error('something wrong occurred: ' + err)},
	complete() {console.log('done')},
});

02 プルとプッシュの仕組み

プルとプッシュは異なるプロトコルで、データのつくり手(producer)が使い手(consumer)とどのようにやり取りするかを定めます(表002)。

表002■プルとプッシュの違い

受け渡し\主体 つくり手 使い手
プル [受動] 求められたらデータをつくる [能動] データをいつ求めるか決める
プッシュ [能動] データをいつつくるか決める [受動] データを受け取ったら処理する

プル(pull)

プルの仕組みでは、データの使い手がつくり手からいつ受け取るのかを決めます。つくる側は、いつデータを使う側に送るのかは知りません。JavaScriptの関数はすべてプルの仕組みです。データは関数がつくります。そして、コードから関数を呼び出すことにより、ひとつの戻り値をプルして使うのです。ECMAScript 2015には、プルのまた別の仕組みとして、ジェネレータ関数function*Generatorオブジェクト(イテレータ)が備わりました(「イテレーターとジェネレーター」参照)。iterator.next()を呼び出すコードが使い手で、つくり手のイテレータから複数の値がプルで取り出せるのです。

プッシュ(push)

プッシュの仕組みでは、データのつくり手が使い手にいつ送るのかを決めます。使う側は、いつデータを受け取るのかはわかりません。今のJavaScriptでは、Promiseオブジェクトがプシュの仕組みとしてもっとも一般的です。Promiseがつくり手となり、使い手であるコールバックに解決された値を送ります。関数とは違って、値をコールバックにいつプッシュするかは、Promiseが決めるのです。RxJSは新たなプッシュの仕組みとしてObservableを採り入れました。Observableは複数の値をつくって、受け手であるObserverにプッシュできるのです。

03 関数をより一般化したObservable

ObservableEventEmitterと似たものだとか、複数の値を扱うPromiseだとか捉えられがちです(EventEmitterについては「[Javascript] イベント駆動型の設計ができるEventEmitterに入門」参照)。たしかに、RxJSのSubjectを用いることにより、EventEmitterのようにマルチキャストはできます。けれど、Observableの基本は引数のない関数です。ただし、より一般化されて複数の値が扱えます。たとえば、以下のコードを試すと、コンソールにはつぎのように示されるはずです。

hello
1
hello
1
標準JavaScript

function test() {
	console.log('hello');
	return 1;
}
const x = test.call();  // test()の呼び出しと同じ
console.log(x);
const y = test.call();  // test()の呼び出しと同じ
console.log(y);

Observableを使ったつぎのコードでも、まったく同じ結果が得られます。なお、前掲のコードでFunction.call()メソッドを用いたのは、Observableの場合とかたちを揃えるためです。

RxJS

const test = Rx.Observable.create((observer) => {
	console.log('hello');
	observer.next(1);
});
test.subscribe(
	(x) => console.log(x)
);
test.subscribe(
	(y) => console.log(y)
);

関数には定義と呼び出し、Observableには生成(Observable.create())と実行(Observable.subscribe())がそれぞれあります。関数を定義しただけ、あるいはObservableもつくっただけでは何も起こりません。呼び出しや実行をしなければならないのです。さらに、呼び出しも実行も、ともに処理は独立しています。上のふたつのコードサンプルで、Function.call()メソッドの呼び出しとObservable.subscribe()の実行は2回行い、コンソールにふたつの結果が出力されました。Observable.subscribe()は関数の呼び出しに当たるわけです。

また、Observableは必ず非同期的な処理するともかぎりません。前掲の標準JavaScriptのコードをつぎのように試したとしましょう。

標準JavaScript

console.log('before');
console.log(test.call());
console.log('after');

コンソールの出力はつぎのようになるはずです。

before
hello
1
after

この結果は、Observableをつぎのように実行しても変わりません。つまり、関数と同じく同期的に処理されます。Observableは値を同期でも非同期でも送れるのです。

RxJS

console.log('before');
test.subscribe((x) =>
	console.log(x)
);
console.log('after');

Observableができて関数にできないのは、値をいくつも返すことです。関数のreturn文は処理を終えるため、そのあとのコードは実行されません。

標準JavaScript

function test() {
	console.log('hello');
	return 1;
	return 100;  // エラーにはならずに実行されない
}

Observableであれば、Observer.next()でいくつでも値が送れます。Observable.subscribe()を呼び出して、それらすべてが取り出せるのです。

before
hello
1
100
200
after
RxJS

const test = Rx.Observable.create((observer) => {
	console.log('hello');
	observer.next(1);
	observer.next(100);
	observer.next(200);
});
console.log('before');
test.subscribe((x) =>
	console.log(x)
);
console.log('after');

非同期で送られる値が含まれても構いません。値が解決すれば受け取れます。関数が同期的にひとつの値を返すのに対して、Observableは同期、非同期にかかわらず、送った値をいくつでも受け取れるのです。

before
hello
1
100
200
after
300
RxJS

const test = Rx.Observable.create((observer) => {
	console.log('hello');
	observer.next(1);
	observer.next(100);
	observer.next(200);
	setTimeout(() => {
		observer.next(300);
	}, 1000);
});

04 Observableの4つの処理段階

Observableインスタンスは4つの段階を経て処理されます。第1に、Observable.create()や生成のオペレータによりオブジェクトがつくられます。第2に、Observable.subscribe()などで、データの処理を登録(サブスクライブ)します。第3に、データの受け取り結果に応じてObservableに登録したnext()error()あるいはcomplete()を実行します。そして第4に、実行は取り消されることがあります。この4つの段階はObservableだけでなく、ObserverSubscriptionなどにも関わります。

04-01 Observableをつくる

Observableは、Observable.create()でつくれます。引数に渡した関数が定めるのは値の処理です。つぎのコードはObserver.next()で整数値をふたつ送っています。データを取り出すには、Observable.subscribe()で実行しなければなりません。引数の関数が、解決された値を処理します。


const observable = Rx.Observable.create((observer) => {
	observer.next(1);
	observer.next(2);
});
observable.subscribe((x) => console.log(x));

// コンソール出力
1
2

Observableオブジェクトを簡単に生成するオペレータもあります。たとえば、Observable.from()は、送りたい値を配列にして引数に渡せばオブジェクトがつくれるのです。


const observable = /* Rx.Observable.create((observer) => {
	observer.next(1);
	observer.next(2);
}); */
Rx.Observable.from([1, 2]);

04-02 Observableに処理を登録する

Observable.create()の引数には、データをどのように処理するか関数で与えます。そして、実行するObservable.subscribe()に渡す関数の仕事も、送られてくるデータの処理です。公式ドキュメントでは、ともにサブスクライブ(subscribe)と表現されています。どちらもObservableにデータの処理を登録するということでは同じだからです。ただし、新たにつくるObservableに登録した処理(Observable.create()の引数)は、オブジェクトと一体になっています。これに対して、Observable.subscribe()で登録する引数の関数はオブジェクトとは別です。すると、たとえば同じObservableオブジェクトに異なった処理が加えられます。


const observable = Rx.Observable.create((observer) => {
	observer.next(1);
	observer.next(2);
});
console.log('original value');
observable.subscribe((x) => console.log(x));
console.log('squared value');
observable.subscribe((y) => console.log(y ** 2));

// コンソール出力
original value
1
2
squared value
1
4

Observableをサブスクライブするというのは、関数を呼び出して、データが送られるコールバックを渡すのと似ています。Observableを実行して、送られる値やデータの処理をはじめるのです。

04-03 Observableを実行する

Observable.create()の引数で登録した関数は、そのままでは処理されません。Observable.subscribe()で実行してはじめて、値がObservableにつぎつぎと送られて関数により同期的あるいは非同期的に処理されるのです。Observable.create()に渡した関数は、引数にObserverを受け取ります。そのオブジェクトには、Observable.subscribe()の引数で定められるのは、つぎの3つの処理です。

Observer.next()は、渡されたデータの基本の処理を担います。Observable.subscribe()の第1引数の関数で、コールバックが定められます。


const observable = Rx.Observable.create((observer) => {
	observer.next(1);
	observer.next(2);
	observer.next(3);
});
observable.subscribe((x) => console.log(x));

// コンソール出力
1
2
3

error()はエラー、complete()は処理が終わったことを示し、そのあとコードは実行されません。それぞれObservable.subscribe()の第2引数と第3引数の関数で、コールバックが定められます。


const observable = Rx.Observable.create((observer) => {
	observer.next(1);
	observer.next(2);
	observer.complete();
	observer.next(3);  // 実行されない
});
observable.subscribe(
	(x) => console.log(x),
	undefined,
	() => console.log('done')
);

// コンソール出力
1
2
done

Observable.subscribe()には引数をひとつだけオブジェクト(Observerインタフェース)で渡し、next()error()およびcomplete()をメソッドとして与えることもできます。error()を送る場合には、try...catch構文を用いるとよいでしょう。


const observable = Rx.Observable.create((observer) => {
	try {
		observer.next(1);
		observer.next(2);
		const n = 1;
		n.notExists();
		observer.complete();  // 実行されない
	} catch(err) {
		observer.error(err);
	}
});
observable.subscribe({
	next(x) {console.log(x)},
	error(err) {console.error('error: ' + err)},
	complete() {console.log('done')}
});

// コンソール出力
1
2
error: TypeError: n.notExists is not a function

04-04 Observableの実行を取り消す

Observableを実行すると、いつまでも値を送り続けることができます。無駄な負荷を増やさないためには、必要がなくなったら止めなければなりません。実行するときは、Observable.subscribe()をオブジェクトに対して呼び出します。したがって、止めるのもオブジェクトごとです。Observable.subscribe()を呼び出すと、実行中であることを示すオブジェクトSubscriptionが返されます。取り消すために呼び出すのは、Subscription.unsubscribe()です。


const observable = Rx.Observable.create((observer) => {
	observer.next(1);
	observer.next(2);
	setTimeout(() => {
		observer.next(3);  // 取り消される
	}, 1000);
});
const subscription = observable.subscribe(x => console.log(x));
subscription.unsubscribe();

// コンソール出力
1
2

Subscription.unsubscribe()Observableの実行を止めるだけです。たとえば、つぎのようにコードを書き替えると、setInterval()メソッドの処理は止まらずに繰り返されます。


const observable = Rx.Observable.create((observer) => {
	observer.next(1);
	observer.next(2);
	// setTimeout(() => {
	setInterval(() => {
		console.log('interval');  // 出力されつづける
		observer.next(3);  // 実行は止まる
	}, 1000);
});

Subscription.unsubscribe()が呼び出されたときに行いたい処理は、関数にしてObservable.create()に渡すコールバックからつぎのように戻り値として定めます。


const observable = Rx.Observable.create((observer) => {
	observer.next(1);
	observer.next(2);
	const intervalID = setInterval(() => {
		console.log('interval');  // 出力されない
		observer.next(3);
	}, 1000);
	return () => {
		console.log('unsubscribed');
		clearInterval(intervalID);
	};
});
const subscription = observable.subscribe(x => console.log(x));
subscription.unsubscribe();

// コンソール出力
1
2
unsubscribed

RxJS入門


作成者: 野中文雄
作成日: 2018年2月20日


Copyright © 2001-2018 Fumio Nonaka.  All rights reserved.