RxJS

リアクティブプログラミングのためのjavascriptライブラリ。 https://www.learnrxjs.io/

入門はここがわかりやすい: RxJS を学ぼう

(読みごたえありすぎる)公式ドキュメント: http://reactivex.io/rxjs/manual/overview.html#introduction

Rxを学ぶ(in English): https://www.learnrxjs.io/

RxJS5.0とRxJS6.0の違い: https://github.com/ReactiveX/rxjs/blob/master/docs_app/content/guide/v6/migration.md

概要

RxJSは、Web開発で最も人気のあるライブラリの一つです。イベント処理のための強力な関数型アプローチを提供し、多くのフレームワーク、ライブラリと組み合わせることができます。 多くのAPIや、オブジェクト指向とは異なる関数型プログラミングの概念、そして、命令型から宣言型へという考え方の違いから、RxJSを学ぶことは非常に難しいと言われています。

基本概念

RxJSの基本概念は次の通りです。

  • Observable: Observerに処理させる、時間経過とともに流れるイベントや値のリストを表します。
  • Observer: Observableから渡される値を処理するコールバック関数のリストです。
  • Subscription: Observableの実行を表します。Observableの実行をキャンセルするためによく使われます。
  • Operators: filtermapのようなリストを扱う純粋関数です。
  • Subject: イベントや値を複数のObserverに通知します。
  • Schedulers: イベントの処理タイミングを制御するためのものです。例) setTimeout

Observable

Observableは遅延プッシュコレクションで、関数のようなものです。 以下、関数とObservableの違いについて説明していきます。

関数とObservableの定義と呼び出しはそれぞれ以下のようになります。

// 関数
const func = () => console.log('hello')
// Observable
const obs = of('hello')

// 関数の実行
func()
// Observableの実行
obs.subscribe(val => console.log(val))

例えば、関数では以下のように複数の値をreturnすることはできません。

const func = () => {
    return 1
    return 2 // これは到達不能コードになる
}

Observableではこれが可能です。

const foo = Rx.Observable.create(observer => {
  console.log('Hello')
  observer.next(1) // これは return 1 と同じです。
  observer.next(2) // これは return 2 と同じです。
  observer.next(3) // これは return 3 と同じです。
})

foo.subscribe(val => console.log(val))
// -> 1, 2, 3

さらに、非同期で値を返すこともできます。

const foo = Rx.Observable.create(observer => {
  console.log('Hello')
  observer.next(1)
  observer.next(2)
  observer.next(3)
  setTimeout(() => {
    observer.next(4) // 非同期で1秒後に'4'を返す
  }, 1000);
})

foo.subscribe(val => console.log(val))
// -> 1, 2, 3, 4
// ※'1', '2', '3'を表示し、1秒後に'4'を表示

PullとPush

関数はPull型、ObservableはPush型です。

  • Pull: コンシューマが、プロデューサからいつデータを受け取るかをを決めます。
  • Push: プロデューサが、コンシューマにいつデータを送信するかを決めます。

これは、プロデューサ="関数"、コンシューマ="関数の呼び出し元"と考えるとわかりやすい。

Typescriptでの使い方

TypescriptではこのようにObservableを生成すると型の恩恵が得られる。

import { Observable, Observer } from "rxjs"

const observable = new Observer((obs: Observer<number>) => {
  obs.next(1)
  obs.next(2)
  obs.next('hello') // コンパイルエラー
})

Observableのまとめ

  • 関数の呼び出し: 1つの値を同期的に返す
  • Observableのsubscribe: 0個以上の値を同期または非同期で返す

Observer

Observerは、Observableが配信するnextcompleteerrorの各イベントに対応する3種類のコールバック関数を持つオブジェクトです。

具体的にはこのようなオブジェクトになります。

const observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
}

Observerは、Observableのsubscribe()に渡して使います。

observable.subscribe(observer);

Observerは、部分的にコールバックを持たせることもできます。例えば、次の例はcompleteの処理がないObserverです。

const observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
}

この場合、Observableのcomplete通知で処理をしないことになりますが、正常に使えます。

Observableのsubscribe()に直接コールバック関数を入れることもできます。 その場合はnextのみを処理するObserverとなります。

observable.subscribe(x => console.log('Observer got a next value: ' + x));

実は、3つの引数を設定することでnextcompleteerrorのコールバック関数をもつObserverを直接設定することもできます。

observable.subscribe(
  x => console.log('Observer got a next value: ' + x),
  err => console.error('Observer got an error: ' + err),
  () => console.log('Observer got a complete notification')
)

Typescriptでの使い方

次の例はnumberを配信するnextへの処理を定義したObserverです。

import { NextObserver } from "rxjs"

const observer: NextObserver<number> = {
  next: n => console.log(n)
}

詳しくはrxjsのtypes.d.tsを参照してください。

Subscription

Subscriptionは、subscribe()をキャンセルするためのオブジェクトです。 キャンセルするにはSubscriptionのもつunsbscribe()を呼びます。

// 1秒ごとに0, 1, 2...を表示
interval(1000).subscribe(x => console.log(x))

// ...
// その後、unsubscribe()を呼んだ時点で表示が止まる
subscription.unsubscribe();

add()を使って、複数のObserverをキャンセルすることもできます。

const obs1 = Rx.Observable.interval(400);
const obs2 = Rx.Observable.interval(300);

