GCP: Big Query

巨大なデータセットの処理方法には 2 つある。

  • BigQuery による宣言型の方法
  • DataFlow によるプログラミング型の方法

SQL 文、Python、Java でデータを処理するロジックを書く。

BigQuery とは GCP 上のペタバイト規模のデータウェアハウス。 大規模データにたいして高速にクエリを実行できる。 インデックスを作成する必要はなく、BigQuery に適した非正規化データを用意するだけでいい。

CLI

データセットを作成

LOCATION="asia-northeast1"
PROJECT_ID="my-project"
DATASET="my-dataset"

bq --location=$LOCATION mk \
  --project_id $PROJECT_ID \
  --dataset \
  $PROJECT_ID:$DATASET

Ref:
データセットの作成  |  BigQuery  |  Google Cloud

ローカルの CSV ファイルをインポートしてテーブル作成

LOCATION="asia-northeast1"
PROJECT_ID="my-project"
DATASET="my-dataset"
TABLE="my-table"
FILE="test.csv"

bq --location=$LOCATION load \
  --autodetect \
  --project_id $PROJECT_ID \
  --source_format=CSV \
  $PROJECT_ID:$DATASET.$TABLE \
  $FILE

Ref:
データをローカル データソースから読み込む  |  BigQuery  |  Google Cloud

データセット内のテーブル一覧を取得

bq ls $PROJECT_ID:$DATA_SET

データセットを GCS にインポート

bq extract cpb101_flight_data.AIRPORTS gs://$BUCKET/bq/airports2.csv

テーブル作成

schema.jsonを使って空のテーブルを作る。

bq mk --table <PROJECT_ID>:<DATA_SET>.<TABLE> schema.json

スキーマ取得

bq show --schema <PROJECT_ID>:<DATASET>.<TABLE>

# jsonで取得
bq show --schema --format=prettyjson <PROJECT_ID>:<DATASET>.<TABLE>

REST API

Ref: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#methods

クエリ、関数の実行

コンソールの URL bigquery.cloud.google.com

Use Legacy SQL をオフにする。

クエリ言語は標準の SQL 2011 ネストや繰り返しにも対応。 データの取り込みには json などのデータ形式に対応。

ほぼリアルタイムの分析ができるが、ミリ秒単位のリアルタイム性が求められる場合は Cloud SQL や Spanner が適している。

料金はクエリ実行に対する支払い。 ストレージ料金もかかるがとても安いので実質クエリに課金。

データセットを社内の誰か(=ドメイン内であれば)と共有できる。 共有した相手がデータをクエリするときに権限がいらない。 ログイン方法などを考える必要がない。 クエリを共有できる。

監査ログも提供される。 クエリの実行を常に把握して監査ログを保持できる。 ログは改ざんできない。 データ瀬戸が使われた時期や編集が行われた時期を把握できる。

向いている用途:

  • 大規模データセットのアドホック分析
  • データウェアハウス
  • BI

格納したデータに対して費用が発生(費用は Cloud Storage とほぼ同じ)。 -> 使っていないデータがあれば自動割引が適用される

SELECT
    departure_airport,
    count(1) as num_flights
FROM
    `bigquery-samples.airline_ontime_data.flights`
GROUP BY
    departure_airport
ORDER BY
    num_flights DESC
LIMIT 10

AppEngine -> Pub/Sub -> Dataflow -> BigQuery

データの関連性を持つには履歴データを処理する。

データは次のパターンで Dataflow へ読み込まれる

  • リアルタイムデータは Pub/Sub から
  • 履歴データは Cloud Storage から

データセットとはテーブルの集合 基本的にはデータセット単位でアクセス制御する。 アクセス制御対象にはビューも含まれる。 ビューはテーブルのライブビューで基本的にはクエリ。 ビューに対するクエリを発行できる。

ビューはデータセットのアクセス制御に使える。 既存のデータセットから特定の行、列を選択したビューを独立したデータセットとし、それに対してアクセス権を与える。

