お問い合わせ
9 分で読むことができます

Snowflakeのタスク失敗通知をAWSで実装してみた

執筆者 BI LAB編集室 更新日時 2020年12月25日

目次

はじめに

以前の記事でご紹介した通り、
SnowflakeにはタスクというSQLを定期実行できる便利な機能があります。

タスクを用いて、
元データを整形→データマートを生成…
といったデータパイプラインを組んでいる方は多いことでしょう。

しかし便利な機能である一方、
タスクが実行失敗した際の通知(メールやSlackへの通知)を
Snowflake単体で飛ばすことはできないのです…。※2020年12月現在、私調べ
Webコンソール上でポチポチして通知設定できたらすごく便利なんですけどね。Snowflake様、機能追加待っております!!お願いします!!

 

困った顔で働く会社員のイラスト(男性)
  さて どうしよ

 

…データベース管理者の皆さん。

迅速にリカバリ対応するためにも、
タスクが実行失敗した際の通知は必須ですよね。

通知機能を備えたデータ連携ツール等を導入していれば問題ないかもしれませんが、
今回はそういったツールが導入されていない(導入できない)場合を想定し、
AWSの各サービスを組み合わせて通知の仕組みを実現してみようと思います!

 

TASK_HISTORY

Snowflakeは、
タスクの実行ログをTASK_HISTORYから確認できます。

上記SQLを実行すると…

こんな感じで結果が返ってきます。
沢山カラムがありますね。

主要なカラムの説明は下記表を参考に。
※全カラムの詳細な説明は公式ドキュメントのTASK_HISTORYに記載されています!

カラム名 説明
QUERY_ID SQLステートメントのID
NAME タスク名
DATABASE_NAME タスクの格納先DB名
SCHEMA_NAME タスクの格納先スキーマ名
QUERY_TEXT SQLステートメントの内容
STATE タスクのステータス:SCHEDULED/EXECUTING/SUCCEEDED/FAILED/CANCELLED
ERROR_CODE エラーコード
ERROR_MESSAGE エラーメッセージ
SCHEDULED_TIME タスクのスケジュール時刻
QUERY_START_TIME タスクの実行時刻
COMPLETED_TIME タスクの完了時刻

 

ふむふむ…

この実行ログから必要なデータを抽出して…定期的にどこかに置いて…
メッセージ生成プログラムを発火⇒通知みたいな仕組みを作れば…いけそうな…気が…するぞ?

 

ではどうするのか?

今回はAWSのサービス3つを使用します。

  • Amazon S3(Simple Storage Service)
    • みんな大好きオブジェクトストレージ
    • ファイルの置き場所
  • AWS Lambda
    • みんな大好きサーバーレスなプログラム実行環境
    • Python/Node.js/Java/Ruby/COBOL等が動く
  • Amazon SNS(Simple Notification Service)
    • みんな大好きフルマネージドなメッセージングサービス
    • メールに簡単に飛ばせる
    • Lambda ⇒ SNS ⇒ Lambdaのように仲介役も可能

EC2とかは使いません。コスパ重視。サーバーレス最高。

 

そして、下記の構成で無理やりスマートに通知機能を実現します!

 

①定期的にタスクの実行ログをS3にアンロード
②S3トリガーでLambdaを起動して通知用メッセージを生成
③SNSにメッセージをキューイング
④メールを送信

 

これでなんとなく通知機能のイメージはできましたか?

 

通知機能の要件

続いて通知機能の要件をざっくり決めちゃいます!

  • タスクが実行失敗した際、30分以内にメールで通知されること
  • 実行失敗したタスク単位でメールを送信すること
    • 例)30分以内に3タスク実行失敗したら3つメールを送信
  • メールのタイトルにタスク名を含めること
  • 本文には下記の情報を含めること
    • タスク名
    • DB名
    • タスク終了時刻
    • エラーコード
    • エラーメッセージ
  • S3には過去の実行ログを残さないこと(最新分だけ残す)

 

これらの要件を踏まえて実装を進めます。

 

実装の手順

いよいよ実装フェーズに移ります!

 

前提

実装前に、前提条件として下記作業の実施をお願いします。

  • Amazon S3上でバケット作成
    • 任意のバケット名で作成しておいて下さい。
    • タスク実行ログの出力先として使用します。
  • Snowflake上でAmazon S3へのストレージ統合作成
    • 接続・認証設定のようなものです。
    • 対象バケットのAmazon リソースネーム (ARN)が必要になります。
  • Snowflake上でAmazon S3への外部ステージ作成
    • 接続後のロードやアンロードに関する更に細かい仕様を設定するものです。
    • SnowflakeからS3のバケットが参照できるようになります。

 

【Amazon SNS】トピック&サブスクリプション設定

トピックの作成

SNSコンソールを開き、[トピックの作成]をクリックします。

 

[スタンダード]にチェックを入れ、任意の名前を入力します。

 

必要に応じてオプションを設定し、[トピックの作成]をクリックします。

作成したトピックの情報が表示されます。
※ARNは後程Lambdaで使用するので、メモしておきましょう!

 

サブスクリプションの作成

次に[サブスクリプションの作成]をクリックします。

 

