2024年1月29日~30日のアップデートで、タスクの実行コマンドに「EXECUTE TASK … RETRY LAST」が追加されました。
https://docs.snowflake.com/en/release-notes/2024/8_04#automatic-task-graph-retry-general-availability
このコマンドは、直近で失敗したルートタスクに対して失敗したタスクとその後続タスクのみ手動で再実行することができます。
普段Snowflakeのタスク機能を使用していないユーザにとっては「何が嬉しいの?」という機能ですが、タスクでデータパイプラインを構築しているユーザにとっては画期的な機能です。
※dbtだと「Rerun from Failure」が近い
Snowflakeでは、タスクと呼ばれるスキーマオブジェクトを作成し、クエリやプロシージャを登録することができます。
タスクが実行されると登録したSQLステートメントが実行され、データ分析基盤におけるバッチ更新処理を定時に行ったり、Snowflake運用上の定期的な作業を自動化することができます。
Snowflakeのタスクの基本的な使い方については以下の記事をご覧ください。
https://knowledge.insight-lab.co.jp/snowflake/create_task
新しいコマンドを検証するため、TABLEA~TABLE_Dの4テーブルを更新するタスクパイプラインを構築します。
このバッチ処理では、TASK_Aをルートタスクとなり、TASK_Aの正常終了をトリガーとし、TASK_BとTASK_Cが起動します。そしてTASK_Cの正常終了をトリガーとし、TASK_Dが起動します。
※通常タスクでバッチ更新処理を構成する場合は、冪等性を保証した処理をプロシージャに登録して、タスクからプロシージャをCALLするのが一般的です。
テーブルの作成
create or replace table table_a (id varchar);
create or replace table table_b (id varchar);
create or replace table table_c (id number); -- ※データ型エラーを発生させる
create or replace table table_d (id varchar);
タスクの作成
create or replace task task_a
warehouse = wh_survey_xs
as
insert into table_a (id) select to_char(current_date(), 'yyyymmdd') || '_a'
;create or replace task task_b
warehouse = wh_survey_xs
after task_a
as
insert into table_b (id) select to_char(current_date(), 'yyyymmdd') || '_b'
;create or replace task task_c
warehouse = wh_survey_xs
after task_a
as
insert into table_c (id) select to_char(current_date(), 'yyyymmdd') || '_c'
;create or replace task task_d
warehouse = wh_survey_xs
after task_c
as
insert into table_d (id) select to_char(current_date(), 'yyyymmdd') || '_d'
;
子タスクの起動
alter task task_b resume;
alter task task_c resume;
alter task task_d resume;
※SCHEDULE、AFTERおよびFINALIZEが設定されたタスクがトリガー条件を満たしたとき実行されるようにalter task … resumeを実行する必要があります。
タスクの状況
show tasks in database ... ;
タスクを実行
execute task task_a;
TASK_Cでデータ型不一致によるエラーが発生し、TASK_Dはスキップされます。
データやロジック起因でエラーが発生した場合、タスクを手動実行して再処理を行います。しかし、従来は手動で実行できるタスクは「ルートタスクのみ」でした。
つまり、エラー原因解消後に再処理を行うにも、TASK_Cのみを手動実行することができず以下の方法を取る必要がありました。
-- エラー原因解消
create or replace table table_c (id varchar);-- タスク実行
execute task task_a;
しかし、上記の方法にはいくつか問題点があります。
※今回のタスク処理はINSERTのみのため、TASK_Aが二度実行されるとデータが重複します。
タスクDAG内で正常に終了しているタスクを除いて再実行を行える機能が「RETRY LAST」によるタスクの再実行です。
execute task {task_name} retry last;
再度エラーを発生させエラー原因を解消した後、リトライ実行を行います。
-- テーブルを作り直す
create or replace table table_a (id varchar);
create or replace table table_b (id varchar);
create or replace table table_c (id number); -- ※データ型エラーを発生させる
create or replace table table_d (id varchar);-- タスク実行
execute task task_a;-- エラー原因解消
create or replace table table_c (id varchar);-- 失敗しタスクのみ再実行
execute task task_a retry last;
履歴を下から見ると、最初の実行でTASK_A=成功、TASK_B=成功、TASK_C=失敗となった後、リトライ実行を行うと失敗したTASK_Cから実行されます。その後TASK_Dが実行されタスクが終了します。
また、TASK_Aは一度しか実行されていないため、重複は発生していません。
このRETRY LAST実行にはいくつか使用条件があります。
直近のタスクが正常終了していると、リトライ実行はエラーとなります。
エラー後、タスクに何らかの変更が加えられた場合、リトライ実行はエラーとなります。
変更とみなされるコマンド
💡タスクにはSQLではなくプロシージャを呼び出すCALL文を設定しよう
エラーがロジック起因の場合、TASKに直接SQLを設定しているとロジック変更がタスク変更とみなされリトライ実行が行えなくなってしまいます。 直接SQLを設定せず、プロシージャ—にSQLを設定しタスクからプロシージャをCALLするのがベターです。 |
14日を過ぎるとリトライ実行を行うことができません。
本記事では、新しく追加されたタスクのリトライ実行機能「EXECUTE TASK … RETRY LAST」を検証しました。
この機能により、バッチ更新処理の途中でエラーが発生しても、失敗したDAGから再実行を行うことが可能になります。
※snowsightのタスク履歴上でのリトライ実行は、通常実行とリトライ実行が合わせて表示されるようです
リトライ実行にはタスク変更が許されていないため、TASKに直接SQLを設定せず、プロシージャにSQLを設定しタスクからCALLする構成。ロジックに問題があった場合はプロシージャを変更するような形が良いと思います。
プロシージャの作成例
create or replace procedure proc_task_c()
returns varchar not null
language sql
as
begin
insert into table_c (id) select to_char(current_date(), 'yyyymmdd') || '_c';
return 'Done.';
end;
タスク作成例(プロシージャ呼び出し型)
create or replace task task_c
warehouse = wh_survey_xs
after task_a
as
call proc_task_c();
;
⇒ プロシージャのSQLを修正してもタスクの変更とみなされず「RETRY LAST」が使用可能
また、データ分析基盤を運用するにあたり、データ更新処理を何度実行しても同じ結果となる冪等性を担保したログ・処理設計ができると運用が楽になります。