テーブル単位ではないのは、結合されることがあるから。

クエリはジョブ。 インポート、エクスポート、コピーなどはすべてジョブ。

BigQuery はすべての列が個別のファイルとして保存される。 個別のファイルなので、インデックス、キー、パーティションは不要。 ただし、パーティションを使って費用を節約することができる。

課金対象は処理を行った列数なので、クエリを実行する列数を制限すると節約できる。

ビューを作る

WITHを使ってビューが作れる。

WITH <データセット名> AS (
    SELECT ...
)

結合

boolean を返す関数を JOIN の条件に使える。

Ref: https://cloud.google.com/bigquery/docs/reference/standard-sql/functions-and-operators

クエリ、関数

クエリ結果は csv や json としてダウンロードできる。

スプレッドシートを既存のデータセットと結合するクエリを記述できる。

テーブルは、<プロジェクト>.<データセット>.<テーブル>で表す。 プロジェクトを省略すると、デフォルトで現在のプロジェクトになる。

[<プロジェクト>.<データセット>.events_<日付>]で、データセットの特定の日付内のデータに対してのみクエリを実行できる。

5/1~5/3 のデータに対してのみクエリを実行する例。

SELECT
  FORMAT_UTC_USEC(event.timestamp_in_usec) AS time,
  request_url
FROM
    [myproject-1234.applogs.events_20120501],
    [myproject-1234.applogs.events_20120502],
    [myproject-1234.applogs.events_20120503]
WHERE
  event.username = 'root' AND
  NOT event.source_ip.is_internal;

遅れたフライトの数と、フライト数を出力するクエリの例。

SELECT
    airline,
    SUM(IF (arrival_delay > 0, 1, 0)) AS,
    COUNT(arrival_delay) AS total_flights
FROM
    `bigquery-samples.airline_ontime_data.flights`
WHERE
    arrival_airport='OKC'
    AND departure_airport='DFW'
GROUP BY
    airline

練習用にbigquery-samplesプロジェクトのデータセットを使える。

基本的なクエリ

SELECT
  airline,
  date,
  departure_delay
FROM
  `bigquery-samples.airline_ontime_data.flights`
WHERE
  departure_delay > 0
  AND departure_airport = 'LGA'
LIMIT
  100

関数

  • CONCAT(): カンマ区切りで文字列を繋げる
  • CAST(<column> AS <TYPE>): 型変換
  • LPAD(<column>, <桁数>, <埋める文字>): ゼロ埋めなどに使える。

ネストフィールド、反復フィールド

ウィンドウ関数、ユーザー定義関数

データ読み込み、エクスポート

csv 変換

データの取得元

  • AppEngine
  • ログファイル
  • GoogleAnalytics
  • Pub/Sub

データの変換処理

  • Dataflow
  • Dataproc

データのインポートは、CLI のbqコマンド、または WebUI からできる。

インポートできる形式

  • csv
  • json
  • avro
  • Cloud Datastore backup

bq loadで csv ファイルをインポートできる。

SQL でワイルドカードを使う

バッククォートでエスケープする必要がある。 パターンに一致するすべてのテーブルを対象にしたりできる。

`FROM bigquery-public-data.noaa_gsod.gsod*`

単なる SQL 以外の高度なクエリ

パフォーマンス、料金

高速で負荷が少ないほど優れたクエリと言える。

負荷とは

  • クエリが読み取るデータ量

  • シャッフル(あるステージから次のステージに渡されるデータ量)

  • 実体化の有無 メモリ内で済む場合は速い。 中間出力を実体化して書き出す必要があるクエリは遅い。

  • 呼び出す関数に関連する CPU オーバーヘッド SUM や COUNT などは低コスト。SIN や COS などは高コスト

パフォーマンスを向上させるには

