RxJS
リアクティブプログラミングのためのjavascriptライブラリ。 https://www.learnrxjs.io/
入門はここがわかりやすい: RxJS を学ぼう
(読みごたえありすぎる)公式ドキュメント: http://reactivex.io/rxjs/manual/overview.html#introduction
Rxを学ぶ(in English): https://www.learnrxjs.io/
RxJS5.0とRxJS6.0の違い: https://github.com/ReactiveX/rxjs/blob/master/docs_app/content/guide/v6/migration.md
概要
RxJSは、Web開発で最も人気のあるライブラリの一つです。イベント処理のための強力な関数型アプローチを提供し、多くのフレームワーク、ライブラリと組み合わせることができます。 多くのAPIや、オブジェクト指向とは異なる関数型プログラミングの概念、そして、命令型から宣言型へという考え方の違いから、RxJSを学ぶことは非常に難しいと言われています。
基本概念
RxJSの基本概念は次の通りです。
- Observable: Observerに処理させる、時間経過とともに流れるイベントや値のリストを表します。
- Observer: Observableから渡される値を処理するコールバック関数のリストです。
- Subscription: Observableの実行を表します。Observableの実行をキャンセルするためによく使われます。
- Operators:
filter
やmap
のようなリストを扱う純粋関数です。 - Subject: イベントや値を複数のObserverに通知します。
- Schedulers: イベントの処理タイミングを制御するためのものです。例)
setTimeout
Observable
Observableは遅延プッシュコレクションで、関数のようなものです。 以下、関数とObservableの違いについて説明していきます。
関数とObservableの定義と呼び出しはそれぞれ以下のようになります。
// 関数
const func = () => console.log('hello')
// Observable
const obs = of('hello')
// 関数の実行
func()
// Observableの実行
obs.subscribe(val => console.log(val))
例えば、関数では以下のように複数の値をreturnすることはできません。
const func = () => {
return 1
return 2 // これは到達不能コードになる
}
Observableではこれが可能です。
const foo = Rx.Observable.create(observer => {
console.log('Hello')
observer.next(1) // これは return 1 と同じです。
observer.next(2) // これは return 2 と同じです。
observer.next(3) // これは return 3 と同じです。
})
foo.subscribe(val => console.log(val))
// -> 1, 2, 3
さらに、非同期で値を返すこともできます。
const foo = Rx.Observable.create(observer => {
console.log('Hello')
observer.next(1)
observer.next(2)
observer.next(3)
setTimeout(() => {
observer.next(4) // 非同期で1秒後に'4'を返す
}, 1000);
})
foo.subscribe(val => console.log(val))
// -> 1, 2, 3, 4
// ※'1', '2', '3'を表示し、1秒後に'4'を表示
PullとPush
関数はPull型、ObservableはPush型です。
- Pull: コンシューマが、プロデューサからいつデータを受け取るかをを決めます。
- Push: プロデューサが、コンシューマにいつデータを送信するかを決めます。
これは、プロデューサ="関数"、コンシューマ="関数の呼び出し元"と考えるとわかりやすい。
Typescriptでの使い方
TypescriptではこのようにObservableを生成すると型の恩恵が得られる。
import { Observable, Observer } from "rxjs"
const observable = new Observer((obs: Observer<number>) => {
obs.next(1)
obs.next(2)
obs.next('hello') // コンパイルエラー
})
Observableのまとめ
- 関数の呼び出し: 1つの値を同期的に返す
- Observableのsubscribe: 0個以上の値を同期または非同期で返す
Observer
Observerは、Observableが配信するnext
、complete
、error
の各イベントに対応する3種類のコールバック関数を持つオブジェクトです。
具体的にはこのようなオブジェクトになります。
const observer = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
}
Observerは、Observableのsubscribe()
に渡して使います。
observable.subscribe(observer);
Observerは、部分的にコールバックを持たせることもできます。例えば、次の例はcomplete
の処理がないObserverです。
const observer = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
}
この場合、Observableのcomplete
通知で処理をしないことになりますが、正常に使えます。
Observableのsubscribe()
に直接コールバック関数を入れることもできます。
その場合はnext
のみを処理するObserverとなります。
observable.subscribe(x => console.log('Observer got a next value: ' + x));
実は、3つの引数を設定することでnext
、complete
、error
のコールバック関数をもつObserverを直接設定することもできます。
observable.subscribe(
x => console.log('Observer got a next value: ' + x),
err => console.error('Observer got an error: ' + err),
() => console.log('Observer got a complete notification')
)
Typescriptでの使い方
次の例はnumber
を配信するnext
への処理を定義したObserverです。
import { NextObserver } from "rxjs"
const observer: NextObserver<number> = {
next: n => console.log(n)
}
詳しくはrxjsのtypes.d.ts
を参照してください。
Subscription
Subscriptionは、subscribe()
をキャンセルするためのオブジェクトです。
キャンセルするにはSubscriptionのもつunsbscribe()
を呼びます。
// 1秒ごとに0, 1, 2...を表示
interval(1000).subscribe(x => console.log(x))
// ...
// その後、unsubscribe()を呼んだ時点で表示が止まる
subscription.unsubscribe();
add()
を使って、複数のObserverをキャンセルすることもできます。
const obs1 = Rx.Observable.interval(400);
const obs2 = Rx.Observable.interval(300);
const subscription = obs1.subscribe(x => console.log('first: ' + x));
const childSubscription = obs2.subscribe(x => console.log('second: ' + x));
subscription.add(childSubscription);
// 1秒後に両方のObserverをキャンセル
setTimeout(() => {
subscription.unsubscribe();
}, 1000);
add()
の逆の動きをするremove()
もあります。
Subject
Subjectは複数のObserverへ値を配信することができるObserbableです。
Subjectのsubscribe()
はaddListener()
と似ています。単純に、SubjectがもつObserverのリストへObserverを追加します。
SubjectはObserverであるため、next
、complete
、error
のメソッドも持ちます。
これらのメソッドを呼ぶことで、自身に登録している複数のObserverへ値を配信できます。
const subject = new Rx.Subject();
subject.subscribe(v => console.log('A: ' + v))
subject.subscribe(v => console.log('B: ' + v))
subject.next(1)
subject.next(2)
これの実行結果は次のようになります。
A: 1
B: 1
A: 2
B: 2
通常のSubjectに加えて、用途に応じて専用の機能を持った以下のSubjectが存在します。
- BehaviorSubject
- ReplaySubject
- AsyncSubject
基礎
Observer
Observer
は、通知されたデータをoperator
を利用して加工し、加工したデータに対する処理ができる。
// オブザーバのソースを作って
of("hoge").pipe(
// 加工したり変換したりして
[operator]()
[operator]()
...
// 結果に対して何か処理する
).subscribe(val => doSomething(val))
基本的に(*1)Observer
はsubscribe()
されることで初めて処理を実行します。
*1: Observer
にはCOLD
とHOT
の2種類があり、HOT
の場合subscribe()
されなくとも処理が実行されますが、Observer
は通常COLD
で生成されます。
COLD: 最初のデータから各Observerで処理
HOT: 今流れているデータを各Observerで処理
Subject
Subject
を使うとaddEventHandler()
的な使い方ができる。
- subscribe(): 引数に入れたObserverを通知先へ追加する
- next():
subscribe()
しているObserverたちへイベントを通知する
import { Subject } from "rxjs";
const print = (val: any) => console.log(val);
const subject = new Subject()
subject.subscribe(print)
subject.next("hello") // -> hello
subject.subscribe(print)
subject.next("world") // world world
ObserverとSubjectでデータの監視処理を実装
オブザーバの生成
import { of, from } from "rxjs";
const print = (val: any) => console.log(val);
const promise = new Promise(resolve => resolve("Hello World!"));
// 単一の値から生成
of(1, 2, 3).subscribe(print);
print("-----");
// 配列から生成
from([1, 2, 3]).subscribe(print);
print("-----");
// Promiseから生成
from(promise).subscribe(print)
print("-----");
// 文字列から生成
from("abcde").subscribe(print) // 'a', 'b', 'c', 'd', 'e'
エラーハンドリング
import { timer, throwError, of, pipe, range, from } from "rxjs";
import { catchError, mergeMap } from "rxjs/operators";
const print = (val: any) => console.log(val);
// エラーのオブザーバーを生成
throwError("This is an error!")
.pipe(
// エラーが発生したらそれ用のオブザーバーに変換
catchError(val => of(`I caught: ${val}`))
)
.subscribe(print); // -> I caught: This is an error!
// Promise
const badPromise = () =>
new Promise<any>((resolve, reject) => reject("rejected!"));
// 1秒後に"error: rejected!"を表示する
timer(1000)
.pipe(
mergeMap(_ =>
from(badPromise()).pipe(catchError(error => of(`error: ${error}`)))
)
)
.subscribe(print);
mapTo()
: 固定値に変換
マウスのクリックイベントごとにカウントアップする場合などに。
// クリックするたびに1, 2, 3...と表示
fromEvent(document.querySelector('#btn'), 'click')
.mapTo(1)
.scan((acc, val) => acc + val, 0)
.subscribe(val => console.log(val))
throttle: 入力の間引き
throttle()
の内部のObserver
のタイマーが有効な間、後続の通知を間引く。
fromEvent(document.querySelector('#target'), 'mousemove')
.throttle(() => interval(1000))
// ミリ秒を直接指定できる関数もある。こっちのほうが楽
// .throttleTime(1000)
debounce: 指定時間同じイベントが発生しなかった場合に処理
マウスを放置すると消えるUIなどに。
// マウス移動イベント発生後、1秒間マウスが動かなかったらログ出力
.fromEvent(document.querySelector('#target'), 'mousemove')
.debounce(() => Rx.Observable.interval(1000))
// ミリ秒を直接指定できる関数もある。こっちのほうが楽
// .debounceTime(1000)
.subscribe(val => console.log(val))
TypeScript + ts-nodeで簡単に試す
必要なものをダウンロード。
# 依存関係のダウンロード
yarn add -D typescript ts-node
yarn add rxjs
# tsconfig.jsonのひな形を生成
yarn tsc --init
生成されたtsconfig.json
を以下のように書き換える。(書き換えないでそのままこれをコピって作成してもいいけど)
{
"compilerOptions": {
"target": "es5",
/* import文を使うために必要 */
"module": "commonjs",
/* dom, es5, scripthostはデフォルト値。それにPromiseを追加する */
"lib": ["dom", "es5", "scripthost", "es2015.promise"],
"strict": true,
"moduleResolution": "node",
"esModuleInterop": true
}
}