状態空間モデルは、観測できない状態を推定し、その推定した状態から観測値を予測するモデルです。観測できない状態の形を指定できるため、季節周期やトレンドを表す状態のモデルを構築でき、それぞれの成分に分解できます。これにより、ブラックボックスになりがちな機械学習の予測モデルではわからない、時系列データの分析が可能となります。
今回の記事では、トレンド、季節周期、自己回帰を含んだ盛々の状態の状態空間モデルを作り、時系列データがどのように分解できるかまでを見ます。時系列データの予測はもちろん行いますが、普通に構築しただけでは精度が低いので、最適化にも挑戦という野心的なところまでやるのです!
最後まで見ていただくと、次の画像のような結果を出せます。時系列データの予測ではそこそこ良い精度に見えますし、トレンド、周期成分、自己回帰の影響度合いもわかります。内部がブラックボックスにならないのが状態空間モデルの良いところですね。今回のコードはこちらの「状態空間モデル.ipynb」です。
状態空間モデルとは
状態空間モデルとはなんぞや的なお話は、前回触れました。前回の記事は、状態空間モデルの式とそこからわかること、トレンド推定モデルの実装という2本立てです。細かいところは前回を参考にしてもらい、ここでは、状態空間モデルの式のおさらいをします。今回は実装ゴリゴリで頭より指動かすのが多めです。
状態空間モデルの式をおさらいすると、以下の形になります。
\begin{eqnarray}
\mathbf {y}_t & = & \mathbf {H}(\mathbf{x}_t) + \mathbf {w}_t \\
\mathbf {x}_t & = & \mathbf {F}(\mathbf{x}_{t-1}) + \mathbf {G}(\mathbf{v}_t)
\end{eqnarray}
- \( \mathbf { y_t } \):時刻 \( t \) の観測値。ベクトルの場合があるが、大抵はスカラー
- \( \mathbf { x_t } \):時刻 \( t \) の状態ベクトル
- \( \mathbf { H } \):状態を観測値に変換する関数。線形、非線形どちらの場合もあり、線形の場合は行列で表し、観測行列と呼ばれる
- \( \mathbf { F } \):状態を次の状態に変換する関数。線形、非線形どちらの場合もあり、線形の場合は行列で表し、状態推移行列と呼ばれる
- \( \mathbf { w_t } \):時刻 \( t \) の観測ノイズ
- \( \mathbf { v_t } \):時刻 \( t \) の状態ノイズ
- \( \mathbf { G } \):時刻 \( t \) の状態ノイズを変換する線形 or 非線形関数
この式の状態 \( \mathbf{x}_t \) の値を推定することがミッションです。しかしながら、状態遷移行列の設定次第で結果が大きく異なります。前回は、勢いのままにぬるっとトレンド推定モデルを構築したため、予測結果は散々でした。今回は、複雑な状態を表せるように、苦手意識が強い人が多い天下の行列様をどんどん使っていきます。
トレンド、季節周期、自己回帰の状態式を確認
前回はトレンドを状態とした状態空間モデルを作りましたが、予測は全然だめだったので、今回は挽回します。そのためにやることは、状態をどーんと追加です。状態遷移行列で、トレンド、周期成分、自己回帰を表します。それでは、最初にそれぞれの数式を見ていきましょう。
\begin{eqnarray}
\mathbf { y_t } & = & \mathbf { tre }_t + \mathbf {w}_t , \quad \mathbf{w}_t \sim \mathbf{ N }(0, \sigma_{w}^2 ) \\
\mathbf {tre}_t & = & \sum_{ i=1 }^k { c_i^{ (k) } \mathbf {tre}_{ t-i } } + \mathbf{v}_{tret}, \quad \mathbf{v}_{tret} \sim \mathbf{N}(0, \sigma_{tre}^2 ) \\
\mathbf {se}_t & = & - \sum_{ i=1 }^{p-1} { \mathbf {se}_{ t-i } } + \mathbf{v}_{set}, \quad \mathbf{v}_{set} \sim \mathbf{N}(0, \sigma_{se}^2 ) \\
\mathbf {ar}_t & = & - \sum_{ i=1 }^{q} { \phi_i \mathbf {ar}_{ t-i } } + \mathbf{v}_{art}, \quad \mathbf{v}_{art} \sim \mathbf{N}(0, \sigma_{ar}^2 )
\end{eqnarray}
上記の式を状態とする状態空間モデルを作ります。ただ事前に、それぞれの状態の次元を設定しておかなければいけません。それぞれの式は、トレンドの \(k\) を2、季節周期の \( p \) を12、自己回帰ARの \(q\) を2という設定です。ただし、ARの \( \phi_i \) の値をどうするかは悩みます。この値をどうするかで結果が大きく変わってしまいからです。できる限り予測精度が高くなる \( \phi \) を設定したいですよね。これが気になる方は引き続きご覧ください。
トレンド、季節周期、自己回帰を状態とする状態空間モデルの構築
今回構築する状態空間モデルの式がわかったので、実践していきましょう。まずは時系列データを取得し、状態空間モデルで扱える形に変えます。
データの取得
url = "https://www.analyticsvidhya.com/wp-content/uploads/2016/02/AirPassengers.csv" stream = requests.get(url).content passengers = pd.read_csv(io.StringIO(stream.decode('utf-8'))) passengers.plot( x='Month', y='#Passengers', figsize=(8, 5), xlabel='' )
取得したのは月ごとの乗客数のデータです。値が段々増加しており、非定常な時系列データだと思われます。なので、このデータを変換することなく予測ができれば、自己回帰より優れたモデルになるでしょう。
このままのデータの形式では状態空間モデルで扱えないので、加工します。
y = pd.Series( passengers['#Passengers'].values, index=pd.to_datetime(passengers['Month'], infer_datetime_format=True) ) y = y.astype(float) n_train = 120 train_data, test_data = y.values[:n_train], y.values[n_train:]
上記のコードの最後の方で、パラメータを推定した状態空間モデルの予測がうまくいくかを確認するために、学習データとテストデータを分けています。
状態の遷移行列の設定
まずは、各状態の次元数を定めています。それぞれの状態の次元数は、観測値:1、トレンド:2、周期性:12、AR:2です。それを元に状態の遷移行列を設定します。この処理がまっとまっている関数が「set_state_space_model_matrixes」です。「set_state_space_model_matrixers」関数内では、トレンド、周期性、ARの遷移行列の設定をそれぞれ別関数にして行っています。
n_dim_obs = 1 n_dim_trend = 2 n_dim_series = 12 n_dim_ar = 2 Q_sigma = 10 # 状態の遷移行列の設定 F, H, Q, n_dim_state = set_state_space_model_matrixes( n_dim_trend=n_dim_trend, n_dim_obs=n_dim_obs, n_dim_series=n_dim_series, n_dim_ar=n_dim_ar, Q_sigma=Q_sigma ) # 状態空間モデルの構築 def set_state_space_model_matrixes( n_dim_trend, n_dim_obs=1, n_dim_series=0, n_dim_ar=0, Q_sigma=10 ): # 状態全体の次元数を求める # 季節周期あるいはARの遷移行列の次元が0ではない場合、それぞれの次元を足したあとに−1 if n_dim_series > 0 or n_dim_ar > 0: n_dim_state = n_dim_trend + n_dim_series + n_dim_ar - 1 else: n_dim_state = n_dim_trend n_dim_Q = (n_dim_trend != 0) + (n_dim_series != 0) + (n_dim_ar != 0) G = np.zeros((n_dim_state, n_dim_Q)) F = np.zeros((n_dim_state, n_dim_state)) H = np.zeros((n_dim_obs, n_dim_state)) Q = np.eye(n_dim_Q) * Q_sigma # トレンドの遷移行列の設定 F, H, G, index_state, index_obj = set_trend_matrixes( F=F, H=H, G=G, n_dim_trend=n_dim_trend ) # 季節周期の遷移行列の設定 F, H, G, index_state, index_obj = set_series_matrixes( F=F, H=H, G=G, n_dim_series=n_dim_series, index_state=index_state, index_obj=index_obj ) # ARの遷移行列の設定 F, H, G, index_state, index_obj = set_ar_matrixes( F=F, H=H, G=G, n_dim_ar=n_dim_ar, index_state=index_state, index_obj=index_obj ) Q = G.dot(Q).dot(G.T) return F, H, Q, n_dim_state
状態空間モデルにおける状態の遷移行列の設定の全体感は、上のコードのとおりです。まだそれぞれの遷移行列の中身の話はしていないですが、理解するのにそこまで難しくはなさそうですね。
それでは、遷移行列を設定する関数の中身を見ていきます。
トレンドの遷移行列の設定
設定するトレンドの次元数に応じて、遷移行列の作り方を分けます。最初に状態と観測の遷移行列のサイズを決めているので、その関数で設定した行列の次の次元を管理するために、「index_state」「index_obj」を用意しています。注目すべきは遷移行列の0行目です。ここで期間 \( t \) に対する係数を決めています。
# トレンドの遷移行列の設定 def set_trend_matrixes(F, H, G, n_dim_trend): G[0, 0] = 1 H[0, 0] = 1 if n_dim_trend == 1: F[0, 0] = 1 elif n_dim_trend == 2: F[0, 0] = 2 F[0, 1] = -1 F[1, 0] = 1 elif n_dim_trend == 3: F[0, 0] = 3 F[0, 1] = -3 F[0, 2] = 1 F[1, 0] = 1 F[2, 1] = 1 index_state, index_obj = n_dim_trend, n_dim_trend return F, H, G, index_state, index_obj
季節周期の遷移行列の設定
季節周期の開始インデックスに−1を代入しています。
# 季節周期の遷移行列の設定 def set_series_matrixes(F, H, G, n_dim_series, index_state, index_obj): if n_dim_series > 0: G[index_state, 1] = 1 H[0, index_obj] = 1 for i in range(n_dim_series - 1): F[index_state, index_state + i] = -1 for i in range(n_dim_series - 2): F[index_state + i + 1, index_state + i] = 1 index_state = index_state + n_dim_series - 1 index_obj = index_obj + n_dim_series - 1 return F, H, G, index_state, index_obj
ARの遷移行列の設定
ARの遷移行列で時刻 \( t \) (index_state行目) は0を代入しています。なので、ARの係数は0です。これだともちろん、ARの状態がないと同然ですね。ただし、ARの係数は何にすればよいかわからりません。これは後ほど、最適化により解決していきます。
# ARの遷移行列の設定 def set_ar_matrixes(F, H, G, n_dim_ar, index_state, index_obj): if n_dim_ar > 0: G[index_state, 2] = 1 H[0, index_obj] = 1 for i in range(n_dim_ar): F[index_state, index_state + i] = 0 for i in range(n_dim_ar - 1): F[index_state + i + 1, index_state + i] = 1 return F, H, G, index_state, index_obj
今回作る状態空間モデルの遷移行列の設定は完成です。この遷移行列をもとに実際に構築していきます。
トレンド、周期性、ARを状態とする状態空間モデルの構築
先程設定した遷移行列とpykalmanを使えば、簡単に状態空間モデルを構築できます。
kf = KalmanFilter( n_dim_obs=n_dim_obs, n_dim_state=n_dim_state, initial_state_mean=np.zeros(n_dim_state), initial_state_covariance=np.ones((n_dim_state, n_dim_state)), transition_matrices=F, observation_matrices=H, observation_covariance=1.0, transition_covariance=Q )
状態空間モデルが完成しました。実に簡単です。モデルが実装できたので、予測結果を見てみましょう。前回はトレンドのみの状態で状態空間モデルを作りましたが、予測結果は散々でした。今回は状態を複雑にしたので、前回より良くなるはずです!
さぁ、結果を見てみましょう。結果を見るコードは前回の記事で説明しているので、さくっといきます。
plot_state_space_model_pred( kf=kf, y=y.values, n_train=n_train, img_file_path=result_dir_path.joinpath('passenger_ar_predict.png') ) # 状態空間モデルの予測をグラフ化 def plot_state_space_model_pred( kf, y, n_train, credible_interval=True, img_file_path=None ): train_data, test_data = y[:n_train], y[n_train:] state_means, state_covs = kf.smooth(train_data) ovsevation_means_predicted = np.dot(state_means, kf.observation_matrices.T) ovsevation_covs_predicted = kf.observation_matrices.dot( np.abs(state_covs) ).transpose(1, 0, 2).dot(kf.observation_matrices.T) lowers, uppers = scipy.stats.norm.interval( 0.95, ovsevation_means_predicted.flatten(), scale=np.sqrt(ovsevation_covs_predicted.flatten()) ) current_state = state_means[-1] current_cov = state_covs[-1] pred_means = np.array([]) tmp_lowers = [] tmp_uppers = [] for i in range(len(test_data)): current_state, current_cov = kf.filter_update( current_state, current_cov, observation=None ) pred_mean = current_state.dot(kf.observation_matrices.T) pred_cov = kf.observation_matrices.dot( np.abs(current_cov) ).dot(kf.observation_matrices.T) pred_means = np.r_[pred_means, pred_mean] lower, upper = scipy.stats.norm.interval( 0.95, pred_mean, scale=np.sqrt(pred_cov) ) tmp_lowers.append(lower) tmp_uppers.append(upper) lowers = np.hstack([lowers, np.array(tmp_lowers).flatten()]) uppers = np.hstack([uppers, np.array(tmp_uppers).flatten()]) plt.figure(figsize=(8, 6)) plt.plot(y, label="observation") plt.plot( np.hstack([ ovsevation_means_predicted.flatten(), pred_means.flatten() ]), '--', label="forecast" ) if credible_interval: plt.fill_between(range(len(y)), uppers, lowers, alpha=0.5, label="credible interval") plt.legend() plt.tight_layout() if img_file_path: plt.savefig(img_file_path)
予測結果はなんともダメダメです。あらぬ方向に下り続けてしまっています。なぜ、このような事になっているのかの原因を特定したいですね。状態空間モデルでは、それぞれ仮定した状態に時系列データを分解して見られるため、どの状態がいけていないのかがわかります。
構築した状態空間モデルの状態を分解してグラフ化
今回構築した状態空間モデルの予測結果も全然だめでした。これを改善するために、状態を分解して見てみます。コードは少し長いですが、一つ一つ見てもらえば同じ処理の繰り返しなので、わかるはずです。状態の推定と1期先予測の繰り返しを、それぞれの状態で分けて行っています。
# 状態空間モデルの分解した状態をグラフ化 def plot_state_space_model_process( kf, y, n_train, n_dim_trend, n_dim_series=0, n_dim_ar=0, img_file_path=None ): train_data, test_data = y[:n_train], y[n_train:] state_means, state_covs = kf.smooth(train_data) # トレンド状態を観測値に変換 index_start = 0 index_end = n_dim_trend observation_means_predicted_trend = np.dot( state_means[:, index_start:index_end], kf.observation_matrices[:, index_start:index_end].T ) index_start = index_end if n_dim_series > 0: # 季節周期の状態を観測値に変換 index_end = index_start + n_dim_series - 1 observation_means_predicted_series = np.dot( state_means[:, index_start:index_end], kf.observation_matrices[:, index_start:index_end].T ) index_start = index_end if n_dim_ar > 0: # ARの状態を観測値に変換 index_end = index_start + n_dim_ar observation_means_predicted_ar = np.dot( state_means[:, index_start:index_end], kf.observation_matrices[:, index_start:index_end].T ) pred_means_trend = [] if n_dim_series > 0: pred_means_series = [] if n_dim_ar > 0: pred_means_ar = [] current_state = state_means[-1] current_cov = state_covs[-1] for i in range(len(test_data)): current_state, current_cov = kf.filter_update( current_state, current_cov, observation=None ) index_start = 0 index_end = n_dim_trend pred_means_trend.append( kf.observation_matrices[:, index_start:index_end].dot(current_state[index_start:index_end]) ) index_start = index_end if n_dim_series > 0: index_end = index_start + n_dim_series - 1 pred_means_series.append( kf.observation_matrices[:, index_start:index_end].dot(current_state[index_start:index_end]) ) index_start = index_end if n_dim_ar > 0: index_end = index_start + n_dim_ar pred_means_ar.append( kf.observation_matrices[:, index_start:index_end].dot(current_state[index_start:index_end]) ) index_start = index_end plt.figure(figsize=(8, 6)) plt.plot(y, label='observation') plt.plot(np.hstack([observation_means_predicted_trend.flatten(), np.array(pred_means_trend).flatten()]), '--', label='trend') if n_dim_series > 0: plt.plot(np.hstack([observation_means_predicted_series.flatten(), np.array(pred_means_series).flatten()]), ':', label='series') if n_dim_ar > 0: plt.plot(np.hstack([observation_means_predicted_ar.flatten(), np.array(pred_means_ar).flatten()]), '+-', label='ar') plt.legend() plt.tight_layout() if img_file_path: plt.savefig(img_file_path)
上のグラフから、トレンドがだだ下がりしていることがわかります。そもそもトレンドは直前の値に影響されるため、下がっているところからずっと下がるのはしょうがないですね。ここで問題なのは、ARモデルが機能していないことでしょう。これは、ARの遷移行列の係数を0としているので当たり前です。なので、ARの遷移行列を改善しましょう。
遷移行列と共分散の最適化
状態を盛々にしたのに予測結果は散々でした。状態遷移行列を見ると、うまくいっていないのはARとトレンドです。ARはそもそも係数を設定していないので、トレンドで無理にカバーしようと失敗したような感じでしょう。なので、ARの係数が良くなるように最適化をします。ついでに、共分散の値も最適化しちゃいます。
とはいっても、肩肘張って取り組むほど難しいわけではないです。最適化する値の範囲を指定して(bounds_ar、bounds_Q)、ARの次元ごとに指定したループ回数分、対数尤度を最大化する値を見つけます。このコードを実行してエラーが出る方はこちらを御覧ください。pykalmanのモジュール内のファイルを修正する必要があります。
n_iter = 5 n_Q = 1 + int(n_dim_series > 0) + int(n_dim_ar > 0) index_ar_start = n_dim_trend + n_dim_series - 1 bounds_ar = ((-0.95, 0.95), ) bounds_Q = ((1e-4, 1e3), ) * (n_Q - 1) + ((1e-4, 5e1), ) kf_copy = copy.deepcopy(kf) for index_ar in range(n_dim_ar): for loop in range(n_iter): minimize( minimize_likelihood_state_space_model, (0.,), args=(kf_copy, train_data, index_ar_start, index_ar_start + index_ar, 'mat'), method='SLSQP', bounds=bounds_ar ) cov_indexes = [index_ar_start, n_dim_trend, 0] for i, index in enumerate(cov_indexes): minimize( minimize_likelihood_state_space_model, (0., ), args=(kf_copy, train_data, index, index, 'cov'), method='SLSQP', bounds=(bounds_Q[len(cov_indexes) - i - 1], ) ) print('AR {} loop [{}/{}], Log Likelifood : {:.2f}'.format( index_ar + 1, loop + 1, n_iter, kf_copy.loglikelihood(train_data) )) opt_kf = copy.deepcopy(kf_copy) # 引数の遷移行列を代入し、LogLikelihoodを返す def minimize_likelihood_state_space_model( value, kf, train_data, index_row, index_col, kind='mat' ): if kind == 'mat': kf.transition_matrices[index_row, index_col] = value elif kind == 'cov': kf.transition_covariance[index_row, index_col] = value kf.smooth(train_data) return -kf.loglikelihood(train_data)
対数尤度の値が大きくなっています(実際は最小化をしているので目的関数値はマイナスにして小さくなっています)。これにより、ARの遷移行列と共分散の値が最適化されました。実際の値を見てみましょう。
opt_kf.transition_matrices[index_ar_start: , index_ar_start:]
np.diag(opt_kf.transition_covariance)[[0, n_dim_trend, index_ar_start]]
当初に指定した値が変わりましたね。ARの遷移行列をみると、1期前の値に強く影響していることがわかります。季節周期の共分散の値が大きいですね。それでは、予測結果を見てみます。
plot_state_space_model_pred(
kf=opt_kf, y=y.values, n_train=n_train,
img_file_path=result_dir_path.joinpath('passenger_state_space_opt_predict.png')
)
トレンドだけを状態としたときの結果と比べると、予測結果がかなり改善しています。予測期間の信頼区間は大きいですが、予測なので妥当でしょう。
それぞれの状態を分けて見てみます。
plot_state_space_model_process(
kf=opt_kf, y=y.values, n_train=n_train,
n_dim_trend=n_dim_trend,
n_dim_series=n_dim_series,
n_dim_ar=n_dim_ar,
img_file_path=result_dir_path.joinpath('passenger_state_space_opt_process.png')
)
ARがベースになっており、一定値のトレンドと定期的に季節周期が加わっているのがわかりますね。更に、季節周期は段々振れ幅大きくなっていることがわかります。これは乗客数のデータであり、乗客数が段々増加している中で季節的な振れ幅が大きくなっており、効率的な経営をするためにはスタッフの増減を管理するシステムの構築が大事そうですね。このように、分析する上で役立つの示唆を得られるのが状態空間モデルの良さでしょう。
参考文献
この本でPythonを使って時系列データの分析をするコードを参考にさせていただきました。今回は自分なりにコードを工夫しながら、テーマを絞って取り上げたので、詳細をより知りたい方はぜひご覧ください。
状態空間モデルを構築するために内部の式展開がどうなっているかを知る上では、こちらの本が参考になりました。