BigQuery は列形式データベースなので、処理する列数を制限するといい。

  • SELECTするときは必要なフィールドだけに列数を制限する 全フィールドが必要でないかぎりSELECT *は使ってはいけない。

  • WHEREをできるだけ早い段階で使う 早めにフィルタリングしておくことで、ステージ間で渡されるデータ量が減る。 処理する行数が多いほど、クエリは遅くなる。

  • JOINするときは、結合する順序を考慮する 最初に大きな結合をし、そのあとで小さな結合をする。

  • GROUP BYを使うときは、各GROUP BYによって処理される行数を少なくする GROUP BYに複数列を指定した場合は多数のシャッフルが発生して遅くなる。 WHEREを使って、関連するデータが少ないキーについては除外するとテールレイテンシが改善される。

  • 関数を使うときは CPU オーバーヘッドを考慮する 組み込み関数 > SQL UDF > JavaScript UDF の優先順で使う。

関数 速度
組み込み関数 速い
SQL UDF 中間
JavaScript UDF 遅い

似たような関数でも速度に違いのあるものもある。 例: APPROX_COUNTのほうがCOUNTよりも早い。

  • ORDER BYは一番外側のクエリにのみ適用する

  • ワイルドカードを使うときは、共通する文字はできるだけ多く指定する。 OK: FROM bigquery-public-data.nooa_gsod.gsod* NG: FROM bigquery_public-data.nooa*

  • ワイルドカードの代わりにタイムスタンプによるパーティションを利用する テーブルを小さくした場合と同じようにパフォーマンスが向上する。

Cloud Dataflow

リアルタイムデータと過去のデータを同じ方法で処理できるのが利点。 バッチ処理との違いは、扱うデータが無限であるということ。

データを処理するプログラムを記述する バッチ処理もストリーミングでも同じコードでデータを扱える

対応言語

  • Python
  • Java

MapReduce

SideInputs 大規模データに加えて入力される小さなデータ

.bigqueryrc

bqコマンドのデフォルトフラグを設定できる。

Ref: https://cloud.google.com/bigquery/docs/bq-command-line-tool#setting_default_values_for_command-line_flags

その他

CloudShell 上ではDEVSHELL_PROJECT_ID変数が現在のプロジェクト ID に設定されている。

視覚化ツール

  • Looker
  • DataPortal

Tips

テーブル一覧を表示

bq lsで取得する例:

DATASET='my-dataset'
bq ls --format="json" --max_results 1000000 $DATASET | jq 'map(select(.type == "TABLE"))' | jq -r '.[].id'

SQL で取得する例:

DATASET='my-dataset'
bq query --use_legacy_sql=false --format=csv --max_rows=1000000 "SELECT table_name FROM ${DATASET}.INFORMATION_SCHEMA.TABLES" | awk 'NR>2'

テーブルの件数を取得する例:

DATASET='my-dataset'
bq query --use_legacy_sql=false --format=csv --max_rows=200000 "SELECT count(*) FROM ${DATASET}.INFORMATION_SCHEMA.TABLES WHERE table_type = 'BASE TABLE'"

ビューの数を表示

SELECT count(*) FROM my-dataset.INFORMATION_SCHEMA.TABLES WHERE table_type = 'VIEW'

列の説明を表示

列名と説明を表示する例:

SELECT
  column_name,
  description
FROM
  `<PROJECT_ID>.<DATASET_NAME>.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS`
WHERE
  table_name = '<TABLE_NAME>'

Ref: https://cloud.google.com/bigquery/docs/information-schema-tables?hl=ja

特定のカラムが特定の型に変換できない行を抽出

SAFE_CASTを使うとキャストに失敗したときNULLが変えるのことを利用する。 数値に変換できない値の行のみ抽出する例:

SELECT
  val
FROM
  `dataset.table`
WHERE
  SAFE_CAST(val AS INT64) is NULL

ローカルの JSON Lines をテーブルに挿入する

bq load --source_format=NEWLINE_DELIMITED_JSON <DATASET>.TABLE data.json

スキーマの説明だけ更新

列の型推論つきで改行区切り json をテーブルにインポート

https://www.reddit.com/r/bigquery/comments/6b3uj3/is_there_a_function_in_bigquery_to_infer_the_data/