目次
概要
このナレッジでは、Cloud Run Jobs を活用して、BigQuery 上に蓄積された売上ジャーナルデータに対する店舗情報の不整合を自動的に検出し、Cloud Logging を用いて記録・可視化する仕組みについて解説します。
具体的には、「東京都の売上データに神奈川県に属する店舗の情報が含まれている」といった、地域コードや店舗コードの不一致による整合性エラーをどのように検知するかについて、本記事でご説明します。
使用技術
- Cloud Run Jobs
- Cloud Run Jobs は、コンテナ化されたアプリケーションやスクリプトをサーバーレス環境で一時的に実行できるサービスです。
従来のサーバー管理が不要で、必要な時にだけバッチ処理を起動できるため、運用コストの削減やスケーラビリティの確保に優れています。
本システムでは、Python スクリプトを定期的に実行し、BigQuery 上の売上ジャーナルデータに対して自動的に整合性チェックを行う処理基盤として活用しています。
- Cloud Run Jobs は、コンテナ化されたアプリケーションやスクリプトをサーバーレス環境で一時的に実行できるサービスです。
- BigQuery
- BigQuery は Google Cloud が提供する大規模データ分析プラットフォームで、ペタバイト級の大量データを高速にクエリ実行できる点が特徴です。
本システムでは、売上ジャーナルのような膨大なトランザクションデータを格納・管理し、Cloud Run Jobs からの検証処理に必要なデータ取得や分析を行っています。
SQL ベースのクエリで柔軟にデータの抽出や集計が可能なため、データの異常検知やレポート作成にも適しています。
- BigQuery は Google Cloud が提供する大規模データ分析プラットフォームで、ペタバイト級の大量データを高速にクエリ実行できる点が特徴です。
- Cloud Logging
- Cloud Logging は、Google Cloud 上の各種サービスやアプリケーションのログを収集・蓄積し、検索やモニタリングができるログ管理サービスです。
本システムでは、Cloud Run Jobs の実行結果や異常検知イベントをログとして出力し、リアルタイムでの監視や後続のアラート通知などに活用しています。
ログの可視化により、システムの状態把握やトラブルシューティングが容易になり、安定した運用を支える重要な役割を担っています。
- Cloud Logging は、Google Cloud 上の各種サービスやアプリケーションのログを収集・蓄積し、検索やモニタリングができるログ管理サービスです。
処理概要
- Cloud Run Jobs を利用して、定期的に Python スクリプトを実行します。
-
このスクリプトでは、BigQuery に格納された売上ジャーナルデータを対象に、あらかじめ定義された検証ロジックに基づいてデータの整合性チェックを行います。
具体的には、売上データに紐づく店舗情報を参照し、「東京都」の売上データに「神奈川県」の店舗コードが含まれているような、地域コードと店舗コードの不一致といった異常データの検出を行います。
こうしたチェックにより、データ登録時やシステム連携時に発生する可能性のある整合性エラーを早期に把握することが可能になります。 -
チェックの結果、不整合なデータや異常値が検出された場合には、その情報をイベントとして Cloud Logging に記録・蓄積します。
Cloud Logging 上に記録されたログは、後続の分析や通知処理(たとえば Cloud Monitoring との連携によるアラート通知)にも活用可能で、運用面での監視強化や品質向上に貢献します。
Pythonコード例
以下は、売上ジャーナルデータにおいて、東京都の売上記録に神奈川県の店舗が紛れているケースを検出する Python スクリプトの例です。
Cloud Run Jobs 上で定期実行することで、こうしたデータの混在を早期に検知し、Cloud Logging への出力を通じてモニタリング体制に組み込むことが可能です。
from google.cloud import bigquery, logging as cloud_logging
import logging
def run_store_mismatch_check():
try:
bq_client = bigquery.Client()
log_client = cloud_logging.Client()
logger = log_client.logger("store-location-mismatch-log")
query = """
SELECT
sj.store_code,
sj.prefecture,
sj.transaction_date,
sj.sales_amount
FROM `project_id.dataset.sales_journal` AS sj
WHERE sj.prefecture = '神奈川県'
"""
results = bq_client.query(query).result()
mismatched_rows = []
for row in results:
mismatched_rows.append([
row["store_code"],
row["transaction_date"],
row["sales_amount"]
])
if mismatched_rows:
log_lines = ["[店舗データ不整合検知] 東京都の売上に神奈川県の店舗が含まれています:"]
log_lines.append("店舗コード | 取引日 | 売上金額")
log_lines.append("---------------------------------------")
for r in mismatched_rows:
log_lines.append(f"{r[0]} | {r[1]} | {r[2]}")
logger.log_text("\n".join(log_lines))
else:
logger.log_text("不整合な店舗データは検出されませんでした。")
except Exception as e:
logging.error(f"処理中にエラーが発生しました: {e}")
まとめ
この構成により、Cloud Run Jobs を活用することで、インフラの管理を必要としないサーバーレスな形で BigQuery の定期的なチェック処理を実行することが可能となります。
処理結果や異常検知の情報は Cloud Logging に自動的に出力されるため、システムの状態をログベースで可視化し、トラブルの早期発見につなげることができます。
さらに、Cloud Monitoring と組み合わせることで、特定のエラーメッセージやしきい値をトリガーとして、Slack やメールなどの外部ツールに自動で通知を送ることも可能です。
これにより、運用チームの迅速な対応が可能になり、手動での監視負担を軽減しつつ、安定したシステム運用の実現が期待できます。