評価する(1:PFIの実装、学習15からの続き)

評価

競馬予測モデルの特徴量重要度を、レース単位シャッフル+NDCG@3 で可視化する PFI 実装

今回は、前回「学習と評価で利用(15:評価ユーティリティセット)」の関数も利用して、レース予測モデルに対して Permutation Feature Importance (PFI) を計算する処理です。PFI とは、特徴量をシャッフルしてモデルの性能低下を測ることで、その特徴量がどれだけ重要かを評価する手法です。
ここでは NDCG@3(上位3頭のランキング精度指標)を用いて、各特徴量の寄与度を算出しています。

# 必要ファイルの読み込み(final_feature_names_seq / keep_for_seq / embedding_cols)
with open("scale_stats.json", "r", encoding="utf-8") as f:
    ss = json.load(f)
keep_for_seq = np.array(ss["keep_for_seq"], dtype=np.int64)
TAIL = int(ss["TAIL"])

with open("embedding_info.json", "r", encoding="utf-8") as f:
    embedding_info = json.load(f)
embedding_cols_all = list(embedding_info["embedding_cols"])
embedding_cols_pfi = [c for c in embedding_cols_all if c != "馬ID" and c in X_cat_val]

try:
    with open("final_feature_names_seq.json", "r", encoding="utf-8") as f:
        feature_names_seq = json.load(f)
    print("✅ final_feature_names_seq 保存済み")
except FileNotFoundError:
    with open("main_feature_order.json", "r", encoding="utf-8") as f:
        feature_columns = json.load(f)
    numeric_cols_final = [feature_columns[i] for i in keep_for_seq]
    numeric_cols_final = [c for c in numeric_cols_final if c not in set(embedding_cols_all)]
    extra_cols = ["similarity_max", "history_len_norm_seq", "pad_flag"]
    feature_names_seq = numeric_cols_final + extra_cols
    with open("final_feature_names_seq.json", "w", encoding="utf-8") as f:
        json.dump(feature_names_seq, f, ensure_ascii=False, indent=2)
    print("✅ final_feature_names_seq 保存済み")

extra_cols = ["similarity_max", "history_len_norm_seq", "pad_flag"]
assert feature_names_seq[-3:] == extra_cols, "final_feature_names_seq の末尾3列が想定と違います"
numeric_cols_final = feature_names_seq[:-3]
num_feat_to_idx = {name: i for i, name in enumerate(numeric_cols_final)}  # 0..D_main-1

race_to_indices = defaultdict(list)
for i, rid in enumerate(race_ids_val):
    race_to_indices[int(rid)].append(i)
race_to_indices = {k: np.asarray(v, dtype=np.int64) for k, v in race_to_indices.items()}

Xv_base = X_val_seq 
Xv_mut  = X_val_seq.copy() 
Xc_mut  = {k: v.copy() for k, v in X_cat_val.items()} 

def make_loader_from_mutables():
    ds = HorseDataset(
        X_num=Xv_mut,
        X_cat=Xc_mut,
        race_ids=race_ids_val,
        horse_ids=horse_ids_val,
        y=y_val,
        history_lengths=history_lengths_val
    )
    return DataLoader(ds, batch_size=256, shuffle=False, collate_fn=grouped_race_collate_fn, num_workers=0)

def eval_ndcg3_with_mutables(model, loader):
    use_cuda = torch.cuda.is_available()
    amp_dtype = torch.bfloat16 if use_cuda else torch.float32
    autocast_ctx = (torch.amp.autocast(device_type="cuda", dtype=amp_dtype)
                    if use_cuda else contextlib.nullcontext())
    return run_validation(model, loader, device, class_weights=None, autocast_ctx=autocast_ctx)

loader = make_loader_from_mutables()
baseline_ndcg = eval_ndcg3_with_mutables(model, loader)
print(f"\n📊 Baseline NDCG@3: {baseline_ndcg:.4f}", flush=True)

n_repeats = 5
rng = np.random.default_rng(42)

loader = make_loader_from_mutables()

T_num = int(Xv_mut.shape[1])
race_perm_by_rep_num = []
for rep in range(n_repeats):
    race_perm = {}
    for rid, idxs in race_to_indices.items():
        L = idxs.size
        if L < 2:
            continue
        order = rng.random((L, T_num)).argsort(axis=0)
        race_perm[rid] = (idxs, order)
    race_perm_by_rep_num.append(race_perm)

num_results = []
total_num = len(num_feat_to_idx)
print(f"\n🚀 Numeric PFI: {total_num} features × {n_repeats} repeats", flush=True)

arange_T_num = np.arange(T_num) 

