お問い合わせ
15 分で読むことができます。

Databricksを使ってみた

執筆者 seri 更新日時 2025年4月30日

Topics: Databricks

目次

はじめに

Databricksとは、データの収集・加工から可視化や機械学習モデルの開発、それらの運用などのタスクを一気通貫で行うことのできるクラウド型プラットフォームです。

Databricksの無料トライアルを利用して、機械学習モデルの開発を行ってみたので、本記事ではその紹介をさせていただきます。

Databricksの概要

Databricksには様々な機能がありますが、大きく分類すると以下の機能になります。

  • データの可視化
  • データの抽出、変換、読み込み (ETL)
  • 機械学習
  • 生成AI

スクリーンショット (3336)-1

本記事では、このうち機械学習の内容を扱います。

本題に入る前に、いくつか準備のための説明をします。

ワークスペースとUnity Catalog

ワークスペースとは、ノートブック(実行可能なコマンドに加えて可視化、テキストを記述できるWebベースのインターフェース)や、ダッシュボード(可視化機能を提供するインターフェース)、データやモデルなどが配置されている環境のことです。

Unity Catalogは、複数のワークスペースに対して、一元化されたアクセス制御、監査、系列、およびデータ検出機能を利用できる環境のことです。

※ 以降「カタログ」はUnity Catalogのことを指しています。

スクリーンショット (3337)

MLflow

MLflowは、機械学習のライフサイクルを管理するためのオープンソースプラットフォームで、主な機能として以下があります。

  • MLflow Tracking

モデルの学習過程のパラメーター、評価指標、および出力ファイルをログに記録することができます。

  • MLflow Projects

再利用可能で再現可能な方法でコードをパッケージ化するための形式

  • MLflow Models

機械学習モデルをパッケージ化するための標準形式で、モデルの管理と異なる環境へのデプロイを行うことができます。

  • MLflow Registry

モデルのバージョン管理を行うことができます。

スクリーンショット (3341)

このMLflowはDatabricksに統合されています。

ランとエクスペリメント

Databricksのランは、モデル1個に対応する単位で、ランを実行したノートブック、作成されたモデル、および学習過程のパラメーターや評価指標、出力ファイルなどの情報が保存されます。

そして、エクスペリメントとは、複数のランを管理するための機能です。これはMLflow Trackingに相当する機能です。

scikit-learnによる機械学習モデル構築

本章では、Databricksでscikit-learnライブラリを使用して機械学習モデルを構築する方法およびそのモデルを使用して推論を行う方法について説明します。

今回扱うデータセットは、白ワインと赤ワインのデータで、11の特徴 (アルコール含有量、酸度、残留糖など) と、1から10までの整数で表される品質指標から構成されています。

今回の例では、この品質指標の数値ではなく、高品質(品質指標が7以上)であるかどうかの2値分類問題を想定してモデルを構築します。

MLflowを使用したモデル開発プロセスの追跡や、Hyperopt を使用したハイパーパラメーターチューニングの自動化についても説明します。

モデル構築

まず、新しいノートブックを作成します。左上の新規ボタンからノートブックを選択します。

スクリーンショット (3299)

ノートブックが起動します。以降で実装を進めます。

スクリーンショット (3301)

モデル構築に入る前に以下の2つの準備が必要です。

デフォルトの設定では、作成されたモデルはワークスペースに保存されますが、これをカタログに保存されるように切り替えるための処理を行います。

----------------------------------------------------------
import mlflow
mlflow.set_registry_uri("databricks-uc")
----------------------------------------------------------
 

モデルおよびテーブル保存先のカタログ名とスキーマ名を設定します。

 --------------------------------------
CATALOG_NAME = "workspace"
SCHEMA_NAME = "default"
---------------------------------------
 

これでモデル構築のための準備が完了しました。以降でモデル構築を行います。

まず、使用するデータを取得し、それをカタログ上に保存します。

----------------------------------------------------------------------------------------------------------------------------------
# Databricksに保持されている白ワインと赤ワインのデータセットを取得
white_wine = spark.read.csv("/databricks-datasets/wine-quality/winequality-white.csv", sep=';', header=True)
red_wine = spark.read.csv("/databricks-datasets/wine-quality/winequality-red.csv", sep=';', header=True)
 
