GCP: DataFlow
- データ処理パイプラインをクラウド上で実行するためのサービス。
- 必要に応じてオートスケールする。
- Apache Beamを使っている
DataFlowを使ったときの処理の流れ
- パイプラインを作成する
- フィルタリング、グルーピング、変換を行う
- 結果をCloud Storage上に書き出す
フィルタリング、グルーピング、変換の処理については自分でコードを書く。
Pub/Subから来るストリーミングデータに対してグルーピング処理を適用するときは、ウィンドウを使って一定時間や一定レコードごとのグルーピングを行う。
Hello world
gcloud auth application-default login
# GCPのプロジェクトIDを設定
PROJECT_ID=<PROJECT_ID>
brew cask install java
brew cask install maven
java --version
mvn -v
# サンプルプロジェクトをダウンロード
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DgroupId=com.example \
-DartifactId=dataflow-intro \
-Dversion="0.1" \
-DinteractiveMode=false \
-Dpackage=com.example
cd dataflow-intro
gsutil mb gs://$PROJECT_ID
# 実行
mvn compile exec:java \
-Dexec.mainClass=com.example.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--gcpTempLocation=gs://${PROJECT_ID}/tmp/ \
--output=gs://${PROJECT_ID}/output \
--runner=DataflowRunner \
--jobName=dataflow-intro" \
-Pdataflow-runner
mvn compile exec:java \
-Dexec.mainClass=com.example.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--gcpTempLocation=gs://${PROJECT_ID}/tmp/ \
--input=gs://${PROJECT_ID}
--output=gs://${PROJECT_ID}/output \
--runner=DataflowRunner \
--jobName=dataflow-intro" \
-Pdataflow-runner
Kotlin + Gradleではじめる
Java8を入れる
brew tap AdoptOpenJDK/openjdk
brew cask install adoptopenjdk8
CombineByKey vs GroupByKey
SUM, AVGなど標準で関数が用意されている場合は*.perKey()
を使ったほうが高速。
特殊な集計を行いたい場合はGroupByKey()
してから値のiterableを明示的に処理する。
- グループ化すると1つのキーにつき1つのワーカーしか使えない。 パイプラインの最後にグループ化するようにすると処理効率が上がる。