競馬予測モデルの特徴量重要度を、レース単位シャッフル+NDCG@3 で可視化する PFI 実装
今回は、前回「学習と評価で利用(15:評価ユーティリティセット)」の関数も利用して、レース予測モデルに対して Permutation Feature Importance (PFI) を計算する処理です。PFI とは、特徴量をシャッフルしてモデルの性能低下を測ることで、その特徴量がどれだけ重要かを評価する手法です。
ここでは NDCG@3(上位3頭のランキング精度指標)を用いて、各特徴量の寄与度を算出しています。
# 必要ファイルの読み込み(final_feature_names_seq / keep_for_seq / embedding_cols)
with open("scale_stats.json", "r", encoding="utf-8") as f:
ss = json.load(f)
keep_for_seq = np.array(ss["keep_for_seq"], dtype=np.int64)
TAIL = int(ss["TAIL"])
with open("embedding_info.json", "r", encoding="utf-8") as f:
embedding_info = json.load(f)
embedding_cols_all = list(embedding_info["embedding_cols"])
embedding_cols_pfi = [c for c in embedding_cols_all if c != "馬ID" and c in X_cat_val]
try:
with open("final_feature_names_seq.json", "r", encoding="utf-8") as f:
feature_names_seq = json.load(f)
print("✅ final_feature_names_seq 保存済み")
except FileNotFoundError:
with open("main_feature_order.json", "r", encoding="utf-8") as f:
feature_columns = json.load(f)
numeric_cols_final = [feature_columns[i] for i in keep_for_seq]
numeric_cols_final = [c for c in numeric_cols_final if c not in set(embedding_cols_all)]
extra_cols = ["similarity_max", "history_len_norm_seq", "pad_flag"]
feature_names_seq = numeric_cols_final + extra_cols
with open("final_feature_names_seq.json", "w", encoding="utf-8") as f:
json.dump(feature_names_seq, f, ensure_ascii=False, indent=2)
print("✅ final_feature_names_seq 保存済み")
extra_cols = ["similarity_max", "history_len_norm_seq", "pad_flag"]
assert feature_names_seq[-3:] == extra_cols, "final_feature_names_seq の末尾3列が想定と違います"
numeric_cols_final = feature_names_seq[:-3]
num_feat_to_idx = {name: i for i, name in enumerate(numeric_cols_final)} # 0..D_main-1
race_to_indices = defaultdict(list)
for i, rid in enumerate(race_ids_val):
race_to_indices[int(rid)].append(i)
race_to_indices = {k: np.asarray(v, dtype=np.int64) for k, v in race_to_indices.items()}
Xv_base = X_val_seq
Xv_mut = X_val_seq.copy()
Xc_mut = {k: v.copy() for k, v in X_cat_val.items()}
def make_loader_from_mutables():
ds = HorseDataset(
X_num=Xv_mut,
X_cat=Xc_mut,
race_ids=race_ids_val,
horse_ids=horse_ids_val,
y=y_val,
history_lengths=history_lengths_val
)
return DataLoader(ds, batch_size=256, shuffle=False, collate_fn=grouped_race_collate_fn, num_workers=0)
def eval_ndcg3_with_mutables(model, loader):
use_cuda = torch.cuda.is_available()
amp_dtype = torch.bfloat16 if use_cuda else torch.float32
autocast_ctx = (torch.amp.autocast(device_type="cuda", dtype=amp_dtype)
if use_cuda else contextlib.nullcontext())
return run_validation(model, loader, device, class_weights=None, autocast_ctx=autocast_ctx)
loader = make_loader_from_mutables()
baseline_ndcg = eval_ndcg3_with_mutables(model, loader)
print(f"\n📊 Baseline NDCG@3: {baseline_ndcg:.4f}", flush=True)
n_repeats = 5
rng = np.random.default_rng(42)
loader = make_loader_from_mutables()
T_num = int(Xv_mut.shape[1])
race_perm_by_rep_num = []
for rep in range(n_repeats):
race_perm = {}
for rid, idxs in race_to_indices.items():
L = idxs.size
if L < 2:
continue
order = rng.random((L, T_num)).argsort(axis=0)
race_perm[rid] = (idxs, order)
race_perm_by_rep_num.append(race_perm)
num_results = []
total_num = len(num_feat_to_idx)
print(f"\n🚀 Numeric PFI: {total_num} features × {n_repeats} repeats", flush=True)
arange_T_num = np.arange(T_num)
for kk, (feat_name, j) in enumerate(num_feat_to_idx.items(), start=1):
t0 = time.time()
scores = []
base_col = Xv_base[:, :, j]
mut_col = Xv_mut[:, :, j]
for rep in range(n_repeats):
mut_col[:] = base_col
for rid, (idxs, order) in race_perm_by_rep_num[rep].items():
mut_col[idxs, :] = base_col[idxs, :][order, arange_T_num]
score = eval_ndcg3_with_mutables(model, loader)
scores.append(score)
scores = np.asarray(scores, dtype=np.float32)
drop = baseline_ndcg - scores
mean_drop, std_drop = float(drop.mean()), float(drop.std())
num_results.append((feat_name, mean_drop, std_drop))
dt = time.time() - t0
print(f"✅ done: {feat_name} ΔNDCG@3 = {mean_drop:+.4f} ± {std_drop:.4f} ({dt:.1f}s)", flush=True)
cat_results = []
total_cat = len(embedding_cols_pfi)
print(f"\n🚀 Categorical PFI: {total_cat} features × {n_repeats} repeats", flush=True)
def ensure_perm_cache_for_T(T_needed, cache_dict):
key = f"T={T_needed}"
if key in cache_dict:
return cache_dict[key]
race_perm_by_rep = []
for rep in range(n_repeats):
race_perm = {}
for rid, idxs in race_to_indices.items():
L = idxs.size
if L < 2:
continue
order = rng.random((L, T_needed)).argsort(axis=0)
race_perm[rid] = (idxs, order)
race_perm_by_rep.append(race_perm)
cache_dict[key] = race_perm_by_rep
return race_perm_by_rep
_perm_cache = {f"T={T_num}": race_perm_by_rep_num}
for kk, col in enumerate(embedding_cols_pfi, start=1):
if col not in Xc_mut:
continue
t0 = time.time()
scores = []
T_cat = int(Xc_mut[col].shape[1])
race_perm_by_rep_cat = ensure_perm_cache_for_T(T_cat, _perm_cache)
arange_T_cat = np.arange(T_cat)
base_mat = X_cat_val[col]
mut_mat = Xc_mut[col]
for rep in range(n_repeats):
mut_mat[:, :] = base_mat
for rid, (idxs, order) in race_perm_by_rep_cat[rep].items():
mut_mat[idxs, :] = base_mat[idxs, :][order, arange_T_cat]
score = eval_ndcg3_with_mutables(model, loader)
scores.append(score)
scores = np.asarray(scores, dtype=np.float32)
drop = baseline_ndcg - scores
mean_drop, std_drop = float(drop.mean()), float(drop.std())
cat_results.append((col, mean_drop, std_drop))
dt = time.time() - t0
print(f"✅ done: {col} ΔNDCG@3 = {mean_drop:+.4f} ± {std_drop:.4f} ({dt:.1f}s)", flush=True)
num_results.sort(key=lambda x: (-x[1], x[2]))
cat_results.sort(key=lambda x: (-x[1], x[2]))
print("\n📋 数値特徴量の影響(NDCG@3 低下量: 平均±SD):", flush=True)
for name, m, s in num_results:
arrow = "⬆️" if m > 0 else "⬇️"
print(f" {name:20s} drop = {m:+.4f} ± {s:.4f} {arrow}", flush=True)
print("\n📋 カテゴリ特徴量の影響(NDCG@3 低下量: 平均±SD):", flush=True)
for name, m, s in cat_results:
arrow = "⬆️" if m > 0 else "⬇️"
print(f" {name:20s} drop = {m:+.4f} ± {s:.4f} {arrow}", flush=True)
print("\n✅ PFI (NDCG@3 × 5回平均, レース内S軸シャッフル, 馬ID除外) 完了", flush=True)
1. 必要なファイルの読み込み
with open("scale_stats.json", "r", encoding="utf-8") as f:
ss = json.load(f)
keep_for_seq = np.array(ss["keep_for_seq"], dtype=np.int64)
TAIL = int(ss["TAIL"])
まずは特徴量やスケーリング情報を格納した JSON ファイルを読み込みます。
keep_for_seq
: モデルに残す数値特徴量のインデックスTAIL
: 時系列的に利用する末尾の履歴長さ
次に、埋め込み特徴量(カテゴリ変数用)を定義した JSON も読み込みます。
2. 特徴量名の準備
try:
with open("final_feature_names_seq.json", "r", encoding="utf-8") as f:
feature_names_seq = json.load(f)
print("✅ final_feature_names_seq 保存済み")
except FileNotFoundError:
...
すでに保存済みの最終的な特徴量名リスト (final_feature_names_seq.json
) があればそれを利用し、なければ
- 数値特徴量
- 追加の補助列(
similarity_max
,history_len_norm_seq
,pad_flag
)
を組み合わせて新しく作成します。
ここで numeric_cols_final
が「数値特徴量一覧」として定義されます。
3. レースごとのインデックス作成
race_to_indices = defaultdict(list)
for i, rid in enumerate(race_ids_val):
race_to_indices[int(rid)].append(i)
race_to_indices = {k: np.asarray(v, dtype=np.int64) for k, v in race_to_indices.items()}
レースIDごとにサンプルのインデックスをまとめた辞書を作成します。
後に「同じレース内で特徴量をシャッフルする」処理で利用されます。
4. モデル評価用の DataLoader 準備
def make_loader_from_mutables():
ds = HorseDataset(
X_num=Xv_mut,
X_cat=Xc_mut,
race_ids=race_ids_val,
horse_ids=horse_ids_val,
y=y_val,
history_lengths=history_lengths_val
)
return DataLoader(ds, batch_size=256, shuffle=False, collate_fn=grouped_race_collate_fn, num_workers=0)
数値・カテゴリ特徴量を格納した配列から、PyTorch の DataLoader
を生成する関数です。grouped_race_collate_fn
を使うことで、レース単位でバッチが構成される点が重要です。
5. 評価関数(NDCG@3)
def eval_ndcg3_with_mutables(model, loader):
use_cuda = torch.cuda.is_available()
amp_dtype = torch.bfloat16 if use_cuda else torch.float32
autocast_ctx = (torch.amp.autocast(device_type="cuda", dtype=amp_dtype)
if use_cuda else contextlib.nullcontext())
return run_validation(model, loader, device, class_weights=None, autocast_ctx=autocast_ctx)
モデルの評価を行う関数です。
GPU 環境では torch.amp.autocast
による半精度計算を利用し、効率的に推論を行います。返り値は NDCG@3 のスコアです。
6. ベースライン性能の測定
loader = make_loader_from_mutables()
baseline_ndcg = eval_ndcg3_with_mutables(model, loader)
print(f"\n📊 Baseline NDCG@3: {baseline_ndcg:.4f}", flush=True)
特徴量をシャッフルしない状態での性能(ベースライン NDCG@3)を算出します。
この値からの低下量が、その特徴量の重要度を意味します。
7. 数値特徴量の PFI
for kk, (feat_name, j) in enumerate(num_feat_to_idx.items(), start=1):
...
for rep in range(n_repeats):
mut_col[:] = base_col
for rid, (idxs, order) in race_perm_by_rep_num[rep].items():
mut_col[idxs, :] = base_col[idxs, :][order, arange_T_num]
score = eval_ndcg3_with_mutables(model, loader)
scores.append(score)
各数値特徴量ごとに、レース単位でシャッフルしたデータを作り、その影響を測定します。
mean_drop
: 平均的な性能低下std_drop
: ばらつき
が算出され、重要度として記録されます。
8. カテゴリ特徴量の PFI
for kk, col in enumerate(embedding_cols_pfi, start=1):
...
for rep in range(n_repeats):
mut_mat[:, :] = base_mat
for rid, (idxs, order) in race_perm_by_rep_cat[rep].items():
mut_mat[idxs, :] = base_mat[idxs, :][order, arange_T_cat]
score = eval_ndcg3_with_mutables(model, loader)
scores.append(score)
カテゴリ変数(embedding特徴量)についても同様に、レース単位のシャッフルを行い、寄与度を計測します。
数値特徴量と同様に、性能低下を基準に重要度を評価します。
9. 結果の出力
num_results.sort(key=lambda x: (-x[1], x[2]))
cat_results.sort(key=lambda x: (-x[1], x[2]))
数値・カテゴリ特徴量ごとに、性能低下が大きい順にソートして出力します。
これにより「どの特徴量がモデルにとって最も重要だったか」が直感的に分かります。
まとめ
このコード全体は、
- 特徴量の準備
- レース単位でのシャッフル
- PFI による数値・カテゴリ特徴量の寄与度測定
- 結果のランキング出力
を行う処理です。特に「レース単位でシャッフルする」工夫により、競馬のデータ構造に即した正しい PFI が計算されています。
コメント