[プロトコル]は[Eメール]を選択し、[エンドポイント]に通知先のメールアドレスを入力します。

必要に応じてオプションを設定し、
[サブスクリプションの作成]をクリックします。

 

すると、先ほど入力したメールアドレス宛にAWSから確認メールが届きます。

メール本文のリンク[Confirm subscription]をクリックします。

 

ブラウザが立ち上がり、下記ページが表示されます。

 

トピックの画面に戻ってみましょう。
サブスクリプションが追加され、[ステータス]が確認済になっていますね。

 

これでSNS側の作業は完了です!

 

【AWS Lambda】通知文生成処理実装

Pythonの実装

Lambdaコンソールを開きます。

任意の関数名を入力し、ランタイムは[Python 3.8]を選択して関数を作成。

 

サンプルコードが表示されます。

サンプルコードは削除し、下記のコードをCopy&Pasteします。

コードの細かい説明は省きますが、
S3のファイルをロード⇒値を変数に格納⇒通知用のタイトル&本文生成⇒SNSに送信、といった処理を組んでいます。

 

環境変数の追加

上記のコードは環境変数からS3とSNSの接続情報を取得するようにしているため、
環境変数を追加する必要があります。

[環境変数の編集]を開きます。

[S3_BUCKET_NAME]にS3のバケット名を、
[SNS_TOPIC_ARN]にSNSのARNを入力し、保存します。

 

タイムアウトの設定

Lambdaはデフォルトだと3秒でタイムアウトするのですが、
3秒だと処理が終わらない可能性があります…。念のため変更しておきましょう!

[基本設定を編集]を開きます。

5分あれば十分かと。
変更したら保存します。

 

S3トリガーの設定

S3にファイルが出力されたタイミングでLambda関数を実行したいため、
トリガーを設定する必要があります。

[デザイナー]>[トリガーを追加]をクリックします。

 

トリガーの一覧から[S3]を選択し、[バケット]は該当のバケット名を選択します。
イベントタイプは[PUT]を選択しましょう。

※プレフィックスとサフィックスはオプションですが、バケットの構成によっては入力する必要があります。


右下の[追加]をクリックしたらトリガーの設定完了です。

 

[デザイナー]を見ると、左側にS3が追加されていると思います。

 

後はコードをデプロイしたらLambda側の作業は完了です!

 

【Snowflake】S3アンロード用タスク作成

Snowflake上で下記のSQLを実行します。
ウェアハウス名と外部ステージ名は事前に作成したものを指定してください。

  • 過去30分間において実行失敗(FAILED)したタスクのログを抽出し、
    30分毎にtsv形式でS3へアンロード(COPY)する処理を組んでいます。
  • tsvファイルは同一ファイル名で上書きするようにしており、
    S3上には最新分のファイルしか残らないようにしています。
  • error_mesageは改行コードを含む場合があるため、置換処理しています。

 

タスクは作成した直後は停止(SUSPEND)状態なので、起動(RESUME)しましょう。

これで準備が整いました!

 

検証

動作検証してみましょう。

検証用に、エラーで失敗するタスクをSnowflake上で作成します。

とりあえずゼロ除算エラーを起こすタスクを作成。

RESUMEして、30分ほど待ちましょうか。

 

すると…。

…。

お、通知が来たぞ?

メールボックスを開くと…

この通り!
ちゃんとメールで通知されましたね。

 

余談

今回はタスク失敗通知の機能を実現すべく、
Snowflake(TASK_HISTORY)⇒S3⇒Lambda⇒SNSといったフローを構築しましたが、
別にTASK_HISTORYに限らず何のデータでもこのフローに組みことはできますので、
例えばSnowflake上のデータマートの一部を定期的に特定メンバーへ送信…といった用途にも応用できると思います。

 

最後に

この記事がいずれ不要になることを切に願います。
Snowflake様、機能追加待っております!!お願いします!!

以上、
Snowflakeのタスク失敗通知をAWSで実装してみた」でした!

Snowflakeを体験してみませんか?

INSIGHT LABではSnowflake紹介セミナーを定期開催しています。Snowflakeの製品紹介だけでなく、デモンストレーションを通してSnowflakeのシンプルなUI操作や処理パフォーマンスの高さを体感いただけます。

詳細はこちら

BI LAB編集室

執筆者 BI LAB編集室

BI LAB(データ活用研究所)編集室です。 BI、AI、DWHなどデータ活用に関するトレンドやニュースやコラムをほぼ毎日配信しています。押さえておきたい基本知識から、最新ニュース、事例インタビューやお役立ち情報・セミナーレポートまで、データ活用の専門家ならではの視点と情報量でお届けします。

4 分で読むことができます。
Snowflakeの料金体系|クレジットと費用最適化のポイントをご紹介
5 分で読むことができます。
【禁断の比較?】SnowflakeとTreasure Dataを比べてみました
1 分で読むことができます。
誤ってupdateしてしまったレコードをtime travelで復元する
5 分で読むことができます。
【Snowflake】新機能「Streamlit in Snowflake」とは何者か!?
3 分で読むことができます。
AWS Lambdaを使ってSnowflakeとSFTPサーバーを連携してみた