const subscription = obs1.subscribe(x => console.log('first: ' + x));
const childSubscription = obs2.subscribe(x => console.log('second: ' + x));

subscription.add(childSubscription);

// 1秒後に両方のObserverをキャンセル
setTimeout(() => {
  subscription.unsubscribe();
}, 1000);

add()の逆の動きをするremove()もあります。

Subject

Subjectは複数のObserverへ値を配信することができるObserbableです。 Subjectのsubscribe()addListener()と似ています。単純に、SubjectがもつObserverのリストへObserverを追加します。

SubjectはObserverであるため、nextcompleteerrorのメソッドも持ちます。 これらのメソッドを呼ぶことで、自身に登録している複数のObserverへ値を配信できます。

const subject = new Rx.Subject();

subject.subscribe(v => console.log('A: ' + v))
subject.subscribe(v => console.log('B: ' + v))

subject.next(1)
subject.next(2)

これの実行結果は次のようになります。

A: 1
B: 1
A: 2
B: 2

通常のSubjectに加えて、用途に応じて専用の機能を持った以下のSubjectが存在します。

  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

基礎

Observer

Observerは、通知されたデータをoperatorを利用して加工し、加工したデータに対する処理ができる。

// オブザーバのソースを作って
of("hoge").pipe(
    // 加工したり変換したりして
    [operator]()
    [operator]()
    ...
// 結果に対して何か処理する
).subscribe(val => doSomething(val))

基本的に(*1)Observersubscribe()されることで初めて処理を実行します。

*1: ObserverにはCOLDHOTの2種類があり、HOTの場合subscribe()されなくとも処理が実行されますが、Observerは通常COLDで生成されます。

COLD: 最初のデータから各Observerで処理

HOT: 今流れているデータを各Observerで処理

Subject

Subjectを使うとaddEventHandler()的な使い方ができる。

  • subscribe(): 引数に入れたObserverを通知先へ追加する
  • next(): subscribe()しているObserverたちへイベントを通知する
import { Subject } from "rxjs";

const print = (val: any) => console.log(val);

const subject = new Subject()
subject.subscribe(print)
subject.next("hello") // -> hello

subject.subscribe(print)
subject.next("world") // world world

ObserverとSubjectでデータの監視処理を実装

オブザーバの生成

import { of, from } from "rxjs";

const print = (val: any) => console.log(val);
const promise = new Promise(resolve => resolve("Hello World!"));

// 単一の値から生成
of(1, 2, 3).subscribe(print);
print("-----");

// 配列から生成
from([1, 2, 3]).subscribe(print);
print("-----");

// Promiseから生成
from(promise).subscribe(print)
print("-----");

// 文字列から生成
from("abcde").subscribe(print) // 'a', 'b', 'c', 'd', 'e'

エラーハンドリング

import { timer, throwError, of, pipe, range, from } from "rxjs";
import { catchError, mergeMap } from "rxjs/operators";

const print = (val: any) => console.log(val);

// エラーのオブザーバーを生成
throwError("This is an error!")
  .pipe(
    // エラーが発生したらそれ用のオブザーバーに変換
    catchError(val => of(`I caught: ${val}`))
  )
  .subscribe(print); // -> I caught: This is an error!

// Promise
const badPromise = () =>
  new Promise<any>((resolve, reject) => reject("rejected!"));

// 1秒後に"error: rejected!"を表示する
timer(1000)
  .pipe(
    mergeMap(_ =>
      from(badPromise()).pipe(catchError(error => of(`error: ${error}`)))
    )
  )
  .subscribe(print);

mapTo(): 固定値に変換

マウスのクリックイベントごとにカウントアップする場合などに。

// クリックするたびに1, 2, 3...と表示
fromEvent(document.querySelector('#btn'), 'click')
    .mapTo(1)
    .scan((acc, val) => acc + val, 0)
    .subscribe(val => console.log(val))

throttle: 入力の間引き

throttle()の内部のObserverのタイマーが有効な間、後続の通知を間引く。

fromEvent(document.querySelector('#target'), 'mousemove')
  .throttle(() => interval(1000))
  // ミリ秒を直接指定できる関数もある。こっちのほうが楽
  // .throttleTime(1000)

debounce: 指定時間同じイベントが発生しなかった場合に処理

マウスを放置すると消えるUIなどに。

// マウス移動イベント発生後、1秒間マウスが動かなかったらログ出力
.fromEvent(document.querySelector('#target'), 'mousemove')
  .debounce(() => Rx.Observable.interval(1000))
  // ミリ秒を直接指定できる関数もある。こっちのほうが楽
  // .debounceTime(1000)
  .subscribe(val => console.log(val))

TypeScript + ts-nodeで簡単に試す

必要なものをダウンロード。

# 依存関係のダウンロード
yarn add -D typescript ts-node
yarn add rxjs

# tsconfig.jsonのひな形を生成
yarn tsc --init

生成されたtsconfig.jsonを以下のように書き換える。(書き換えないでそのままこれをコピって作成してもいいけど)

{
  "compilerOptions": {
    "target": "es5",
    /* import文を使うために必要 */
    "module": "commonjs",
    /* dom, es5, scripthostはデフォルト値。それにPromiseを追加する */
    "lib": ["dom", "es5", "scripthost", "es2015.promise"],
    "strict": true,
    "moduleResolution": "node",
    "esModuleInterop": true
  }
}