for kk, (feat_name, j) in enumerate(num_feat_to_idx.items(), start=1):
    t0 = time.time()
    scores = []

    base_col = Xv_base[:, :, j]
    mut_col  = Xv_mut[:, :, j]

    for rep in range(n_repeats):
        mut_col[:] = base_col

        for rid, (idxs, order) in race_perm_by_rep_num[rep].items():
            mut_col[idxs, :] = base_col[idxs, :][order, arange_T_num]

        score = eval_ndcg3_with_mutables(model, loader)
        scores.append(score)

    scores = np.asarray(scores, dtype=np.float32)
    drop = baseline_ndcg - scores
    mean_drop, std_drop = float(drop.mean()), float(drop.std())
    num_results.append((feat_name, mean_drop, std_drop))

    dt = time.time() - t0
    print(f"✅ done: {feat_name}  ΔNDCG@3 = {mean_drop:+.4f} ± {std_drop:.4f}  ({dt:.1f}s)", flush=True)

cat_results = []
total_cat = len(embedding_cols_pfi)
print(f"\n🚀 Categorical PFI: {total_cat} features × {n_repeats} repeats", flush=True)

def ensure_perm_cache_for_T(T_needed, cache_dict):
    key = f"T={T_needed}"
    if key in cache_dict:
        return cache_dict[key]
    race_perm_by_rep = []
    for rep in range(n_repeats):
        race_perm = {}
        for rid, idxs in race_to_indices.items():
            L = idxs.size
            if L < 2:
                continue
            order = rng.random((L, T_needed)).argsort(axis=0)
            race_perm[rid] = (idxs, order)
        race_perm_by_rep.append(race_perm)
    cache_dict[key] = race_perm_by_rep
    return race_perm_by_rep

_perm_cache = {f"T={T_num}": race_perm_by_rep_num}

for kk, col in enumerate(embedding_cols_pfi, start=1):
    if col not in Xc_mut:
        continue
    t0 = time.time()
    scores = []

    T_cat = int(Xc_mut[col].shape[1])
    race_perm_by_rep_cat = ensure_perm_cache_for_T(T_cat, _perm_cache)
    arange_T_cat = np.arange(T_cat)

    base_mat = X_cat_val[col]
    mut_mat  = Xc_mut[col]

    for rep in range(n_repeats):
        mut_mat[:, :] = base_mat
        for rid, (idxs, order) in race_perm_by_rep_cat[rep].items():
            mut_mat[idxs, :] = base_mat[idxs, :][order, arange_T_cat]
        score = eval_ndcg3_with_mutables(model, loader)
        scores.append(score)
    scores = np.asarray(scores, dtype=np.float32)
    drop = baseline_ndcg - scores
    mean_drop, std_drop = float(drop.mean()), float(drop.std())
    cat_results.append((col, mean_drop, std_drop))

    dt = time.time() - t0
    print(f"✅ done: {col}  ΔNDCG@3 = {mean_drop:+.4f} ± {std_drop:.4f}  ({dt:.1f}s)", flush=True)

num_results.sort(key=lambda x: (-x[1], x[2]))
cat_results.sort(key=lambda x: (-x[1], x[2]))

print("\n📋 数値特徴量の影響(NDCG@3 低下量: 平均±SD):", flush=True)
for name, m, s in num_results:
    arrow = "⬆️" if m > 0 else "⬇️"
    print(f"  {name:20s} drop = {m:+.4f} ± {s:.4f} {arrow}", flush=True)

print("\n📋 カテゴリ特徴量の影響(NDCG@3 低下量: 平均±SD):", flush=True)
for name, m, s in cat_results:
    arrow = "⬆️" if m > 0 else "⬇️"
    print(f"  {name:20s} drop = {m:+.4f} ± {s:.4f} {arrow}", flush=True)

print("\n✅ PFI (NDCG@3 × 5回平均, レース内S軸シャッフル, 馬ID除外) 完了", flush=True)

1. 必要なファイルの読み込み

with open("scale_stats.json", "r", encoding="utf-8") as f:
    ss = json.load(f)
keep_for_seq = np.array(ss["keep_for_seq"], dtype=np.int64)
TAIL = int(ss["TAIL"])

まずは特徴量やスケーリング情報を格納した JSON ファイルを読み込みます。

  • keep_for_seq: モデルに残す数値特徴量のインデックス
  • TAIL: 時系列的に利用する末尾の履歴長さ

次に、埋め込み特徴量(カテゴリ変数用)を定義した JSON も読み込みます。


2. 特徴量名の準備

