データ変換ツールであるdbtには、SaaS製品のdbtCloudとOSS製品のdbtCoreがあります。
dbtCloudにはコマンドを定期実行するデプロイ機能がありますが、dbtCoreにはありません。
ワークフローツールの選択肢の一つであるApache AirflowでdbtCoreのコマンドをワークフロー実行するまでの手順を紹介します。
先に以下の記事に従ってdbtCoreを使用できるコンテナ環境を構築してください。
https://knowledge.insight-lab.co.jp/bi/-dbtcore-simple-start
Apache Airflowをインストールします。
pip install apache-airflow
⚠️ ライブラリの依存関係によるコンフリクトが出ますが、ここではスルーします dbt-semantic-interfaces 0.4.2 has requirement pydantic~=1.10, but you have pydantic 2.5.3. |
airflowデータベースを初期化
airflow db init
airflowにログインするユーザを作成
airflow users create \
--username admin \
--firstname namae \
--lastname myoji \
--role Admin \
--email xxxxxx@xxxxxxxxxx.co.jp
password入力を促されるので任意のパスワードを入力
Password:P@ssw0rd
Repeat for confirmation:P@ssw0rd
※パスワード入力中の文字は表示されません
airflowサーバを起動
airflow webserver --port 8080 -d
作成したadminユーザでログインします。
ログイン完了
airflowでdbtプロジェクトを扱うairflowg-dbtをインストール
pip install airflow-dbt
airflowで実行したいdagの内容を記述するファイルを管理するためのディレクトリを作成します。
mkdir ~/airflow/dags
dbtで実行したいDAGファイルで定義します。
import airflow
from airflow import DAG
from airflow_dbt.operators.dbt_operator import DbtRunOperator, DbtTestOperator
from airflow.utils.dates import days_ago
default_args = {
'start_date': days_ago(0),
'retries': 0,
}
with DAG(
'dbt_dag',
default_args=default_args,
schedule_interval=None
) as dag:
task_run = DbtRunOperator(
task_id='task_run',
select='example',
profiles_dir='/root/.dbt',
dir='/root/isl_dbt_snowflake',
dag=dag,
)
task_test = DbtTestOperator(
task_id='task_test',
select='example',
profiles_dir='/root/.dbt',
dir='/root/isl_dbt_snowflake',
dag=dag,
)
task_run >> task_test
作成しただけではairflow画面には反映されず、ファイル情報を定期的に読み込むジョブが実行される必要があります。そのため、以下のコマンドでairflowのスケジューラーを起動させます。
airflow scheduler
AirflowのWeb画面に戻りページ更新をすると一覧に、dag.pyで作成したDAGが表示されます。
DAG右側にある「▶」をクリックしてDAGを実行します。
すると、dag.pyに記述した順序でdbtコマンドが実行されていきます。
SnowflakeにはAirflowからの命令で実行されたクエリが実行されています。
本記事では、dbt環境にApache Airflowをインストールし、dbtCoreで作成したモデルをApache Airflowから実行しました。
データ分析基盤を管理するうえで、ExtractやLoad、ReverseETLなどは、また別のツールで行われるのが一般的です。それらのツールの実行をAirflowで管理(オーケストレーション)し、運用のしやすいデータパイプラインを目指しましょう!