Snowflake Knowledge - INSIGHT LAB

troccoを使ってSlackからSnowflakeに会話データを送ってみる

作成者: BI LAB編集室|2020年12月17日

こんにちは!

本記事はSnowflake Advent Calendar 2020の18日目の記事となります。

先日、troccoでGoogle SpreadsheetsからSnowflakeへデータを送ってみたというナレッジをJasonが作成していましたが簡単に接続できていたので自分でも触ってみたくなったLeonです。

ちょうどやりたいことがあったのでtroccoを使って試してみたいと思います!

やりたいこと

弊社ではオフィスコンビニサービスを利用しており、オフィス内に設置してある冷蔵庫やお菓子棚から好きなものを取って購入する分のお金を集金ボックスへ入れて支払うようになっています。

基本的には購入管理をする必要はないのですが
購入記録を取ってみてどれぐらい売れているのか気になったので
Slackのチャンネルに購入報告を行うようにルールを決め、購買記録を取れるようにしました!

購入報告は絵文字を送信するようにして楽に報告できるようにしています。
参考にとある日の購入報告はこんな感じになっています。


変な絵文字が並んでいますがこれはお水(50円)になります。。。遊び心です。。。

去年の6月ぐらいから始めた報告ルールですがだいぶデータが溜まってきたはずなので
こちらの購入報告データをtroccoを使ってSnowflakeに取り込んでみたいと思います!

troccoの接続元

さて連携するぞ~ということで
troccoの転送元で確認してみると2020/12時点ではSlackは見当たりません…

なにか使えるものが無いか…

これだ!!!
troccoの接続元ではHttp接続に対応しておりこちらを使えばSlack Web APIで連携できる!はず!

なお、接続元に使いたいデータソースがない場合troccoのサポートにご連絡することで
追加対応していただける可能性がありますのでもし困った場合はお問い合わせください…!
https://trocco.io/lp/function/transfer.html
EmbulkのプラグインにSlack連携があるので対応できるかもしれません…!

Slack Web APIの設定

Slack Web APIはSlackとデータを送受信できるAPIで
例えばメッセージの送信や会話データを取得することができちゃいます。
https://api.slack.com/web
APIのメソッド一覧はこちら
https://api.slack.com/methods
Slack Web APIの具体的な説明はググるとたくさん出てくるので割愛します。

今回、会話データの取得に使うメソッドはconversations.historyです。
https://api.slack.com/methods/conversations.history
会話やイベントの情報を取得できるメソッドです。

それではSlack側の準備をしていきましょう!

Appを作成

https://api.slack.com/apps
上記にアクセスし[Create New App]をクリックしAppを作成します。

いい感じの名前を設定し、取得先のワークスペースを選択します。

スコープ設定

Appが作成できたら次に権限周りを設定します。[Permissions]をクリックします。

conversations.historyを利用するために必要なスコープはメソッドのページに記載されています。

ScopesのUser Token Scopesで[Add an OAuth Scope]クリックしスコープを選択していきます。

Scopesの設定ができました!

ページ上部の[Install App to Workspace]をクリックします。

アクセス権限のリクエストについて確認されるので[許可する]をクリックします。

Tokenが表示がされたら準備完了です。

APIを試そう

設定完了したら実際にAPIが使えるか確認しましょう!
SlackAPIのメソッド説明ページにはテスターが用意されていますのでそちらで試します。
https://api.slack.com/methods/conversations.history/test


tokenには先程作成されたxoxpから始まるトークンを貼り付けます。
そしてchannelにはチャンネルIDが必要になるのですが簡単な取得方法としてはURLの確認です。

アプリのチャンネル一覧で右クリック、「リンクをコピー」を選択します。
https://workspaceID.slack.com/archives/ChannelID
上記のURLの[ChanneID]が対象になります。

それではチャンネルIDを入力し[Test Method]をクリックしてAPIを実行してみましょう。

URLと会話内容のJSONが返ってきました!接続は問題なさそうです!

troccoで転送設定

まず転送設定を作成する前にSnowfakeの接続情報を登録する必要があります。
以下の記事で設定について説明しているので参考に登録してください。
troccoでGoogle SpreadsheetsからSnowflakeへデータを送ってみた

それでは転送設定を作成します!troccoの設定はサクッと作れます!

