このナレッジでは、Cloud Run Jobs を活用して、BigQuery 上に蓄積された売上ジャーナルデータに対する店舗情報の不整合を自動的に検出し、Cloud Logging を用いて記録・可視化する仕組みについて解説します。
具体的には、「東京都の売上データに神奈川県に属する店舗の情報が含まれている」といった、地域コードや店舗コードの不一致による整合性エラーをどのように検知するかについて、本記事でご説明します。
このスクリプトでは、BigQuery に格納された売上ジャーナルデータを対象に、あらかじめ定義された検証ロジックに基づいてデータの整合性チェックを行います。
具体的には、売上データに紐づく店舗情報を参照し、「東京都」の売上データに「神奈川県」の店舗コードが含まれているような、地域コードと店舗コードの不一致といった異常データの検出を行います。
こうしたチェックにより、データ登録時やシステム連携時に発生する可能性のある整合性エラーを早期に把握することが可能になります。
チェックの結果、不整合なデータや異常値が検出された場合には、その情報をイベントとして Cloud Logging に記録・蓄積します。
Cloud Logging 上に記録されたログは、後続の分析や通知処理(たとえば Cloud Monitoring との連携によるアラート通知)にも活用可能で、運用面での監視強化や品質向上に貢献します。
以下は、売上ジャーナルデータにおいて、東京都の売上記録に神奈川県の店舗が紛れているケースを検出する Python スクリプトの例です。
Cloud Run Jobs 上で定期実行することで、こうしたデータの混在を早期に検知し、Cloud Logging への出力を通じてモニタリング体制に組み込むことが可能です。
from google.cloud import bigquery, logging as cloud_logging
import logging
def run_store_mismatch_check():
    try:
        bq_client = bigquery.Client()
        log_client = cloud_logging.Client()
        logger = log_client.logger("store-location-mismatch-log")
        query = """
        SELECT
          sj.store_code,
          sj.prefecture,
                        sj.transaction_date,
          sj.sales_amount
        FROM `project_id.dataset.sales_journal` AS sj
        WHERE sj.prefecture = '神奈川県'
        """
        results = bq_client.query(query).result()
        mismatched_rows = []
        for row in results:
            mismatched_rows.append([
                row["store_code"],
                row["transaction_date"],
                row["sales_amount"]
            ])
        if mismatched_rows:
            log_lines = ["[店舗データ不整合検知] 東京都の売上に神奈川県の店舗が含まれています:"]
            log_lines.append("店舗コード | 取引日 | 売上金額")
            log_lines.append("---------------------------------------")
            for r in mismatched_rows:
                log_lines.append(f"{r[0]} | {r[1]} | {r[2]}")
            logger.log_text("\n".join(log_lines))
        else:
            logger.log_text("不整合な店舗データは検出されませんでした。")
    except Exception as e:
        logging.error(f"処理中にエラーが発生しました: {e}")
この構成により、Cloud Run Jobs を活用することで、インフラの管理を必要としないサーバーレスな形で BigQuery の定期的なチェック処理を実行することが可能となります。
処理結果や異常検知の情報は Cloud Logging に自動的に出力されるため、システムの状態をログベースで可視化し、トラブルの早期発見につなげることができます。
さらに、Cloud Monitoring と組み合わせることで、特定のエラーメッセージやしきい値をトリガーとして、Slack やメールなどの外部ツールに自動で通知を送ることも可能です。
これにより、運用チームの迅速な対応が可能になり、手動での監視負担を軽減しつつ、安定したシステム運用の実現が期待できます。