Source code for lds.inference


import math
import numpy as np


class OnlineKalmanFilter:
    r"""
    Class implementing the online Kalman filter algorithm for the following
    linear dynamical system:

    .. math::

       x_n &= B\ x_{n-1} + w_n,\ \textrm{where}\ w_n\sim\mathcal{N}(w_n|0, Q)\,
       \textrm{and}\ x_n\in\Re^M

       y_n &= Z\ x_n + v_n,\ \textrm{where}\ v_n\sim\mathcal{N}(v_n|0, R)\,
       \textrm{and}\ y_n\in\Re^N

       x_0 &\sim \mathcal{N}(x_0|m_0, V_0)

    Example use:

    .. code-block:: python

        online_kf = OnlineKalmanFilter(B, Q, m0, V0, Z, R)
        x_pred, P_pred = online_kf.predict()

        for y in ys:
            x_filt, P_filt = online_kf.update(y)
            x_pred, P_pred = online_kf.predict()

    A script using `OnlineKalmanFilter` for tracking the position of a mouse
    can be found `here
    <https://github.com/joacorapela/lds_python/blob/master/code/scripts/doOnlineFilterFWGMouseTrajectory.py>`_

    Note 1: invocations of `predict()` and `update(y)` should alternate. That
    is, each invocation of `update(y)` should be preceded by an invocation of
    `predict()`, and each invocation of `predict()` (except the first one)
    should be preceded by an invocation of `update(y)`.

    Note 2: observations :math:`y_n` should be sampled uniformly in time.
    """

    def __init__(self, B, Q, m0, V0, Z, R):
        self._B = B
        self._Q = Q
        self._m0 = m0
        self._V0 = V0
        self._Z = Z
        self._R = R
        self._x = m0
        self._P = V0
        M = len(m0)
        self.I = np.eye(M)

    def predict(self):
        """Predicts the next state.

        :return: (state, covariance): tuple containing the predicted state
            vector and covariance matrix.
        """
        self.x = self.B @ self.x
        self.P = self.B @ self.P @ self.B.T + self.Q
        return self.x, self.P

    def update(self, y):
        r"""Updates the current state and covariance.

        :param y: observation :math:`\in\Re^N`

        :return: (state, covariance): tuple containing the updated state
            vector and covariance matrix.
        """
        if y.ndim == 1:
            y = np.expand_dims(y, axis=1)
        if not np.isnan(y).any():
            pred_obs = self.Z @ self.x
            residual = y - pred_obs
            Stmp = self.Z @ self.P @ self.Z.T + self.R
            S = (Stmp + Stmp.T) / 2  # enforce symmetry of innovation covariance
            Sinv = np.linalg.inv(S)
            K = self.P @ self.Z.T @ Sinv  # Kalman gain
            self.x = self.x + K @ residual
            self.P = (self.I - K @ self.Z) @ self.P
        return self.x, self.P

    @property
    def B(self):
        return self._B

    @B.setter
    def B(self, B):
        self._B = B

    @property
    def Q(self):
        return self._Q

    @Q.setter
    def Q(self, Q):
        self._Q = Q

    @property
    def m0(self):
        return self._m0

    @m0.setter
    def m0(self, m0):
        self._m0 = m0

    @property
    def V0(self):
        return self._V0

    @V0.setter
    def V0(self, V0):
        self._V0 = V0

    @property
    def Z(self):
        return self._Z

    @Z.setter
    def Z(self, Z):
        self._Z = Z

    @property
    def R(self):
        return self._R

    @R.setter
    def R(self, R):
        self._R = R

    @property
    def x(self):
        return self._x

    @x.setter
    def x(self, x):
        self._x = x

    @property
    def P(self):
        return self._P

    @P.setter
    def P(self, P):
        self._P = P

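
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of lds.inference): a runnable example
# of OnlineKalmanFilter on synthetic data, assuming a 1D constant-velocity
# model (state = [position, velocity], scalar position observations). All
# matrices, the data, and the helper name _example_online_kalman_filter are
# made up for illustration. Note that m0 is passed as an Mx1 column vector,
# which is the shape update() expects.
def _example_online_kalman_filter():
    dt = 1.0
    B = np.array([[1.0, dt], [0.0, 1.0]])  # constant-velocity dynamics
    Q = 0.01 * np.eye(2)                   # state noise covariance
    Z = np.array([[1.0, 0.0]])             # observe position only
    R = np.array([[0.25]])                 # observation noise covariance
    m0 = np.zeros((2, 1))                  # initial state mean (Mx1)
    V0 = np.eye(2)                         # initial state covariance

    rng = np.random.default_rng(0)
    ys = np.arange(100.0) * dt + rng.normal(scale=0.5, size=100)  # noisy ramp

    online_kf = OnlineKalmanFilter(B, Q, m0, V0, Z, R)
    x_pred, P_pred = online_kf.predict()
    for y in ys:
        x_filt, P_filt = online_kf.update(np.array([y]))
        x_pred, P_pred = online_kf.predict()
    return x_filt, P_filt
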
class TimeVaryingOnlineKalmanFilter:
    r"""
    Class implementing the time-varying and online Kalman filter algorithm
    for the following linear dynamical system:

    .. math::

       x_n &= B_n\ x_{n-1} + w_n,\ \textrm{where}\ w_n\sim\mathcal{N}(w_n|0, Q_n)\,
       \textrm{and}\ x_n\in\Re^M

       y_n &= Z_n\ x_n + v_n,\ \textrm{where}\ v_n\sim\mathcal{N}(v_n|0, R_n)\,
       \textrm{and}\ y_n\in\Re^N

       x_0 &\sim \mathcal{N}(x_0|m_0, V_0)

    Example use:

    .. code-block:: python

        online_kf = TimeVaryingOnlineKalmanFilter()
        x, P = m0, V0

        for n, y in enumerate(ys):
            x, P = online_kf.predict(x=x, P=P, B=Bs[n], Q=Qs[n])
            x, P = online_kf.update(y=y, x=x, P=P, Z=Zs[n], R=Rs[n])

    A script using `OnlineKalmanFilter` for tracking the position of a mouse
    can be found `here
    <https://github.com/joacorapela/lds_python/blob/master/code/scripts/doOnlineFilterFWGMouseTrajectory.py>`_

    Note 1: invocations of `predict()` and `update(y)` should alternate. That
    is, each invocation of `update(y)` should be preceded by an invocation of
    `predict()`, and each invocation of `predict()` (except the first one)
    should be preceded by an invocation of `update(y)`.

    Note 2: observations :math:`y_n` should be sampled uniformly in time.
    """

    def predict(self, x, P, B, Q):
        """Predicts the next state.

        :return: (state, covariance): tuple containing the predicted state
            vector and covariance matrix.
        """
        x = B @ x
        P = B @ P @ B.T + Q
        return x, P

    def update(self, y, x, P, Z, R):
        r"""Updates the current state and covariance.

        :param y: observation :math:`\in\Re^N`

        :return: (state, covariance): tuple containing the updated state
            vector and covariance matrix.
        """
        if y.ndim == 1:
            y = np.expand_dims(y, axis=1)
        if not np.isnan(y).any():
            M = len(x)
            I = np.eye(M)
            pred_obs = Z @ x
            residual = y - pred_obs
            Stmp = Z @ P @ Z.T + R
            S = (Stmp + Stmp.T) / 2  # enforce symmetry of innovation covariance
            Sinv = np.linalg.inv(S)
            K = P @ Z.T @ Sinv  # Kalman gain
            x = x + K @ residual
            P = (I - K @ Z) @ P
        return x, P

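
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of lds.inference): the time-varying
# filter is stateless, so the caller threads (x, P) through the loop and may
# change the system matrices at every step. Here the observation noise R_n
# grows with n; all values and the helper name _example_time_varying_filter
# are assumptions made for illustration.
def _example_time_varying_filter():
    dt = 1.0
    B = np.array([[1.0, dt], [0.0, 1.0]])
    Q = 0.01 * np.eye(2)
    Z = np.array([[1.0, 0.0]])
    m0 = np.zeros((2, 1))
    V0 = np.eye(2)

    rng = np.random.default_rng(0)
    ys = np.arange(100.0) * dt + rng.normal(scale=0.5, size=100)

    tv_kf = TimeVaryingOnlineKalmanFilter()
    x, P = m0, V0
    for n, y in enumerate(ys):
        x, P = tv_kf.predict(x=x, P=P, B=B, Q=Q)
        R_n = np.array([[0.25 * (1.0 + 0.01 * n)]])  # time-varying obs noise
        x, P = tv_kf.update(y=np.array([y]), x=x, P=P, Z=Z, R=R_n)
    return x, P
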
def filterLDS_SS_withMissingValues_torch(y, B, Q, m0, V0, Z, R):
    """ Kalman filter implementation of the algorithm described in Shumway
    and Stoffer 2006.

    :param y: time series to be smoothed
    :type y: torch tensor (NxT)

    :param B: state transition matrix
    :type B: torch tensor (MxM)

    :param Q: state noise covariance matrix
    :type Q: torch tensor (MxM)

    :param m0: initial state mean
    :type m0: one-dimensional torch tensor (M)

    :param V0: initial state covariance
    :type V0: torch tensor (MxM)

    :param Z: state to observation matrix
    :type Z: torch tensor (NxM)

    :param R: observations covariance matrix
    :type R: torch tensor (NxN)

    :return: {xnn1, Vnn1, xnn, Vnn, innov, KN, Sn, logLike}: xnn1 and Vnn1
        (predicted means, MxT, and covariances, MxMxT), xnn and Vnn (filtered
        means, MxT, and covariances, MxMxT), innov (innovations, NxT), KN
        (Kalman gain matrix at the last sample, MxN), Sn (innovations
        covariance matrices, NxNxT), logLike (data loglikelihood, float).
    :rtype: dictionary
    """
    import torch

    if torch.any(torch.isnan(y[:, 0])) or torch.any(torch.isnan(y[:, -1])):
        raise ValueError("The first or last observation cannot contain nan")
    if m0.ndim != 1:
        raise ValueError("mean must be 1 dimensional")

    # N: number of time samples
    # M: dim state space
    # P: dim observations
    M = B.shape[0]
    N = y.shape[1]
    P = y.shape[0]
    xnn1_h = torch.empty(size=[M, 1, N], dtype=torch.double)
    Vnn1_h = torch.empty(size=[M, M, N], dtype=torch.double)
    xnn_h = torch.empty(size=[M, 1, N], dtype=torch.double)
    Vnn_h = torch.empty(size=[M, M, N], dtype=torch.double)
    innov_h = torch.empty(size=[P, 1, N], dtype=torch.double)
    Sn_h = torch.empty(size=[P, P, N], dtype=torch.double)

    # k==0
    xnn1 = B @ m0
    Vnn1 = B @ V0 @ B.T + Q
    Stmp = Z @ Vnn1 @ Z.T + R
    Sn = (Stmp + torch.transpose(Stmp, 0, 1)) / 2  # enforce symmetry
    Sinv = torch.linalg.inv(Sn)
    K = Vnn1 @ Z.T @ Sinv
    innov = y[:, 0] - (Z @ xnn1).squeeze()
    xnn = xnn1 + K @ innov
    Vnn = Vnn1 - K @ Z @ Vnn1
    logLike = -N*P*math.log(2*math.pi) - torch.logdet(Sn) - \
        innov.T @ Sinv @ innov
    if torch.isnan(logLike):
        raise RuntimeError("obtained nan log likelihood")
    xnn1_h[:, :, 0] = torch.unsqueeze(xnn1, 1)
    Vnn1_h[:, :, 0] = Vnn1
    xnn_h[:, :, 0] = torch.unsqueeze(xnn, 1)
    Vnn_h[:, :, 0] = Vnn
    innov_h[:, :, 0] = torch.unsqueeze(innov, 1)
    Sn_h[:, :, 0] = Sn

    # k>=1
    for k in range(1, N):
        xnn1 = B @ xnn
        Vnn1 = B @ Vnn @ B.T + Q
        if torch.any(torch.isnan(y[:, k])):
            # missing observation: filtered estimate equals the prediction
            xnn = xnn1
            Vnn = Vnn1
        else:
            Stmp = Z @ Vnn1 @ Z.T + R
            Sn = (Stmp + Stmp.T) / 2
            Sinv = torch.linalg.inv(Sn)
            K = Vnn1 @ Z.T @ Sinv
            innov = y[:, k] - (Z @ xnn1).squeeze()
            xnn = xnn1 + K @ innov
            Vnn = Vnn1 - K @ Z @ Vnn1
            logLike = logLike - torch.logdet(Sn) - \
                innov.T @ Sinv @ innov
            if torch.isnan(logLike):
                raise ValueError("obtained nan log likelihood")
        xnn1_h[:, :, k] = torch.unsqueeze(xnn1, 1)
        Vnn1_h[:, :, k] = Vnn1
        xnn_h[:, :, k] = torch.unsqueeze(xnn, 1)
        Vnn_h[:, :, k] = Vnn
        innov_h[:, :, k] = torch.unsqueeze(innov, 1)
        Sn_h[:, :, k] = Sn
    logLike = 0.5 * logLike
    answer = {"xnn1": xnn1_h, "Vnn1": Vnn1_h, "xnn": xnn_h, "Vnn": Vnn_h,
              "innov": innov_h, "KN": K, "Sn": Sn_h, "logLike": logLike}
    return answer

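
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of lds.inference): calling the torch
# filter on synthetic data. y is laid out observations-by-time, m0 is a
# one-dimensional tensor, and everything is double precision, matching the
# buffers allocated inside the function. All values and the helper name
# _example_filter_torch are made up.
def _example_filter_torch():
    import torch

    torch.manual_seed(0)
    B = torch.eye(2, dtype=torch.double)
    Q = 0.01 * torch.eye(2, dtype=torch.double)
    Z = torch.eye(2, dtype=torch.double)
    R = 0.25 * torch.eye(2, dtype=torch.double)
    m0 = torch.zeros(2, dtype=torch.double)
    V0 = torch.eye(2, dtype=torch.double)
    y = torch.randn(2, 50, dtype=torch.double)

    res = filterLDS_SS_withMissingValues_torch(y=y, B=B, Q=Q, m0=m0, V0=V0,
                                               Z=Z, R=R)
    return res["xnn"], res["logLike"]
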
def logLikeLDS_SS_withMissingValues_torch(y, B, Q, m0, V0, Z, R):
    """ Log likelihood of the data under the linear dynamical system model,
    computed with the Kalman filter implementation of the algorithm described
    in Shumway and Stoffer 2006.

    :param y: time series to be smoothed
    :type y: torch tensor (NxT)

    :param B: state transition matrix
    :type B: torch tensor (MxM)

    :param Q: state noise covariance matrix
    :type Q: torch tensor (MxM)

    :param m0: initial state mean
    :type m0: one-dimensional torch tensor (M)

    :param V0: initial state covariance
    :type V0: torch tensor (MxM)

    :param Z: state to observation matrix
    :type Z: torch tensor (NxM)

    :param R: observations covariance matrix
    :type R: torch tensor (NxN)

    :return: logLike (data loglikelihood)
    :rtype: scalar torch tensor
    """
    import torch

    if torch.any(torch.isnan(y[:, 0])) or torch.any(torch.isnan(y[:, -1])):
        raise ValueError("The first or last observation cannot contain nan")
    if m0.ndim != 1:
        raise ValueError("mean must be 1 dimensional")

    # N: number of time samples
    # M: dim state space
    # P: dim observations
    M = B.shape[0]
    N = y.shape[1]
    P = y.shape[0]

    # k==0
    xnn1 = B @ m0
    Vnn1 = B @ V0 @ B.T + Q
    Stmp = Z @ Vnn1 @ Z.T + R
    Sn = (Stmp + torch.transpose(Stmp, 0, 1)) / 2  # enforce symmetry
    Sinv = torch.linalg.inv(Sn)
    K = Vnn1 @ Z.T @ Sinv
    innov = y[:, 0] - (Z @ xnn1).squeeze()
    xnn = xnn1 + K @ innov
    Vnn = Vnn1 - K @ Z @ Vnn1
    logLike = -N*P*math.log(2*math.pi) - torch.logdet(Sn) - \
        innov.T @ Sinv @ innov
    if torch.isnan(logLike):
        raise RuntimeError("obtained nan log likelihood")

    # k>=1
    for k in range(1, N):
        xnn1 = B @ xnn
        Vnn1 = B @ Vnn @ B.T + Q
        if torch.any(torch.isnan(y[:, k])):
            # missing observation: filtered estimate equals the prediction
            xnn = xnn1
            Vnn = Vnn1
        else:
            Stmp = Z @ Vnn1 @ Z.T + R
            Sn = (Stmp + Stmp.T) / 2
            Sinv = torch.linalg.inv(Sn)
            K = Vnn1 @ Z.T @ Sinv
            innov = y[:, k] - (Z @ xnn1).squeeze()
            xnn = xnn1 + K @ innov
            Vnn = Vnn1 - K @ Z @ Vnn1
            logLike = logLike - torch.logdet(Sn) - \
                innov.T @ Sinv @ innov
            if torch.isnan(logLike):
                raise ValueError("obtained nan log likelihood")
    logLike = 0.5 * logLike
    return logLike

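
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of lds.inference): because the log
# likelihood is computed with torch operations, it can be differentiated with
# autograd, e.g., with respect to the state transition matrix B. This is a
# hypothetical use; all values and the helper name _example_logLike_grad are
# assumptions.
def _example_logLike_grad():
    import torch

    torch.manual_seed(0)
    B = torch.eye(2, dtype=torch.double, requires_grad=True)
    Q = 0.01 * torch.eye(2, dtype=torch.double)
    Z = torch.eye(2, dtype=torch.double)
    R = 0.25 * torch.eye(2, dtype=torch.double)
    m0 = torch.zeros(2, dtype=torch.double)
    V0 = torch.eye(2, dtype=torch.double)
    y = torch.randn(2, 50, dtype=torch.double)

    logLike = logLikeLDS_SS_withMissingValues_torch(y=y, B=B, Q=Q, m0=m0,
                                                    V0=V0, Z=Z, R=R)
    logLike.backward()  # gradient of the log likelihood w.r.t. B
    return B.grad
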
def filterLDS_SS_withMissingValues_np(y, B, Q, m0, V0, Z, R):
    """ Kalman filter implementation of the algorithm described in Shumway
    and Stoffer 2006.

    :param y: time series to be smoothed
    :type y: numpy array (NxT)

    :param B: state transition matrix
    :type B: numpy matrix (MxM)

    :param Q: state noise covariance matrix
    :type Q: numpy matrix (MxM)

    :param m0: initial state mean
    :type m0: one-dimensional numpy array (M)

    :param V0: initial state covariance
    :type V0: numpy matrix (MxM)

    :param Z: state to observation matrix
    :type Z: numpy matrix (NxM)

    :param R: observations covariance matrix
    :type R: numpy matrix (NxN)

    :return: {xnn1, Vnn1, xnn, Vnn, innov, KN, Sn, logLike}: xnn1 and Vnn1
        (predicted means, MxT, and covariances, MxMxT), xnn and Vnn (filtered
        means, MxT, and covariances, MxMxT), innov (innovations, NxT), KN
        (Kalman gain matrix at the last sample, MxN), Sn (innovations
        covariance matrices, NxNxT), logLike (data loglikelihood, float).
    :rtype: dictionary
    """
    if m0.ndim != 1:
        raise ValueError("mean must be 1 dimensional")

    # N: number of time samples
    # M: dim state space
    # P: dim observations
    M = B.shape[0]
    N = y.shape[1]
    P = y.shape[0]
    xnn1 = np.empty(shape=[M, 1, N])
    Vnn1 = np.empty(shape=[M, M, N])
    xnn = np.empty(shape=[M, 1, N])
    Vnn = np.empty(shape=[M, M, N])
    innov = np.empty(shape=[P, 1, N])
    Sn = np.empty(shape=[P, P, N])

    # k==0
    xnn1[:, 0, 0] = B @ m0
    Vnn1[:, :, 0] = B @ V0 @ B.T + Q
    Stmp = Z @ Vnn1[:, :, 0] @ Z.T + R
    Sn[:, :, 0] = (Stmp + Stmp.T) / 2  # enforce symmetry
    Sinv = np.linalg.inv(Sn[:, :, 0])
    K = Vnn1[:, :, 0] @ Z.T @ Sinv
    innov[:, 0, 0] = y[:, 0] - (Z @ xnn1[:, :, 0]).squeeze()
    xnn[:, :, 0] = xnn1[:, :, 0] + K @ innov[:, :, 0]
    Vnn[:, :, 0] = Vnn1[:, :, 0] - K @ Z @ Vnn1[:, :, 0]
    logLike = -N*P*np.log(2*np.pi) - np.linalg.slogdet(Sn[:, :, 0])[1] - \
        innov[:, :, 0].T @ Sinv @ innov[:, :, 0]

    # k>=1
    for k in range(1, N):
        xnn1[:, :, k] = B @ xnn[:, :, k-1]
        Vnn1[:, :, k] = B @ Vnn[:, :, k-1] @ B.T + Q
        if np.any(np.isnan(y[:, k])):
            # missing observation: filtered estimate equals the prediction
            xnn[:, :, k] = xnn1[:, :, k]
            Vnn[:, :, k] = Vnn1[:, :, k]
        else:
            Stmp = Z @ Vnn1[:, :, k] @ Z.T + R
            Sn[:, :, k] = (Stmp + Stmp.T) / 2
            Sinv = np.linalg.inv(Sn[:, :, k])
            K = Vnn1[:, :, k] @ Z.T @ Sinv
            innov[:, 0, k] = y[:, k] - (Z @ xnn1[:, :, k]).squeeze()
            xnn[:, :, k] = xnn1[:, :, k] + K @ innov[:, :, k]
            Vnn[:, :, k] = Vnn1[:, :, k] - K @ Z @ Vnn1[:, :, k]
            logLike = logLike - np.linalg.slogdet(Sn[:, :, k])[1] - \
                innov[:, :, k].T @ Sinv @ innov[:, :, k]
    logLike = 0.5 * logLike
    answer = {"xnn1": xnn1, "Vnn1": Vnn1, "xnn": xnn, "Vnn": Vnn,
              "innov": innov, "KN": K, "Sn": Sn, "logLike": logLike}
    return answer

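
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of lds.inference): the numpy filter
# with a gap of missing observations. NaN samples are skipped by the update
# step, so the filtered moments fall back to the predicted ones there. The
# first sample must be observed, since the k==0 update is unconditional. All
# values and the helper name _example_filter_np are made up.
def _example_filter_np():
    rng = np.random.default_rng(0)
    B = np.eye(2)
    Q = 0.01 * np.eye(2)
    Z = np.eye(2)
    R = 0.25 * np.eye(2)
    m0 = np.zeros(2)
    V0 = np.eye(2)
    y = rng.normal(size=(2, 50))
    y[:, 10:15] = np.nan  # a gap of missing observations

    res = filterLDS_SS_withMissingValues_np(y=y, B=B, Q=Q, m0=m0, V0=V0,
                                            Z=Z, R=R)
    return res["xnn"], res["logLike"]
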
def smoothLDS_SS(B, xnn, Vnn, xnn1, Vnn1, m0, V0):
    """ Kalman smoother implementation

    :param B: state transition matrix
    :type B: numpy matrix (MxM)

    :param xnn: filtered means (from Kalman filter)
    :type xnn: numpy array (MxT)

    :param Vnn: filtered covariances (from Kalman filter)
    :type Vnn: numpy array (MxMxT)

    :param xnn1: predicted means (from Kalman filter)
    :type xnn1: numpy array (MxT)

    :param Vnn1: predicted covariances (from Kalman filter)
    :type Vnn1: numpy array (MxMxT)

    :param m0: initial state mean
    :type m0: one-dimensional numpy array (M)

    :param V0: initial state covariance
    :type V0: numpy matrix (MxM)

    :return: {xnN, VnN, Jn, x0N, V0N, J0}: xnN and VnN (smoothed means, MxT,
        and covariances, MxMxT), Jn (smoothing gain matrices, MxMxT), x0N and
        V0N (smoothed initial state mean, M, and covariance, MxM), J0 (initial
        smoothing gain matrix, MxM).
    """
    if m0.ndim != 1:
        raise ValueError("mean must be 1 dimensional")

    N = xnn.shape[2]
    M = B.shape[0]
    xnN = np.empty(shape=[M, 1, N])
    VnN = np.empty(shape=[M, M, N])
    Jn = np.empty(shape=[M, M, N])

    xnN[:, :, -1] = xnn[:, :, -1]
    VnN[:, :, -1] = Vnn[:, :, -1]
    for n in reversed(range(1, N)):
        Jn[:, :, n-1] = Vnn[:, :, n-1] @ B.T @ np.linalg.inv(Vnn1[:, :, n])
        xnN[:, :, n-1] = xnn[:, :, n-1] + \
            Jn[:, :, n-1] @ (xnN[:, :, n] - xnn1[:, :, n])
        VnN[:, :, n-1] = Vnn[:, :, n-1] + \
            Jn[:, :, n-1] @ (VnN[:, :, n] - Vnn1[:, :, n]) @ Jn[:, :, n-1].T
    # initial state x00 and V00
    # return the smoothed estimates of the state at time 0: x0N and V0N
    J0 = V0 @ B.T @ np.linalg.inv(Vnn1[:, :, 0])
    x0N = np.expand_dims(m0, 1) + J0 @ (xnN[:, :, 0] - xnn1[:, :, 0])
    V0N = V0 + J0 @ (VnN[:, :, 0] - Vnn1[:, :, 0]) @ J0.T
    answer = {"xnN": xnN, "VnN": VnN, "Jn": Jn, "x0N": x0N, "V0N": V0N,
              "J0": J0}
    return answer

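
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of lds.inference): forward-backward
# smoothing chains the filter and smoother defined above; the smoother
# consumes the predicted and filtered moments produced by the filter. All
# values and the helper name _example_smooth are made up.
def _example_smooth():
    rng = np.random.default_rng(0)
    B = np.eye(2)
    Q = 0.01 * np.eye(2)
    Z = np.eye(2)
    R = 0.25 * np.eye(2)
    m0 = np.zeros(2)
    V0 = np.eye(2)
    y = rng.normal(size=(2, 50))

    fres = filterLDS_SS_withMissingValues_np(y=y, B=B, Q=Q, m0=m0, V0=V0,
                                             Z=Z, R=R)
    sres = smoothLDS_SS(B=B, xnn=fres["xnn"], Vnn=fres["Vnn"],
                        xnn1=fres["xnn1"], Vnn1=fres["Vnn1"], m0=m0, V0=V0)
    return sres["xnN"], sres["VnN"]
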