# 取得したデータセットの項目名の半角空白をスペースに置換
for c in white_wine.columns:
    white_wine = white_wine.withColumnRenamed(c, c.replace(" ", "_"))
for c in red_wine.columns:
    red_wine = red_wine.withColumnRenamed(c, c.replace(" ", "_"))
 
# テーブル名を定義
white_wine_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine"
red_wine_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine"
 
# 指定したカタログ上にテーブルを保存
spark.sql(f"DROP TABLE IF EXISTS {white_wine_table}")
spark.sql(f"DROP TABLE IF EXISTS {red_wine_table}")
white_wine.write.saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine")
red_wine.write.saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine")
----------------------------------------------------------------------------------------------------------------------------------
 

カタログの指定した場所に「white_wine」「red_wine」の2つのテーブルが保存されます。

スクリーンショット (3309)

 また、実行したセルの下部から「パフォーマンスを表示」を押下すると、取得したテーブルの型情報やデータ容量を確認することができます。

スクリーンショット (3307)

次に、必要なライブラリをインポートします。

-------------------------------------------------------------------------------------------------
import numpy as np
import pandas as pd
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import sklearn.ensemble
import matplotlib.pyplot as plt
 
# hyperoptはハイパーパラメーターチューニングに使用されるライブラリ
%pip install hyperopt
from hyperopt import fmin, tpe, hp, STATUS_OK
from hyperopt.pyll import scope
-------------------------------------------------------------------------------------------------
 

カタログからデータを読み込み、前処理を行います。

---------------------------------------------------------------------------------------------------------------------------
# 保存したテーブルをPandas DataFrameとしてロード
 white_wine = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine").toPandas()
red_wine = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine").toPandas()
 
# 白ワインか赤ワインかのフラグを追加して2つのテーブルを結合
white_wine['is_red'] = 0.0
red_wine['is_red'] = 1.0
data_df = pd.concat([white_wine, red_wine], axis=0)
 
# 目的変数となる列「data_labels」をワインが高品質であるかどうかの論理値として定義
data_labels = data_df['quality'].astype('int') >= 7
data_df = data_df.drop(['quality'], axis=1)
 
# 学習用データとテスト用データに分割
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
    data_df,
    data_labels,
    test_size=0.2,
    random_state=1
)
---------------------------------------------------------------------------------------------------------------------------
 

MLflowの自動ログ記録を有効にします。これを有効にすることにより、得られた評価指標や学習時に使用したパラメーターなどのログがエクスペリメントに保存されます。

------------------------
mlflow.autolog()
------------------------
 

今回は勾配ブースティング分類モデルを使って実装を進めます。学習とテスト用データでの推論を行い、ROC曲線とAUCを取得し、それらをワークスペースやエクスペリメントに保存します。

 ---------------------------------------------------------------------------------------------------------------------------
# 名前を指定して新しいランを実行
with mlflow.start_run(run_name='gradient_boost') as run:
 
    model = sklearn.ensemble.GradientBoostingClassifier(random_state=0)
 
    # 学習とテスト用データでの推論およびそのROC曲線とAUCの取得
    model.fit(X_train, y_train)
    predicted_probs = model.predict_proba(X_test)
    roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])
    roc_curve = sklearn.metrics.RocCurveDisplay.from_estimator(model, X_test, y_test)
 
    # ROC曲線の画像ファイルをワークスペースに保存
    roc_curve.figure_.savefig("roc_curve.png")
 
    # テストデータのAUCは自動的にエクスペリメントに保存されないため、手動で保存
    mlflow.log_metric("test_auc", roc_auc)
 
    # ROC曲線の画像ファイルをエクスペリメントに保存
    mlflow.log_artifact("roc_curve.png")
    print("Test AUC of: {}".format(roc_auc))
 ---------------------------------------------------------------------------------------------------------------------------
 

実行結果はこのように表示されます。

スクリーンショット (3311)

また、ワークスペースにROC曲線の画像ファイルが保存されていることが確認できます。

スクリーンショット (3310)

