・Dataflowとは、統合されたストリーム データ処理とバッチデータ処理を大規模に提供する Google Cloud サービスです。Dataflow を使用して、1 つ以上のソースからデータを読み取り、変換し、宛先に書き込むデータ パイプラインを作成します。
・初めにGoogleCloudStorageに入力元データの格納とBigQueryの入力先のテーブルを準備します。
GoogleCloudコンソールからプロジェクトを選択し、GoogleCloudStorageにバケットを新規作成しデータのファイルとなるCSVファイルとスキーマ情報のファイルを格納します。
ログの出力用にフォルダも作成します。
CSVファイルの中身とスキーマ情報は下記になります。
次に、BigQueryに新規データセットと空のテーブルとエラーデータのテーブルを作成します。これで準備は完了です。
・Dataflowを[分析] [Dataflow]から画面を開きます。「ジョブビルダーを使用してジョブを作成」を押下しジョブ作成画面に移ります。
テンプレートの中は以下の構成になっています。
①ジョブ名:ジョブ名を入力してください。
②リージョンポイント:Dataflowを実行するリージョンを選択します。ここではasia-northeast1(東京)を選択しています
③Dataflowテンプレート:データの読み取り、変換、宛先に書き込むなどの処理をテンプレート化さ れたものを選択できます。ここではCSV Files on Cloud Strage to BigQuery を選択します
④Source:BigQueryに取り込むソースのCSVファイルが格納されている、CloudStrageのバゲットを入力します。
⑤Target:BigQueryに取り込むソースのスキーマ情報ファイルが格納されている、CloudStrageのバゲットを入力します。
⑥Targetテーブル名:BigQueryの格納先テーブル名を入力します
⑦Dataflow一時ファイルの場所:実行ログのGoogleCloudStorageの出力場所を入力します。stagingフォルダを指定します。
⑧エラーデータ格納場所:BigQueryの正常処理されなかったレコードを入れるテーブルを入力します。
⑨区切り文字:ここではCSVの区切り文字","を指定します。
⑩改行指定:ここでは"Default"を指定します。
⑪Dataflow一時ファイルの場所:実行ログのGoogleCloudStorageの出力場所を入力します。tempフォルダを指定します。
⑫必須パラメータの入力が完了したらジョブの実行を押下します。
・正常終了を確認します。
・BigQueryのテーブルにデータが書き込まれている事を確認します。
今回はDataflowを用いてCloudStorage上のファイルをBigQueryに取り込む処理を行いました。この処理は日次でBigQueryに取り込む処理や1つ以上のファイルを結合して取り込む処理など幅広く使える処理になります。
また、DataflowはPythonコードで作成する事が可能で、思いのままの処理を作成し機能を拡張することができます。次回はPythonコードのデプロイを解説します。