みなさんこんにちは、INSIGHT LAB株式会社のSHOWです。
本記事はApache Kafkaを使ってSnowflakeへリアルタイムストリーミング処理を検証した内容となります。
最初に前提のお話をします。
Kafkaは大規模なストリームデータを扱うことができるオープンソースの分散メッセージングシステムです。
Kafkaの特徴は次のとおりです。
上記以外にも特徴はありますが、詳細はKafka公式ドキュメントをご確認ください。
Kafkaの話を始めると大変長くなりますので、さっそくKafkaのストリーミング処理を検証してみたいと思います。
Kafkaを動かすために必要なリソースを作成起動します。
Broker は Publish/Subscribe メッセージングシステムが稼働する 1 台のサーバーです。
よくある構成としては、Pub/Subサーバが複数あり、そのサーバを管理するのがZookeeperとなります。
現在はZookeeperでの管理から離れBroker自身で管理するアーキテクチャへ移行しています。
詳しくはKIP-500で調べてみてください。
以下のコマンドを実行し、Zookeeperを起動させます。
$ ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
Kafkaが実行されているサーバ(Broker)をグループ化したものです。
以下コマンドを実行し、Kafkaサーバを起動させます。
$ ./bin/kafka-server-start.sh ./config/server.properties
メッセージを整理するためのカテゴリです。
今回はpersonal-dataと言うカテゴリを作成します。
以下コマンドを実行し、トピックの作成を行います。
$ ./bin/kafka-topics.sh --create --topic personal-data --bootstrap-server localhost:9092
Pythonのプログラム内でTopicのカテゴリ「personal-data」を設定し、kafkaのproducerへフェイクデータを送信するようにしています。
以下のとおり、Snowflake側でデータが取り込まれていることを確認できました。
kafkaの嬉しいポイントとしては、Pub/Subモデルを採用しているため、ストリーム処理が非同期で行われるところです。
サブスクライバー(データの送信側)はパブリッシャー(データの取得側)に直接接続する必要はありません。パブリッシャーはKafkaでメッセージをキューに入れて、サブスクライバーが後で受信できるようにします。このため一時的にサーバの負荷が高まったとしても、負荷が落ち着いた後にキューからデータを読み取ることが可能であり、データの取りこぼしが無いような設計とされています。
さらに23年7月のsnowflakeアップデートでSnowpipe Streaming + Kafka Connectorの連携が可能となりました。
これにより、Snowpipe Streamingを使用することでデフォルトで配信保証(データの重複や損失を防止)が設定されます。
AWSのS3とSQSを組み合わせたストリーム処理も有名ですが、ぜひ一度Kafkaのストリーミング処理もお試しください。新たな扉が開かれることでしょう。