Datastore に溜まっていたログをBigQuery にインポート
はじめに
こんにちは、 Zaim の @naoki85 です。
最近、サーバーサイドチームからデータ分析チームに移動し、主に分析基盤の構築の業務を担当するようになりました。
前回の記事でも記載したように、 Zaim では BigQuery を主要なデータウェアハウスとして利用しています。
ただ、 Zaim のサービスの中にはログデータを GCP の Datastore に保存しているものもあります。
こちらを BigQuery にもインポートしようと考えました。
Datastore から BigQuery へのインポートは一度 Cloud Storage を経由する必要があります。
ヒントになるドキュメントも散らばっていたので少し苦労しました。
以下の手順で実施したので、本記事で解説したいと思います。
Datastore をエクスポートする関数を作成
Cloud Scheduler でジョブを定義
バケットの変更をトリガーにして BigQuery にインポート
なお、新規でログデータを BigQuery に保存するのであれば、上記の過去記事にあるように DataFlow 経由で送信する方が簡単なのではないかと考えています。
Datastore をエクスポートする関数を作成
Datastore からその場で Cloud Storage にエクスポートするのは、コンソール上から実行できます。
https://cloud.google.com/Datastore/docs/export-import-entities?hl=ja
ただ、定期的に実行する場合には Cloud Functions をスケジューラで定期的に実行する必要があります。
そのため、まずは Cloud Functions で関数を定義します。
幸いなことに公式のサンプルコードがあるので、こちらを利用すると良いかと思います。
Cloud Functions 用のサービスアカウントを作成
今回の Cloud Functions 用のサービスアカウントを作成します。
権限としては、
Cloud Datastore インポート / エクスポート管理者
ストレージ管理者
編集者
を付与しておきます。
Cloud Scheduler でジョブを定義
https://cloud.google.com/Datastore/docs/schedule-export?hl=ja
定期的に、DataStore のバックアップを Cloud Storage のバケットに保存するために、スケジューラを定義します。
Cloud Functions 用のサンプルコードの場合、スケジューラからバケットとエクスポート対象の Datastore のデータセットを指定できます。
Cloud Functions に渡す json を以下のように設定します。
{
"bucket": "gs://アップロード先のバケット",
"kinds": ["エクスポートしたいデータセット",...]
}
バケットの変更をトリガーにして BQ にインポート
Cloud Storage のバケットでは、新規作成したイベントで Cloud Functions を起動できます。
Cloud Storage のトリガーはコマンドによる指定か、関数作成時にコンソール上で設定できます。
Cloud Functions に使用した関数は以下になります。
from google.cloud import bigquery
PROJECT_NAME = 'project'
META_FILE_PATH = '/path/all_namespaces_kind_dataset.export_metadata'
BUCKET = 'bucket'
DESTINATION = 'BigQueryDataSet.table_name'
def include_meta_file_path(name):
return META_FILE_PATH in name
def main(event, context):
print('File: {}'.format(event['name']))
# 対象のファイルとバケットを特定
if not include_meta_file_path(event['name']):
print('Is not target, finish')
return
if event['bucket'] != BUCKET:
print('bucket name is wrong, finish')
return
source_uri = 'gs://bucket_name/' + event['name']
print('SourceUri: {}'.format(source_uri))
print('Destination: {}'.format(DESTINATION))
# BQ へのインポート準備
client = bigquery.Client(project=PROJECT_NAME)
job_config = bigquery.LoadJobConfig()
# データのインサート方法を指定。ここでは洗い替え
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
# Datastore の入力ファイルであることを指定
job_config.source_format = bigquery.SourceFormat.DATASTORE_BACKUP
# パーティショニングする場合は指定
job_config.time_partitioning = bigquery.table.TimePartitioning(field='columnName')
client.load_table_from_uri(
source_uris=source_uri,
destination=DESTINATION,
job_config=job_config
)
print('Finish load')
Datastore のエクスポート時にメタデータが作成されるので、それを BigQuery の load ジョブに渡すことでテーブルを作成できます。
なお、この関数に割り当てるサービスアカウントには、 BQ の編集権限と Storage の読み取り権限が必要なようです。
さいごに
今回は Datastore のデータを BigQuery にインポートした方法を共有しました。
クラウドサービスはリソースを組み合わせてシステムを構築する必要があります。それが楽しくもあり、今回のようにいくつかステップを踏まないといけないのが面倒であったりはしますね。Zaim ではまだまだ整備されていない部分が多く存在しています。
家計簿データを分析し、ユーザーのみなさまの生活に還元できるような新しいサービス作りのために、私と一緒に基盤強化をしてくれる仲間を募集しています!