GCP: DataFlow

  • データ処理パイプラインをクラウド上で実行するためのサービス。
  • 必要に応じてオートスケールする。
  • Apache Beamを使っている

DataFlowを使ったときの処理の流れ

  1. パイプラインを作成する
  2. フィルタリング、グルーピング、変換を行う
  3. 結果をCloud Storage上に書き出す

フィルタリング、グルーピング、変換の処理については自分でコードを書く。

Pub/Subから来るストリーミングデータに対してグルーピング処理を適用するときは、ウィンドウを使って一定時間や一定レコードごとのグルーピングを行う。

Hello world

gcloud auth application-default login

# GCPのプロジェクトIDを設定
PROJECT_ID=<PROJECT_ID>

brew cask install java
brew cask install maven

java --version
mvn -v

# サンプルプロジェクトをダウンロード
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.beam \
    -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
    -DgroupId=com.example \
    -DartifactId=dataflow-intro \
    -Dversion="0.1" \
    -DinteractiveMode=false \
    -Dpackage=com.example

cd dataflow-intro
gsutil mb gs://$PROJECT_ID

# 実行
mvn compile exec:java \
    -Dexec.mainClass=com.example.WordCount \
    -Dexec.args="--project=${PROJECT_ID} \
    --gcpTempLocation=gs://${PROJECT_ID}/tmp/ \
    --output=gs://${PROJECT_ID}/output \
    --runner=DataflowRunner \
    --jobName=dataflow-intro" \
    -Pdataflow-runner

mvn compile exec:java \
    -Dexec.mainClass=com.example.WordCount \
    -Dexec.args="--project=${PROJECT_ID} \
    --gcpTempLocation=gs://${PROJECT_ID}/tmp/ \
    --input=gs://${PROJECT_ID}
    --output=gs://${PROJECT_ID}/output \
    --runner=DataflowRunner \
    --jobName=dataflow-intro" \
    -Pdataflow-runner

Kotlin + Gradleではじめる

Java8を入れる

brew tap AdoptOpenJDK/openjdk
brew cask install adoptopenjdk8

CombineByKey vs GroupByKey

SUM, AVGなど標準で関数が用意されている場合は*.perKey()を使ったほうが高速。 特殊な集計を行いたい場合はGroupByKey()してから値のiterableを明示的に処理する。

  • グループ化すると1つのキーにつき1つのワーカーしか使えない。 パイプラインの最後にグループ化するようにすると処理効率が上がる。