try:
    with open("final_feature_names_seq.json", "r", encoding="utf-8") as f:
        feature_names_seq = json.load(f)
    print("✅ final_feature_names_seq 保存済み")
except FileNotFoundError:
    ...

すでに保存済みの最終的な特徴量名リスト (final_feature_names_seq.json) があればそれを利用し、なければ

  • 数値特徴量
  • 追加の補助列(similarity_max, history_len_norm_seq, pad_flag
    を組み合わせて新しく作成します。

ここで numeric_cols_final が「数値特徴量一覧」として定義されます。


3. レースごとのインデックス作成

race_to_indices = defaultdict(list)
for i, rid in enumerate(race_ids_val):
    race_to_indices[int(rid)].append(i)
race_to_indices = {k: np.asarray(v, dtype=np.int64) for k, v in race_to_indices.items()}

レースIDごとにサンプルのインデックスをまとめた辞書を作成します。
後に「同じレース内で特徴量をシャッフルする」処理で利用されます。


4. モデル評価用の DataLoader 準備

def make_loader_from_mutables():
    ds = HorseDataset(
        X_num=Xv_mut,
        X_cat=Xc_mut,
        race_ids=race_ids_val,
        horse_ids=horse_ids_val,
        y=y_val,
        history_lengths=history_lengths_val
    )
    return DataLoader(ds, batch_size=256, shuffle=False, collate_fn=grouped_race_collate_fn, num_workers=0)

数値・カテゴリ特徴量を格納した配列から、PyTorch の DataLoader を生成する関数です。
grouped_race_collate_fn を使うことで、レース単位でバッチが構成される点が重要です。


5. 評価関数(NDCG@3)

def eval_ndcg3_with_mutables(model, loader):
    use_cuda = torch.cuda.is_available()
    amp_dtype = torch.bfloat16 if use_cuda else torch.float32
    autocast_ctx = (torch.amp.autocast(device_type="cuda", dtype=amp_dtype)
                    if use_cuda else contextlib.nullcontext())
    return run_validation(model, loader, device, class_weights=None, autocast_ctx=autocast_ctx)

モデルの評価を行う関数です。
GPU 環境では torch.amp.autocast による半精度計算を利用し、効率的に推論を行います。返り値は NDCG@3 のスコアです。


6. ベースライン性能の測定

loader = make_loader_from_mutables()
baseline_ndcg = eval_ndcg3_with_mutables(model, loader)
print(f"\n📊 Baseline NDCG@3: {baseline_ndcg:.4f}", flush=True)

特徴量をシャッフルしない状態での性能(ベースライン NDCG@3)を算出します。
この値からの低下量が、その特徴量の重要度を意味します。


7. 数値特徴量の PFI

for kk, (feat_name, j) in enumerate(num_feat_to_idx.items(), start=1):
    ...
    for rep in range(n_repeats):
        mut_col[:] = base_col
        for rid, (idxs, order) in race_perm_by_rep_num[rep].items():
            mut_col[idxs, :] = base_col[idxs, :][order, arange_T_num]
        score = eval_ndcg3_with_mutables(model, loader)
        scores.append(score)

各数値特徴量ごとに、レース単位でシャッフルしたデータを作り、その影響を測定します。

  • mean_drop: 平均的な性能低下
  • std_drop: ばらつき

が算出され、重要度として記録されます。


8. カテゴリ特徴量の PFI

for kk, col in enumerate(embedding_cols_pfi, start=1):
    ...
    for rep in range(n_repeats):
        mut_mat[:, :] = base_mat
        for rid, (idxs, order) in race_perm_by_rep_cat[rep].items():
            mut_mat[idxs, :] = base_mat[idxs, :][order, arange_T_cat]
        score = eval_ndcg3_with_mutables(model, loader)
        scores.append(score)

カテゴリ変数(embedding特徴量)についても同様に、レース単位のシャッフルを行い、寄与度を計測します。
数値特徴量と同様に、性能低下を基準に重要度を評価します。


9. 結果の出力

num_results.sort(key=lambda x: (-x[1], x[2]))
cat_results.sort(key=lambda x: (-x[1], x[2]))

数値・カテゴリ特徴量ごとに、性能低下が大きい順にソートして出力します。
これにより「どの特徴量がモデルにとって最も重要だったか」が直感的に分かります。


まとめ

このコード全体は、

  1. 特徴量の準備
  2. レース単位でのシャッフル
  3. PFI による数値・カテゴリ特徴量の寄与度測定
  4. 結果のランキング出力

を行う処理です。特に「レース単位でシャッフルする」工夫により、競馬のデータ構造に即した正しい PFI が計算されています。

コメント

タイトルとURLをコピーしました