以前の記事でご紹介した通り、
SnowflakeにはタスクというSQLを定期実行できる便利な機能があります。
タスクを用いて、
元データを整形→データマートを生成…
といったデータパイプラインを組んでいる方は多いことでしょう。
しかし便利な機能である一方、
タスクが実行失敗した際の通知(メールやSlackへの通知)を
Snowflake単体で飛ばすことはできないのです…。※2020年12月現在、私調べ
Webコンソール上でポチポチして通知設定できたらすごく便利なんですけどね。Snowflake様、機能追加待っております!!お願いします!!
さて どうしよ
…データベース管理者の皆さん。
迅速にリカバリ対応するためにも、
タスクが実行失敗した際の通知は必須ですよね。
通知機能を備えたデータ連携ツール等を導入していれば問題ないかもしれませんが、
今回はそういったツールが導入されていない(導入できない)場合を想定し、
AWSの各サービスを組み合わせて通知の仕組みを実現してみようと思います!
Snowflakeは、
タスクの実行ログをTASK_HISTORYから確認できます。
上記SQLを実行すると…
こんな感じで結果が返ってきます。
沢山カラムがありますね。
主要なカラムの説明は下記表を参考に。
※全カラムの詳細な説明は公式ドキュメントのTASK_HISTORYに記載されています!
カラム名 | 説明 |
QUERY_ID | SQLステートメントのID |
NAME | タスク名 |
DATABASE_NAME | タスクの格納先DB名 |
SCHEMA_NAME | タスクの格納先スキーマ名 |
QUERY_TEXT | SQLステートメントの内容 |
STATE | タスクのステータス:SCHEDULED/EXECUTING/SUCCEEDED/FAILED/CANCELLED |
ERROR_CODE | エラーコード |
ERROR_MESSAGE | エラーメッセージ |
SCHEDULED_TIME | タスクのスケジュール時刻 |
QUERY_START_TIME | タスクの実行時刻 |
COMPLETED_TIME | タスクの完了時刻 |
ふむふむ…
この実行ログから必要なデータを抽出して…定期的にどこかに置いて…
メッセージ生成プログラムを発火⇒通知みたいな仕組みを作れば…いけそうな…気が…するぞ?
今回はAWSのサービス3つを使用します。
EC2とかは使いません。コスパ重視。サーバーレス最高。
そして、下記の構成で無理やりスマートに通知機能を実現します!
①定期的にタスクの実行ログをS3にアンロード
②S3トリガーでLambdaを起動して通知用メッセージを生成
③SNSにメッセージをキューイング
④メールを送信
これでなんとなく通知機能のイメージはできましたか?
続いて通知機能の要件をざっくり決めちゃいます!
これらの要件を踏まえて実装を進めます。
いよいよ実装フェーズに移ります!
実装前に、前提条件として下記作業の実施をお願いします。
SNSコンソールを開き、[トピックの作成]をクリックします。
[スタンダード]にチェックを入れ、任意の名前を入力します。
必要に応じてオプションを設定し、[トピックの作成]をクリックします。
作成したトピックの情報が表示されます。
※ARNは後程Lambdaで使用するので、メモしておきましょう!
次に[サブスクリプションの作成]をクリックします。
[プロトコル]は[Eメール]を選択し、[エンドポイント]に通知先のメールアドレスを入力します。
必要に応じてオプションを設定し、
[サブスクリプションの作成]をクリックします。
すると、先ほど入力したメールアドレス宛にAWSから確認メールが届きます。
メール本文のリンク[Confirm subscription]をクリックします。
ブラウザが立ち上がり、下記ページが表示されます。
トピックの画面に戻ってみましょう。
サブスクリプションが追加され、[ステータス]が確認済になっていますね。
これでSNS側の作業は完了です!
Lambdaコンソールを開きます。
任意の関数名を入力し、ランタイムは[Python 3.8]を選択して関数を作成。
サンプルコードが表示されます。
サンプルコードは削除し、下記のコードをCopy&Pasteします。
コードの細かい説明は省きますが、
S3のファイルをロード⇒値を変数に格納⇒通知用のタイトル&本文生成⇒SNSに送信、といった処理を組んでいます。
上記のコードは環境変数からS3とSNSの接続情報を取得するようにしているため、
環境変数を追加する必要があります。
[環境変数の編集]を開きます。
[S3_BUCKET_NAME]にS3のバケット名を、
[SNS_TOPIC_ARN]にSNSのARNを入力し、保存します。
Lambdaはデフォルトだと3秒でタイムアウトするのですが、
3秒だと処理が終わらない可能性があります…。念のため変更しておきましょう!
[基本設定を編集]を開きます。
5分あれば十分かと。
変更したら保存します。
S3にファイルが出力されたタイミングでLambda関数を実行したいため、
トリガーを設定する必要があります。
[デザイナー]>[トリガーを追加]をクリックします。
トリガーの一覧から[S3]を選択し、[バケット]は該当のバケット名を選択します。
イベントタイプは[PUT]を選択しましょう。
※プレフィックスとサフィックスはオプションですが、バケットの構成によっては入力する必要があります。
右下の[追加]をクリックしたらトリガーの設定完了です。
[デザイナー]を見ると、左側にS3が追加されていると思います。
後はコードをデプロイしたらLambda側の作業は完了です!
Snowflake上で下記のSQLを実行します。
ウェアハウス名と外部ステージ名は事前に作成したものを指定してください。
タスクは作成した直後は停止(SUSPEND)状態なので、起動(RESUME)しましょう。
これで準備が整いました!
動作検証してみましょう。
検証用に、エラーで失敗するタスクをSnowflake上で作成します。
とりあえずゼロ除算エラーを起こすタスクを作成。
RESUMEして、30分ほど待ちましょうか。
すると…。
…。
お、通知が来たぞ?
メールボックスを開くと…
この通り!
ちゃんとメールで通知されましたね。
今回はタスク失敗通知の機能を実現すべく、
Snowflake(TASK_HISTORY)⇒S3⇒Lambda⇒SNSといったフローを構築しましたが、
別にTASK_HISTORYに限らず何のデータでもこのフローに組みことはできますので、
例えばSnowflake上のデータマートの一部を定期的に特定メンバーへ送信…といった用途にも応用できると思います。
この記事がいずれ不要になることを切に願います。
Snowflake様、機能追加待っております!!お願いします!!
以上、
「Snowflakeのタスク失敗通知をAWSで実装してみた」でした!