学習データを 時系列順にレース単位で分割
このコードは、レース単位+時系列順でデータを「訓練/検証/テスト」に分割する処理です。データリークを防ぐ設計がポイントなので、その観点も含めて解説します。
print("✅ データ分割(リーク撲滅: レース単位 + 時系列)")
seq_df = pd.DataFrame({
"idx": np.arange(len(X_sequences)),
"race_id": race_ids,
"end_date": sequence_end_dates
})
race_date = seq_df.groupby("race_id", sort=False)["end_date"].max()
race_order = race_date.sort_values()
# 2) 後ろの一塊をテスト、次の塊をバリデーション、残りをトレインに
unique_races_sorted = race_order.index.to_numpy()
n_r = unique_races_sorted.size
test_frac = 0.10 # 元の test_size=0.1 に相当
val_frac = 0.20 # train_val をさらに 20% を val に相当(お好みで調整)
n_test = max(1, int(round(n_r * test_frac)))
n_val = max(1, int(round((n_r - n_test) * val_frac)))
test_races = unique_races_sorted[-n_test:]
val_races = unique_races_sorted[-(n_test + n_val):-n_test] if n_r > (n_test + 0) else np.array([], dtype=unique_races_sorted.dtype)
train_races = unique_races_sorted[:-(n_test + n_val)] if n_r > (n_test + n_val) else np.array([], dtype=unique_races_sorted.dtype)
def mask_from_races(rids, pick):
pick = set(pick.tolist() if isinstance(pick, np.ndarray) else pick)
return np.array([rid in pick for rid in rids], dtype=bool)
train_mask = mask_from_races(race_ids, train_races)
val_mask = mask_from_races(race_ids, val_races)
test_mask = mask_from_races(race_ids, test_races)
assert not (train_mask & val_mask).any()
assert not (train_mask & test_mask).any()
assert not (val_mask & test_mask).any()
def split(arr):
return arr[train_mask], arr[val_mask], arr[test_mask]
X_train, X_val, X_test = split(X_sequences)
y_train, y_val, y_test = split(y_targets)
race_ids_train, race_ids_val, race_ids_test = split(race_ids)
horse_ids_train, horse_ids_val, horse_ids_test = split(horse_ids)
similarity_train, similarity_val, similarity_test = split(similarity_scores)
history_lengths_train, history_lengths_val, history_lengths_test = split(history_lengths)
sequence_end_dates_train, sequence_end_dates_val, sequence_end_dates_test = split(sequence_end_dates)
この処理は、学習データを 時系列順にレース単位で分割し、データリーク(未来情報が訓練に混ざること)を防ぎます。流れは以下のとおりです。
1. レース終端日の整理
seq_df = pd.DataFrame({
"idx": np.arange(len(X_sequences)),
"race_id": race_ids,
"end_date": sequence_end_dates
})
race_date = seq_df.groupby("race_id", sort=False)["end_date"].max()
race_order = race_date.sort_values()
- 各シーケンスごとに レースIDと終了日 を DataFrame 化。
groupby("race_id").max()
により 各レースの最終日(開催日) を抽出。sort_values()
でレース開催日順に並べ、時系列の順序付けを確立。
2. データ分割サイズの決定
test_frac = 0.10
val_frac = 0.20
n_test = max(1, int(round(n_r * test_frac)))
n_val = max(1, int(round((n_r - n_test) * val_frac)))
- 全レース数
n_r
をもとに、- 後ろ10%をテストセット
- その直前20%をバリデーションセット
- 残りをトレーニングセット
max(1, …)
により、最低でも1レースは割り当てる安全策。
3. レース単位で区切る
test_races = unique_races_sorted[-n_test:]
val_races = unique_races_sorted[-(n_test + n_val):-n_test]
train_races= unique_races_sorted[:-(n_test + n_val)]
- ユニークなレースID を時系列順に並べた配列を切り分け、
test_races
= 一番後ろ(未来)val_races
= その直前train_races
= それより前(過去)
という形で分割。
4. レースIDに基づくマスク生成
def mask_from_races(rids, pick):
return np.array([rid in set(pick) for rid in rids], dtype=bool)
train_mask = mask_from_races(race_ids, train_races)
val_mask = mask_from_races(race_ids, val_races)
test_mask = mask_from_races(race_ids, test_races)
- 各サンプル(シーケンス)が属するレースIDを見て、
- 訓練/検証/テストのどこに属するか をブール値のマスクで表現。
assert
で セット間の重複が無いことを確認(データリーク防止)。
5. 配列を安全に分割
def split(arr):
return arr[train_mask], arr[val_mask], arr[test_mask]
X_train, X_val, X_test = split(X_sequences)
y_train, y_val, y_test = split(y_targets)
...
split
関数で、任意の配列を同じマスクで分割。- 特徴量・目的変数・レースID・馬ID・履歴長・類似度スコア・日付 など、すべて揃えて分割。
- これにより「学習/検証/テスト」でデータの一貫性が保たれる。
✨ まとめ
この処理の特徴は、
- レース単位で分割(同じレース内の馬が異なるセットに分かれない)
- 時系列順に分割(未来情報が訓練に混ざらない → データリーク撲滅)
- 柔軟な割合指定(10%テスト、20%バリデーションなど)
- 全配列を同期分割(入力特徴・ターゲット・メタ情報がズレない)
という点です。結果として、現実的な予測タスク(未来のレースを未知として扱う)に忠実な分割が実現できます。
コメント