馬ごとの時系列シーケンス化(スライディングウィンドウ)+特徴量拡張の前処理パイプライン
「馬ごとの時系列シーケンス化(スライディングウィンドウ)+特徴量拡張の前処理パイプライン」 です。
もう少し噛み砕くと:
- 並び替え → 馬IDごとの時系列整列
- as_strided → スライディングウィンドウでシーケンス生成(ベクトル化)
- 類似度・履歴長・使用率 → 特徴量拡張
- 連結 → 学習用データセット構築
print("✅ 馬ごとの時系列シーケンス化(スライディングウィンドウ)+特徴量拡張の前処理パイプライン")
# 並び替え & インデックス振り直し
data = data.sort_values(by=["馬ID", "日付"]).reset_index(drop=True)
# 類似度計算
def compute_all_max_similarities(values: np.ndarray, sequence_length: int, device=None) -> np.ndarray:
if device is None:
device = "cuda" if torch.cuda.is_available() else "cpu"
values_tensor = torch.tensor(values, dtype=torch.float32, device=device)
normed_values = F.normalize(values_tensor, p=2, dim=1)
sims_matrix = normed_values @ normed_values.T
num_rows = values.shape[0]
num_seqs = num_rows - sequence_length + 1
max_similarities = torch.zeros(num_seqs, device=device)
for i in range(num_seqs):
target_idx = i + sequence_length - 1
max_similarities[i] = 0.0 if i == 0 else sims_matrix[target_idx, i:target_idx].max()
return max_similarities.cpu().numpy()
@lru_cache(maxsize=None)
def cached_similarity(values_bytes: bytes, shape_tuple: tuple, sequence_length: int):
if shape_tuple[0] < sequence_length:
raise ValueError(f"shape too short: {shape_tuple} < sequence_length={sequence_length}")
values = np.frombuffer(values_bytes, dtype=np.float32).reshape(shape_tuple)
return compute_all_max_similarities(values, sequence_length)
exclude_columns = ["着順", "日付", "レースID", "馬ID"]
feature_columns = [c for c in data.columns if c not in exclude_columns]
assert list(data.drop(columns=exclude_columns).columns) == feature_columns, \
"data.drop(...).columns と feature_columns の順序が一致していません"
with open("main_feature_order.json", "w", encoding="utf-8") as f:
json.dump(feature_columns, f, ensure_ascii=False, indent=2)
print("✅ main_feature_order 保存済み")
embedding_indices = {col: feature_columns.index(col) for col in embedding_cols}
numeric_name_mask = np.array([col not in set(embedding_cols) for col in feature_columns], dtype=bool)
keep_numeric_idx = np.where(numeric_name_mask)[0]
missing = sorted(set(embedding_cols) - set(feature_columns))
assert not missing, f"embedding_colsにあるのにfeature_columnsに無い: {missing}"
leaks = sorted(set(feature_columns[i] for i in keep_numeric_idx) & set(embedding_cols))
assert not leaks, f"数値側にEmbedding列が混入: {leaks}"
emb_idx = np.array([embedding_indices[c] for c in embedding_cols], dtype=np.int64)
comp_idx = np.setdiff1d(np.arange(len(feature_columns)), keep_numeric_idx)
assert np.array_equal(np.sort(comp_idx), np.sort(emb_idx)), \
f"補集合≠embedding: extra={sorted(set(feature_columns[i] for i in comp_idx) - set(embedding_cols))}, " \
f"missing={sorted(set(embedding_cols) - set(feature_columns[i] for i in comp_idx))}"
values = data.drop(columns=exclude_columns).to_numpy(dtype=np.float32, copy=True)
y_all = data["着順"].to_numpy()
r_all = data["レースID"].to_numpy()
h_all = data["馬ID"].to_numpy()
hist_all= data["履歴長_norm"].to_numpy(dtype=np.float32)
d_all = data["日付"].to_numpy(dtype="datetime64[ns]")
ids = h_all
starts = np.r_[0, np.flatnonzero(ids[1:] != ids[:-1]) + 1]
ends = np.r_[starts[1:], len(ids)]
S = sequence_length
X_list, y_list, race_list, horse_list, hist_list, used_len_list, sim_list = [], [], [], [], [], [], []
date_list = []
for s, e in zip(starts, ends):
v = values[s:e]
v_num = v[:, keep_numeric_idx]
y = y_all[s:e]
r = r_all[s:e]
h = h_all[s:e]
n, D = v.shape
try:
ms = cached_similarity(v_num.tobytes(), v_num.shape, S)
except Exception as ex:
ms = np.zeros(max(1, n - S + 1), dtype=np.float32)
if n >= S:
shape = (n - S + 1, S, D)
strides = (v.strides[0], v.strides[0], v.strides[1])
win = as_strided(v, shape=shape, strides=strides)
idx_end = np.arange(S - 1, n)
X_list.append(win.copy())
y_seq = np.minimum(y[idx_end], 6) # 1..5 以外は 6
y_list.append(y_seq - 1)
race_list.append(r[idx_end])
horse_list.append(h[idx_end])
hist_list.append(hist_all[s:e][idx_end])
used_len_list.append(np.full(win.shape[0], S, dtype=np.float32))
sim_list.append(ms.astype(np.float32))
date_list.append(d_all[s:e][idx_end])
else:
pad = np.repeat(v[-1:], S - n, axis=0)
seq = np.vstack([v, pad])[None, ...]
X_list.append(seq.astype(np.float32))
y_last = y[-1]
y_list.append(np.array([min(y_last, 6) - 1], dtype=np.int64))
race_list.append(np.array([r[-1]], dtype=r.dtype))
horse_list.append(np.array([h[-1]], dtype=h.dtype))
hist_list.append(np.array([hist_all[e-1]], dtype=np.float32))
used_len_list.append(np.array([float(n)], dtype=np.float32))
sim_list.append(np.array([ms[0] if len(ms) else 0.0], dtype=np.float32))
date_list.append(np.array([d_all[e-1]], dtype="datetime64[ns]"))
X_sequences = np.concatenate(X_list, axis=0)
y_targets = np.concatenate(y_list, axis=0).astype(np.int64)
race_ids = np.concatenate(race_list, axis=0)
horse_ids = np.concatenate(horse_list, axis=0)
history_lengths = np.concatenate(hist_list, axis=0).astype(np.float32)
used_lens = np.concatenate(used_len_list, axis=0).astype(np.float32)
similarity_scores= np.concatenate(sim_list, axis=0).astype(np.float32)
sequence_end_dates = np.concatenate(date_list, axis=0).astype("datetime64[ns]")
used_ratio = used_lens / float(S)
sim_tiled = np.repeat(similarity_scores[:, None], S, axis=1)[..., None]
hist_tiled = np.repeat(history_lengths[:, None], S, axis=1)[..., None]
used_tiled = np.repeat(used_ratio[:, None], S, axis=1)[..., None]
X_sequences = np.concatenate([X_sequences, sim_tiled, hist_tiled, used_tiled], axis=2)
この処理では、各馬のレース履歴を 固定長の時系列データ(シーケンス) に変換し、ニューラルネットワークに入力できる形に整えています。主な流れは以下の通りです。
1. 馬ごとの時系列整備
data = data.sort_values(by=["馬ID", "日付"]).reset_index(drop=True)
馬ID
と日付
でソートし、同じ馬のレース履歴が時系列順に並ぶようにします。reset_index(drop=True)
で新しいインデックスを振り直し。
2. 類似度計算関数
def compute_all_max_similarities(values, sequence_length, device=None): ...
- 入力は数値特徴行列
values
。 F.normalize
で行ベクトルを正規化 → コサイン類似度行列を計算。- 各シーケンスの「直前の履歴」と「過去との最大類似度」を抽出。
@lru_cache
によるcached_similarity
で 同じ入力に対して再計算を省略。
→ 高速化とメモリ効率改善。
3. 特徴量カラム管理
exclude_columns = ["着順", "日付", "レースID", "馬ID"]
feature_columns = [c for c in data.columns if c not in exclude_columns]
- 学習に使わない列(目的変数やID系)を除外し、特徴量カラムの正しい順序を定義。
json.dump
でmain_feature_order.json
として保存し、推論時にも同じ並びで使えるよう保証。
さらに、
embedding_indices
:カテゴリ列(埋め込み用)のインデックスを保持。numeric_name_mask
/keep_numeric_idx
:数値列のインデックスを抽出。assert
:- 埋め込み列が欠けていないか(
missing
)、 - 数値列に誤ってカテゴリが混ざっていないか(
leaks
)、
をチェック。 → データリークや型崩れを防ぐ安全装置。
- 埋め込み列が欠けていないか(
4. numpy 配列化
values = data.drop(columns=exclude_columns).to_numpy(dtype=np.float32, copy=True)
y_all = data["着順"].to_numpy()
r_all = data["レースID"].to_numpy()
h_all = data["馬ID"].to_numpy()
values
: 入力特徴量(float32 行列)y_all
: 目的変数(着順)r_all
: レースIDh_all
: 馬ID- これらを numpy 配列化することで、後続処理を高速化。
5. 馬ごとのシーケンス分割
starts = np.r_[0, np.flatnonzero(ids[1:] != ids[:-1]) + 1]
ends = np.r_[starts[1:], len(ids)]
- 各馬の履歴の開始位置
starts
と終了位置ends
を計算。 - これにより「馬ごとに独立した時系列」を扱えるようになる。
6. スライディングウィンドウ展開
shape = (n - S + 1, S, D)
strides = (v.strides[0], v.strides[0], v.strides[1])
win = as_strided(v, shape=shape, strides=strides)
as_strided
により、長さ S の窓を1つずつずらして時系列シーケンスを生成。- 例:S=3 なら
[レース1,2,3]
→[2,3,4]
→[3,4,5]
… と切り出し。 strides
を指定しているため、メモリコピーせず効率的に窓を展開。
7. 教師データと補助特徴
y_seq = np.minimum(y[idx_end], 6)
→ 着順を 1〜5位はそのまま、6位以下は6として丸め。hist_all
→ 履歴長の正規化値。sim_list
→ 類似度スコア。- 各シーケンスの最終レース時点の 着順・レースID・馬ID・履歴情報 を揃えて格納。
不足データ(履歴が短い馬)は 最後の行を複製してパディング。
8. 出力配列の統合と拡張
X_sequences = np.concatenate(X_list, axis=0)
y_targets = np.concatenate(y_list, axis=0).astype(np.int64)
...
sim_tiled = np.repeat(similarity_scores[:, None], S, axis=1)[..., None]
- 各馬のシーケンスを縦に結合 → 学習可能な3次元テンソル
X_sequences (N, S, D)
を作成。 - 併せてターゲット(着順)、馬ID、レースID、履歴長なども配列化。
- 類似度スコア・履歴長・有効長は
np.repeat
で シーケンス長 S に拡張し、入力特徴として結合。
✨ まとめ
この前処理パイプラインは、
- 馬ごとの履歴を時系列シーケンス化(スライディングウィンドウ)
- 着順を教師ラベル化(1〜5位はそのまま、6位以下はまとめて6位扱い)
- 類似度スコア・履歴長・使用割合を追加特徴量として拡張
- 欠損や列順の検証・保存で再現性を担保
といった機能を備えています。結果として、深層学習モデルにそのまま投入できる形の時系列特徴量データセットが完成します。
@lru_cacheについて
Python の標準ライブラリ functools
が提供する 関数結果のキャッシュ機能 を利用するためのデコレータです。
「LRU」とは Least Recently Used(最近最も使われていないもの) の略で、通常は古いキャッシュから順に破棄していく仕組みを指します。
しかし @lru_cache(maxsize=None)
と書いた場合、キャッシュの上限を設けずに結果を保存し続ける 設定になります。つまり、一度呼び出した引数に対する戻り値はすべて記録され、再度同じ引数で関数を呼び出すと、関数を実行せずにキャッシュ済みの結果が返されます。
メリット
- 高速化: 計算コストが高い関数(再帰処理や複雑な計算など)を繰り返し呼び出す際、結果をキャッシュしておくことで処理時間を大幅に短縮できます。
- コードの簡潔化: 明示的にキャッシュ用の辞書を用意する必要がなく、デコレータをつけるだけでキャッシュ機構を導入できます。
注意点
- メモリを使い続ける
maxsize=None
にするとキャッシュが無制限に増えていくため、入力パターンが膨大な場合はメモリを圧迫する可能性があります。
→ 一般的には@lru_cache(maxsize=128)
など適切な上限を設定するのがおすすめです。 - 引数はハッシュ可能な型のみ
キャッシュのキーに使うため、リストや辞書のような「変更可能なオブジェクト」は引数にできません。
コメント