BI LAB データ活用研究所 - INSIGHT LAB

【Airflow】環境構築からジョブ実行までをシンプルにやってみる(dbt+Snowflake)

作成者: uta|2023年12月29日

はじめに

データ変換ツールであるdbtには、SaaS製品のdbtCloudとOSS製品のdbtCoreがあります。
dbtCloudにはコマンドを定期実行するデプロイ機能がありますが、dbtCoreにはありません。

ワークフローツールの選択肢の一つであるApache AirflowでdbtCoreのコマンドをワークフロー実行するまでの手順を紹介します。

 

環境

  • Dockerコンテナ(python3.9.10 burst)
  • Snowflake
  • dbtCore
  • Apache Airflow

 

環境構築

先に以下の記事に従ってdbtCoreを使用できるコンテナ環境を構築してください。

https://knowledge.insight-lab.co.jp/bi/-dbtcore-simple-start

 

Apache Airflow

 

インストール

Apache Airflowをインストールします。

pip install apache-airflow

 


 ⚠️ ライブラリの依存関係によるコンフリクトが出ますが、ここではスルーします

dbt-semantic-interfaces 0.4.2 has requirement pydantic~=1.10, but you have pydantic 2.5.3.

 

airflowデータベースを初期化

airflow db init

 

airflowにログインするユーザを作成

airflow users create \
--username admin \
--firstname namae \
--lastname myoji \
--role Admin \
--email xxxxxx@xxxxxxxxxx.co.jp

 

password入力を促されるので任意のパスワードを入力

Password:P@ssw0rd
Repeat for confirmation:P@ssw0rd

※パスワード入力中の文字は表示されません

 

airflowサーバを起動

airflow webserver --port 8080 -d

 

ログイン

作成したadminユーザでログインします。

 

ログイン完了

 

DAGの作成

airflowでdbtプロジェクトを扱うairflowg-dbtをインストール

 

pip install airflow-dbt

 

airflowで実行したいdagの内容を記述するファイルを管理するためのディレクトリを作成します。

 

mkdir ~/airflow/dags

 

dbtで実行したいDAGファイルで定義します。

import airflow
from airflow import DAG
from airflow_dbt.operators.dbt_operator import DbtRunOperator, DbtTestOperator

from airflow.utils.dates import days_ago

default_args = {
  'start_date': days_ago(0),
  'retries': 0,
}

with DAG(
    'dbt_dag',
    default_args=default_args,
    schedule_interval=None
) as dag:

  task_run = DbtRunOperator(
    task_id='task_run',
    select='example',
    profiles_dir='/root/.dbt',
    dir='/root/isl_dbt_snowflake',
    dag=dag,
  )

  task_test = DbtTestOperator(
    task_id='task_test',
    select='example',
    profiles_dir='/root/.dbt',
    dir='/root/isl_dbt_snowflake',
    dag=dag,
  )
  
  task_run >> task_test

 

作成しただけではairflow画面には反映されず、ファイル情報を定期的に読み込むジョブが実行される必要があります。そのため、以下のコマンドでairflowのスケジューラーを起動させます。

 

airflow scheduler

 

DAGの実行

 

AirflowのWeb画面に戻りページ更新をすると一覧に、dag.pyで作成したDAGが表示されます。
DAG右側にある「▶」をクリックしてDAGを実行します。

 

すると、dag.pyに記述した順序でdbtコマンドが実行されていきます。

SnowflakeにはAirflowからの命令で実行されたクエリが実行されています。

 

おわりに

本記事では、dbt環境にApache Airflowをインストールし、dbtCoreで作成したモデルをApache Airflowから実行しました。

データ分析基盤を管理するうえで、ExtractやLoad、ReverseETLなどは、また別のツールで行われるのが一般的です。それらのツールの実行をAirflowで管理(オーケストレーション)し、運用のしやすいデータパイプラインを目指しましょう!