転送設定作成

転送設定ページから[新規転送設定作成]をクリックします。

転送元で[Http]、転送先で[Snowflake]を選択します。

概要設定

ジョブの名前や説明など設定します。いい感じの名前を設定しましょう。

転送元Httpの設定①:URLとJSONPath

次にSlack Web APIからデータを取り込むHttpを設定します。
必要な情報はconversations.historyのドキュメントにありますので参考してください。


Slack Web APIから返ってくる値はJSONになるので
入力ファイル形式では[JSONPath]を選択します。

ここで重要なのがJSONPathの設定です!
JSONPathについて記載するとこれだけで記事作成できてしまいそうなので割愛します…
Qiitaでわかりやすく説明されている以下の記事を参考にしてください。
JSONPath 使い方まとめ
簡単に言えばJSONのどこを読み取るのかを指定してあげるになります。

JSONPathを確認できるサイトでSlackAPIで返ってきた値を確認してみます。
構造的に[messages]に会話のユーザー情報や内容などのデータが
入ってくる形になるのでmessageを取得するためのJSONPathを[$.messages]と書きます。

実際に入力したところmassegesの中身のみ結果が出ました。
設定にも[$.messages]と入力しましょう!

転送元Httpの設定②:パラメーターとHTTPヘッダ

続けて先程確認で使ったtokenとチャンネルIDを使って設定を作成します。

パラメーターには[channel]を追加しIDを入力します。
パラメーター[oldest]は指定した日時(形式はUnixtime)以降のデータを取得するものです。
一旦数日分で確認したいので数日前のUnixtimeを指定しました。

なお、ジョブ実行時に指定値に設定できる「カスタム変数」がありますが
こちらUnixtimeに対応していないため期間指定を可変させることはできない模様です。。。
しっかりとした接続先にしたい場合はやはりtroccoさんに問い合わせて対応してもらう方が良さそうですね…

HTTPヘッダにはトークンを設定します。
キー名を[Authorization]と入力し、値に[Bearer xoxp-~~~]と入力します。
Bearerの入力し忘れにご注意ください。

これでHTTPの設定は終わりです。

転送先Snowflake設定

設定を入力する前にSnowfakeでテーブルを作っておきます。

列名はmessagesの内容と同じようにしています。
なお、blocksは会話テキストの構造を示す配列データになっているのでSnowfake上ではVARIANT型にしています。


転送先設定はSnowfakeで実行する[ウェアハウス]と
対象の[データベース][スキーマ][PURCHASE]を入力し転送モードを選択します。
今回は[insert]で設定しました。

以上でSnowfakeの設定は終了です。
[保存して自動データ設定・プレビューへ]をクリックして次へ進みます。

データプレビューとデータ設定

ページに移りしばらく待つとJSONのmessagesの部分が表示されます!
このプレビューはデータがちゃんと取れているのか確認できるためとても便利です。

blocksまで表示されているはずです!
※注:上記は最新メッセージがシンプルにメッセージのみになっている場合のカラムです。
絵文字リアクションや返信、URLが入る場合、カラムが増えますのでご注意を!
そういったデータも欲しい場合は最新メッセージに対象のカラムが入るように設定しましょう。

次にカラム定義を確認します。プレビューが読み込めたら自動的に入力されます。

tsカラムのデータ型がdoubleになっており、値がちゃんととれていないためstringに変更します。

スタンダートプランの場合、フィルターやマスキング、文字列置換、カラム暗号化など
行えますが今回はフィルターを設定しリマインダーのSlackbotのレコードを排除しておきます。


以上でジョブ設定は完了です。

ジョブを実行してみる

転送設定から対象ジョブを選び[実行]をクリックします。

そのまま[ジョブを実行]をクリックします。

無事実行できました!(実行ログもちゃんと表示されて助かります)

念の為、Snowflakeで確認しておきます。

問題なさそうですね!

Snowflakeでデータマートを作ってみる

無事troccoでSlackのデータを取得できました。
しかし…現状はテキストデータになるため売上集計などには向いていません。

なので分析しやすいようにデータを処理してデータマートを作成します!
今回使いたいのはtextカラム…ではなく、blocksカラムです。
このカラムはtextカラムの要素をJSONの形で持ちます。
例えば水2本の絵文字(:o_mizu2: :o_mizu2:)の場合、
blocksにはo_mizu2という絵文字2つとその間にある半角スペースのテキストの情報を持ちます。

