目次
はじめに
以前の記事でご紹介した通り、
SnowflakeにはタスクというSQLを定期実行できる便利な機能があります。
タスクを用いて、
元データを整形→データマートを生成…
といったデータパイプラインを組んでいる方は多いことでしょう。
しかし便利な機能である一方、
タスクが実行失敗した際の通知(メールやSlackへの通知)を
Snowflake単体で飛ばすことはできないのです…。※2020年12月現在、私調べ
Webコンソール上でポチポチして通知設定できたらすごく便利なんですけどね。Snowflake様、機能追加待っております!!お願いします!!
さて どうしよ
…データベース管理者の皆さん。
迅速にリカバリ対応するためにも、
タスクが実行失敗した際の通知は必須ですよね。
通知機能を備えたデータ連携ツール等を導入していれば問題ないかもしれませんが、
今回はそういったツールが導入されていない(導入できない)場合を想定し、
AWSの各サービスを組み合わせて通知の仕組みを実現してみようと思います!
TASK_HISTORY
Snowflakeは、
タスクの実行ログをTASK_HISTORYから確認できます。
上記SQLを実行すると…
こんな感じで結果が返ってきます。
沢山カラムがありますね。
主要なカラムの説明は下記表を参考に。
※全カラムの詳細な説明は公式ドキュメントのTASK_HISTORYに記載されています!
カラム名 | 説明 |
QUERY_ID | SQLステートメントのID |
NAME | タスク名 |
DATABASE_NAME | タスクの格納先DB名 |
SCHEMA_NAME | タスクの格納先スキーマ名 |
QUERY_TEXT | SQLステートメントの内容 |
STATE | タスクのステータス:SCHEDULED/EXECUTING/SUCCEEDED/FAILED/CANCELLED |
ERROR_CODE | エラーコード |
ERROR_MESSAGE | エラーメッセージ |
SCHEDULED_TIME | タスクのスケジュール時刻 |
QUERY_START_TIME | タスクの実行時刻 |
COMPLETED_TIME | タスクの完了時刻 |
ふむふむ…
この実行ログから必要なデータを抽出して…定期的にどこかに置いて…
メッセージ生成プログラムを発火⇒通知みたいな仕組みを作れば…いけそうな…気が…するぞ?
ではどうするのか?
今回はAWSのサービス3つを使用します。
- Amazon S3(Simple Storage Service)
- みんな大好きオブジェクトストレージ
- ファイルの置き場所
- AWS Lambda
- みんな大好きサーバーレスなプログラム実行環境
- Python/Node.js/Java/Ruby/COBOL等が動く
- Amazon SNS(Simple Notification Service)
- みんな大好きフルマネージドなメッセージングサービス
- メールに簡単に飛ばせる
- Lambda ⇒ SNS ⇒ Lambdaのように仲介役も可能
EC2とかは使いません。コスパ重視。サーバーレス最高。
そして、下記の構成で無理やりスマートに通知機能を実現します!
①定期的にタスクの実行ログをS3にアンロード
②S3トリガーでLambdaを起動して通知用メッセージを生成
③SNSにメッセージをキューイング
④メールを送信
これでなんとなく通知機能のイメージはできましたか?
通知機能の要件
続いて通知機能の要件をざっくり決めちゃいます!
- タスクが実行失敗した際、30分以内にメールで通知されること
- 実行失敗したタスク単位でメールを送信すること
- 例)30分以内に3タスク実行失敗したら3つメールを送信
- メールのタイトルにタスク名を含めること
- 本文には下記の情報を含めること
- タスク名
- DB名
- タスク終了時刻
- エラーコード
- エラーメッセージ
- S3には過去の実行ログを残さないこと(最新分だけ残す)
これらの要件を踏まえて実装を進めます。
実装の手順
いよいよ実装フェーズに移ります!
前提
実装前に、前提条件として下記作業の実施をお願いします。
- Amazon S3上でバケット作成
- 任意のバケット名で作成しておいて下さい。
- タスク実行ログの出力先として使用します。
- Snowflake上でAmazon S3へのストレージ統合作成
- 接続・認証設定のようなものです。
- 対象バケットのAmazon リソースネーム (ARN)が必要になります。
- Snowflake上でAmazon S3への外部ステージ作成
- 接続後のロードやアンロードに関する更に細かい仕様を設定するものです。
- SnowflakeからS3のバケットが参照できるようになります。
【Amazon SNS】トピック&サブスクリプション設定
トピックの作成
SNSコンソールを開き、[トピックの作成]をクリックします。
[スタンダード]にチェックを入れ、任意の名前を入力します。
必要に応じてオプションを設定し、[トピックの作成]をクリックします。
作成したトピックの情報が表示されます。
※ARNは後程Lambdaで使用するので、メモしておきましょう!
サブスクリプションの作成
次に[サブスクリプションの作成]をクリックします。
[プロトコル]は[Eメール]を選択し、[エンドポイント]に通知先のメールアドレスを入力します。
必要に応じてオプションを設定し、
[サブスクリプションの作成]をクリックします。
すると、先ほど入力したメールアドレス宛にAWSから確認メールが届きます。
メール本文のリンク[Confirm subscription]をクリックします。
ブラウザが立ち上がり、下記ページが表示されます。
トピックの画面に戻ってみましょう。
サブスクリプションが追加され、[ステータス]が確認済になっていますね。
これでSNS側の作業は完了です!
【AWS Lambda】通知文生成処理実装
Pythonの実装
Lambdaコンソールを開きます。
任意の関数名を入力し、ランタイムは[Python 3.8]を選択して関数を作成。
サンプルコードが表示されます。
サンプルコードは削除し、下記のコードをCopy&Pasteします。
コードの細かい説明は省きますが、
S3のファイルをロード⇒値を変数に格納⇒通知用のタイトル&本文生成⇒SNSに送信、といった処理を組んでいます。
環境変数の追加
上記のコードは環境変数からS3とSNSの接続情報を取得するようにしているため、
環境変数を追加する必要があります。
[環境変数の編集]を開きます。
[S3_BUCKET_NAME]にS3のバケット名を、
[SNS_TOPIC_ARN]にSNSのARNを入力し、保存します。
タイムアウトの設定
Lambdaはデフォルトだと3秒でタイムアウトするのですが、
3秒だと処理が終わらない可能性があります…。念のため変更しておきましょう!
[基本設定を編集]を開きます。
5分あれば十分かと。
変更したら保存します。
S3トリガーの設定
S3にファイルが出力されたタイミングでLambda関数を実行したいため、
トリガーを設定する必要があります。
[デザイナー]>[トリガーを追加]をクリックします。
トリガーの一覧から[S3]を選択し、[バケット]は該当のバケット名を選択します。
イベントタイプは[PUT]を選択しましょう。
※プレフィックスとサフィックスはオプションですが、バケットの構成によっては入力する必要があります。
右下の[追加]をクリックしたらトリガーの設定完了です。
[デザイナー]を見ると、左側にS3が追加されていると思います。
後はコードをデプロイしたらLambda側の作業は完了です!
【Snowflake】S3アンロード用タスク作成
Snowflake上で下記のSQLを実行します。
ウェアハウス名と外部ステージ名は事前に作成したものを指定してください。
- 過去30分間において実行失敗(FAILED)したタスクのログを抽出し、
30分毎にtsv形式でS3へアンロード(COPY)する処理を組んでいます。 - tsvファイルは同一ファイル名で上書きするようにしており、
S3上には最新分のファイルしか残らないようにしています。 - error_mesageは改行コードを含む場合があるため、置換処理しています。
タスクは作成した直後は停止(SUSPEND)状態なので、起動(RESUME)しましょう。
これで準備が整いました!
検証
動作検証してみましょう。
検証用に、エラーで失敗するタスクをSnowflake上で作成します。
とりあえずゼロ除算エラーを起こすタスクを作成。
RESUMEして、30分ほど待ちましょうか。
すると…。
…。
お、通知が来たぞ?
メールボックスを開くと…
この通り!
ちゃんとメールで通知されましたね。
余談
今回はタスク失敗通知の機能を実現すべく、
Snowflake(TASK_HISTORY)⇒S3⇒Lambda⇒SNSといったフローを構築しましたが、
別にTASK_HISTORYに限らず何のデータでもこのフローに組みことはできますので、
例えばSnowflake上のデータマートの一部を定期的に特定メンバーへ送信…といった用途にも応用できると思います。
最後に
この記事がいずれ不要になることを切に願います。
Snowflake様、機能追加待っております!!お願いします!!
以上、
「Snowflakeのタスク失敗通知をAWSで実装してみた」でした!