実行したセルの下部から「1件のランのジョブ」を押下すると、得られた評価指標や学習時に使用したパラメーターなどのログを確認することができます。手動で保存した「test_auc」や「roc_curve.png」が保存されていることも確認できます。

スクリーンショット (3312)

スクリーンショット (3314)

スクリーンショット (3316)

スクリーンショット (3340)

さらに、複数のランを実行して複数のモデルでの精度を比較することも可能です。ここでは、アダブースト分類モデルを使って同様の処理を実装します。

-------------------------------------------------------------------------------------------
with mlflow.start_run(run_name='ada_boost') as run: 
    model = sklearn.ensemble.AdaBoostClassifier(random_state=0) 
    # 以下同様
-------------------------------------------------------------------------------------------
 

実行したセルの下部から「エクスペリメント」を押下すると、「ada_boost」「gradient_boost」の2つのランが保存されていることが確認できます。

スクリーンショット (3320)

 test_aucで比較すると、gradient_boostの方が精度がよいことがわかります。

スクリーンショット (3321)

次に、モデルの精度を高めるためにハイパーパラメーターチューニングを行います。ここで は、分散ハイパーパラメーターチューニングとモデル選択に使用されるPythonライブラリである、Hyperoptを使用してハイパーパラメーターチューニングを行います。

-------------------------------------------------------------------------------------------------------------
# チューニングするパラメーターを指定
search_space = {
    'n_estimators': scope.int(hp.quniform('n_estimators', 20, 1000, 1)),
    'learning_rate': hp.loguniform('learning_rate', -3, 0),
    'max_depth': scope.int(hp.quniform('max_depth', 2, 5, 1)),
}
 
def train_model(params):
 
    mlflow.autolog()
    with mlflow.start_run(nested=True):
        model_hp = sklearn.ensemble.GradientBoostingClassifier(
            random_state=0,
            **params
        )
        model_hp.fit(X_train, y_train)
        predicted_probs = model_hp.predict_proba(X_test)
 
        # AUCの取得とエクスペリメントへの保存
        roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])
        mlflow.log_metric('test_auc', roc_auc)
 
        # fminがAUCを最大化するように、損失を-1*AUCに設定
        return {'status': STATUS_OK, 'loss': -1*roc_auc}
 
# ハイパーパラメータチューニングのため新しいランを実行
with mlflow.start_run(run_name='gb_hyperopt') as run:
    # 最大のAUC(最小の損失)をもつパラメーターを取得
    best_params = fmin(
        fn=train_model,
        space=search_space,
        algo=tpe.suggest,
        max_evals=16
    )
-------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------
# 上で得られたパラメーターが複数ある場合、最新のもののみを再取得
best_run = mlflow.search_runs(
    order_by=['metrics.test_auc DESC', 'start_time DESC'],
    max_results=10,
).iloc[0]
 
# ノードブック上に取得した最良パラメータを出力
print('Best Run')
print('AUC: {}'.format(best_run["metrics.test_auc"]))
print('Num Estimators: {}'.format(best_run["params.n_estimators"]))
print('Max Depth: {}'.format(best_run["params.max_depth"]))
print('Learning Rate: {}'.format(best_run["params.learning_rate"]))
-------------------------------------------------------------------------------------------------
 

最良パラメータが出力されます。

スクリーンショット (3324)

最後に、先ほど取得した最良パラメータによるモデルをカタログに保存します。

---------------------------------------------------------------------------------------------------------------------------------
model_uri = 'runs:/{run_id}/model'.format(
    run_id=best_run.run_id
)
 
# モデルをロード
model = mlflow.sklearn.load_model(model_uri)
 
# モデルをカタログに保存
mlflow.register_model(model_uri, f"{CATALOG_NAME}.{SCHEMA_NAME}.wine_quality_model_best")
---------------------------------------------------------------------------------------------------------------------------------
 

カタログにモデルが保存されていることが確認できます。

スクリーンショット (3325)

スクリーンショット (3328)

ここで「run_id」とはランに割り当てられたIDで、以下の手順で確認できます。

カタログから「wine_quality_model_best」を選択→バージョンから最新のバージョンを押下→ソースのラン→ランIDを確認

スクリーンショット (3331)-1

参考ページ

 

 

 

 

seri

執筆者 seri