具体的な中身はこんな感じです。

おそらくtextカラムでも上手く処理して集計が出来そうですが
JSONも処理できるSnowflakeなのでblocksカラムの方が使いやすそうです。

blocksについて注意点

注意したい点がありまして、
実際にデータを取得して確認していたところ
このblocksカラムが出てくるのが2019年11月19日頃からになっておりまして
それ以前はtextカラムで処理するしかなさそうです…

それではblocksカラムを使った処理を作ってみたいと思います!

フラット化

まずは1レコード1商品としたいのでVARIANT型に入っているデータのフラット化を行います。
フラット化についてはFLATTEN関数を使います。
詳細の情報についてはSnowflakeのドキュメントをご確認ください。
https://docs.snowflake.com/ja/sql-reference/functions/flatten.html

FLATTENについて完全に理解している訳ではないですが…サンプルを見ながら作ってみます。
上記のblocksの構造を見ると欲しいデータはelementsの中にあります。
流れとしては
blocks→elements→elements→{name, type} 
といった感じだと思います。

それを踏まえてサンプルを見ながら…

SELECT
 c.value:name
 ,c.value:type
FROM "LEON_DEMO"."SLACK"."PURCHASE" p, lateral flatten(input => p.blocks) a, lateral flatten(input => a.value:elements) b, lateral flatten(input => b.value:elements) c
;

これで実行してみると…

いい感じに取れてそうです!

使えそうなのでクエリ作成してみました。ついでにUnixtimeからTimestampを作ってます。

SELECT
 user
 ,convert_timezone('UTC','Asia/Tokyo', TO_TIMESTAMP(TO_NUMBER(TS))) as send_time
 ,c.value:name
 ,c.value:type
FROM "LEON_DEMO"."SLACK"."PURCHASE" p, lateral flatten(input => p.blocks) a, lateral flatten(input => a.value:elements) b, lateral flatten(input => b.value:elements) c
WHERE c.value:type = 'emoji'
;


結果はこちら

いい感じにレコード側分かれて取れていますね!FLATTEN便利!!

blocksはこのようにemojiやテキストの要素を切り分けることができるので
テキストのみを分析したいなどの用途にも使えるので便利ですね。

売上処理

現状は絵文字名しか入っていないので価格を追加します。
普通なら商品マスタを用意すべきですが価格パターンが少ないので今回はCASE文で作っちゃいます。
色々なネタ絵文字があるためこのクエリは割愛します…笑

できました!

あとはクエリの結果をテーブルに挿入したら完成です!

おまけ:Tableauに繋いでデータを確認してみる

せっかくなのでtroccoで全件データを取ってみてTableauで簡単にですが売上推移を見てみます。
コロナの影響がどんな感じに出てるのか…


やはり緊急事態宣言があった2020年4月以降は売上が少ない傾向になりました。
弊社ではちょうどその頃にテレワーク体制へ移行し出勤するメンバーがかなり減ったのが原因です。
夏場は水が飛ぶように売れるため売上が伸びますが今年の夏は変化無しですね…

オフィス関連のビジネスもこういった影響が出ていることがよくわかりました。

課題

さて、ここまで作ってきたhttp接続を使ったSlackデータ取得ですがいくつか課題があったので挙げておきます。

  • conversations.historyの最大取得数の1000件が限界
    過去分取得の際は[oldest][latest]パラメーターを使って分けて取得する必要があります。
  • unixtimeを使って期間を指定する必要があるがカスタム変数でunixtimeが使えない
    日次で取得する場合は余分の件数を取得→一時テーブルに投入→新規追加分だけテーブルに追加
    といった形での更新になるかと思います。
    ただ1日1000件以上のメッセージがある場合は更新は難しそうですね…
  • blocksカラムが2019年11月19日より前には存在しなかった
    過去分はtextカラムにで取り扱いましょう。

最後に

今回はSlackからデータを取得してみました。
今後接続先にSlackが追加される可能性はありますが
Http接続の使い方やSnowflakeでの扱い方、Slack Web APIの使い方等の参考になれば幸いです!

あとコロナが早く落ち着いて欲しいLeonでした。