Source code for lds.learning


import math
import time
import numpy as np
import scipy.optimize
import warnings
import copy

from . import inference
from .tracking import utils

iteration = 0  # global iteration counter shared by the scipy optimizer callbacks


def scipy_optimize_SS_tracking_fullV0(y, B, sigma_a0, Qe, Z, diag_R_0, m0_0,
                                      V0_0, max_iter, disp=True):
    iL_V0 = np.tril_indices(V0_0.shape[0])

    def get_coefs_from_params(sigma_a, diag_R, m0, V0, iL_V0=iL_V0):
        # parameterize V0 by its Cholesky factor so that V0 stays PSD
        V0_chol = np.linalg.cholesky(V0)
        L_coefs = V0_chol[iL_V0]
        x = np.insert(np.concatenate([diag_R, m0, L_coefs]), 0, sigma_a)
        return x

    def get_params_from_coefs(x, iL_V0=iL_V0, sigma_a0=sigma_a0,
                              diag_R_0=diag_R_0, m0_0=m0_0, V0_0=V0_0):
        cur = 0
        sigma_a = x[slice(cur, cur+1)]
        cur += len(sigma_a)
        diag_R = x[slice(cur, cur+len(diag_R_0))]
        cur += len(diag_R)
        m0 = x[slice(cur, cur+len(m0_0))]
        cur += len(m0)
        M = V0_0.shape[0]
        L_coefs = x[slice(cur, cur+int(M*(M+1)/2))]
        L_V0 = np.zeros(shape=V0_0.shape)
        L_V0[iL_V0] = L_coefs
        V0 = L_V0 @ L_V0.T
        return sigma_a, diag_R, m0, V0

    def optim_criterion(x):
        sigma_a, diag_R, m0, V0 = get_params_from_coefs(x)
        R = np.diag(diag_R)
        kf = inference.filterLDS_SS_withMissingValues_np(
            y=y, B=B, Q=sigma_a*Qe, m0=m0, V0=V0, Z=Z, R=R)
        # minus log-likelihood of the innovations (up to additive constants)
        answer = 0
        N = kf["Sn"].shape[2]
        for n in range(N):
            innov = kf["innov"][:, :, n]
            Sn = kf["Sn"][:, :, n]
            Sn_inv = np.linalg.inv(Sn)
            answer += np.linalg.slogdet(Sn)[1]
            answer += innov.T @ Sn_inv @ innov
        return answer

    def callback(x):
        global iteration
        iteration += 1
        sigma_a, diag_R, m0, V0 = get_params_from_coefs(x)
        optim_value = optim_criterion(x=x)
        print("Iteration: {:d}".format(iteration))
        print("optim criterion: {:f}".format(optim_value.item()))
        print("sigma_a={:f}".format(sigma_a.item()))
        print("diag_R:")
        print(diag_R)
        print("m0:")
        print(m0)
        print("V0:")
        print(V0)

    x0 = get_coefs_from_params(sigma_a=sigma_a0, diag_R=diag_R_0, m0=m0_0,
                               V0=V0_0)
    options = {"disp": disp, "maxiter": max_iter}
    opt_res = scipy.optimize.minimize(optim_criterion, x0,
                                      method="Nelder-Mead",
                                      callback=callback, options=options)
    return opt_res


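# ---------------------------------------------------------------------------
# The _example_* functions below are illustrative usage sketches, not part of
# the library API. They assume a discrete Wiener-process acceleration (DWPA)
# tracking model with a 6-dimensional state (x, vx, ax, y, vy, ay) observed
# through noisy 2D positions; the helper _example_build_DWPA_model and the
# white-noise stand-in observations are hypothetical.
# ---------------------------------------------------------------------------


def _example_build_DWPA_model(dt=1e-1):
    # per-coordinate transition and unscaled noise covariance of a DWPA model
    B1 = np.array([[1.0, dt, 0.5*dt**2],
                   [0.0, 1.0, dt],
                   [0.0, 0.0, 1.0]])
    Qe1 = np.array([[dt**4/4, dt**3/2, dt**2/2],
                    [dt**3/2, dt**2,   dt],
                    [dt**2/2, dt,      1.0]])
    O = np.zeros((3, 3))
    B = np.block([[B1, O], [O, B1]])     # 6x6 state-transition matrix
    Qe = np.block([[Qe1, O], [O, Qe1]])  # 6x6 unscaled state-noise covariance
    Z = np.array([[1.0, 0.0, 0.0, 0.0, 0.0, 0.0],   # observe x position
                  [0.0, 0.0, 0.0, 1.0, 0.0, 0.0]])  # observe y position
    return B, Qe, Z


def _example_scipy_optimize_SS_tracking_fullV0():
    # minimal sketch: estimate sigma_a, diag(R), m0 and a full V0 with
    # Nelder-Mead, using white noise as a stand-in for real position data
    B, Qe, Z = _example_build_DWPA_model()
    y = np.random.normal(size=(2, 100))
    opt_res = scipy_optimize_SS_tracking_fullV0(
        y=y, B=B, sigma_a0=1.0, Qe=Qe, Z=Z, diag_R_0=np.ones(2),
        m0_0=np.zeros(6), V0_0=np.eye(6), max_iter=5)
    print(opt_res)

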
def scipy_optimize_SS_tracking_diagV0(y, B, sigma_ax0, sigma_ay0, Qe, Z,
                                      sqrt_diag_R_0, m0_0, sqrt_diag_V0_0,
                                      max_iter=50, disp=True):

    def get_coefs_from_params(sigma_ax, sigma_ay, sqrt_diag_R, m0,
                              sqrt_diag_V0):
        x = np.concatenate([[sigma_ax, sigma_ay], sqrt_diag_R, m0,
                            sqrt_diag_V0])
        return x

    def get_params_from_coefs(x, sigma_ax0=sigma_ax0, sigma_ay0=sigma_ay0,
                              sqrt_diag_R_0=sqrt_diag_R_0, m0_0=m0_0,
                              sqrt_diag_V0_0=sqrt_diag_V0_0):
        cur = 0
        sigma_ax = x[slice(cur, cur+1)]
        cur += len(sigma_ax)
        sigma_ay = x[slice(cur, cur+1)]
        cur += len(sigma_ay)
        sqrt_diag_R = x[slice(cur, cur+len(sqrt_diag_R_0))]
        cur += len(sqrt_diag_R)
        m0 = x[slice(cur, cur+len(m0_0))]
        cur += len(m0)
        sqrt_diag_V0 = x[slice(cur, cur+len(sqrt_diag_V0_0))]
        return sigma_ax, sigma_ay, sqrt_diag_R, m0, sqrt_diag_V0

    def optim_criterion(x):
        sigma_ax, sigma_ay, sqrt_diag_R, m0, sqrt_diag_V0 = \
            get_params_from_coefs(x)
        V0 = np.diag(sqrt_diag_V0**2)
        R = np.diag(sqrt_diag_R**2)
        # build Q from Qe, sigma_ax, sigma_ay
        Q = utils.buildQfromQe_np(Qe=Qe, sigma_ax=sigma_ax, sigma_ay=sigma_ay)
        kf = inference.filterLDS_SS_withMissingValues_np(y=y, B=B, Q=Q, m0=m0,
                                                         V0=V0, Z=Z, R=R)
        # minus log-likelihood of the innovations (up to additive constants)
        answer = 0
        N = kf["Sn"].shape[2]
        for n in range(N):
            innov = kf["innov"][:, :, n]
            Sn = kf["Sn"][:, :, n]
            Sn_inv = np.linalg.inv(Sn)
            answer += np.linalg.slogdet(Sn)[1]
            answer += innov.T @ Sn_inv @ innov
        return answer

    def callback(x):
        global iteration
        iteration += 1
        sigma_ax, sigma_ay, sqrt_diag_R, m0, sqrt_diag_V0 = \
            get_params_from_coefs(x)
        optim_value = optim_criterion(x=x)
        print("Iteration: {:d}".format(iteration))
        print("optim criterion: {:f}".format(optim_value.item()))
        print("sigma_ax={:f}".format(sigma_ax.item()))
        print("sigma_ay={:f}".format(sigma_ay.item()))
        print("sqrt_diag_R:")
        print(sqrt_diag_R)
        print("m0:")
        print(m0)
        print("sqrt_diag_V0:")
        print(sqrt_diag_V0)

    x0 = get_coefs_from_params(sigma_ax=sigma_ax0, sigma_ay=sigma_ay0,
                               sqrt_diag_R=sqrt_diag_R_0, m0=m0_0,
                               sqrt_diag_V0=sqrt_diag_V0_0)
    options = {"disp": disp, "maxiter": max_iter}
    opt_res = scipy.optimize.minimize(optim_criterion, x0,
                                      method="Nelder-Mead",
                                      callback=callback, options=options)
    sigma_ax, sigma_ay, sqrt_diag_R, m0, sqrt_diag_V0 = \
        get_params_from_coefs(opt_res["x"])
    x = {"sigma_ax": sigma_ax, "sigma_ay": sigma_ay,
         "sqrt_diag_R": sqrt_diag_R, "m0": m0, "sqrt_diag_V0": sqrt_diag_V0}
    answer = {"fun": opt_res["fun"], "message": opt_res["message"],
              "nfev": opt_res["nfev"], "nit": opt_res["nit"],
              "status": opt_res["status"], "success": opt_res["success"],
              "x": x}
    return answer


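def _example_scipy_optimize_SS_tracking_diagV0():
    # minimal sketch: as above, but with separate acceleration noise scales
    # for x and y and a diagonal V0 (see _example_build_DWPA_model)
    B, Qe, Z = _example_build_DWPA_model()
    y = np.random.normal(size=(2, 100))
    res = scipy_optimize_SS_tracking_diagV0(
        y=y, B=B, sigma_ax0=1.0, sigma_ay0=1.0, Qe=Qe, Z=Z,
        sqrt_diag_R_0=np.ones(2), m0_0=np.zeros(6),
        sqrt_diag_V0_0=np.ones(6), max_iter=5)
    print(res["x"])

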
def torch_lbfgs_optimize_SS_tracking_diagV0(y, B, sigma_a0, Qe, Z,
                                            sqrt_diag_R_0, m0_0,
                                            sqrt_diag_V0_0, max_iter=20,
                                            lr=1.0, tolerance_grad=1e-7,
                                            tolerance_change=1e-9,
                                            n_epochs=100, tol=1e-6,
                                            line_search_fn="strong_wolfe",
                                            vars_to_estimate={
                                                "sigma_a": True,
                                                "sqrt_diag_R": True,
                                                "m0": True,
                                                "sqrt_diag_V0": True},
                                            disp=True):
    import torch

    def log_likelihood_fn():
        V0 = torch.diag(sqrt_diag_V0**2)
        R = torch.diag(sqrt_diag_R**2)
        Q = Qe * sigma_a**2
        log_like = inference.logLikeLDS_SS_withMissingValues_torch(
            y=y, B=B, Q=Q, m0=m0, V0=V0, Z=Z, R=R)
        return log_like

    optim_params = {"max_iter": max_iter, "lr": lr,
                    "tolerance_grad": tolerance_grad,
                    "tolerance_change": tolerance_change,
                    "line_search_fn": line_search_fn}
    sigma_a = torch.tensor([sigma_a0], dtype=torch.double)
    sqrt_diag_R = sqrt_diag_R_0
    m0 = m0_0
    sqrt_diag_V0 = sqrt_diag_V0_0
    x = []
    if vars_to_estimate["sigma_a"]:
        x.append(sigma_a)
    if vars_to_estimate["sqrt_diag_R"]:
        x.append(sqrt_diag_R)
    if vars_to_estimate["m0"]:
        x.append(m0)
    if vars_to_estimate["sqrt_diag_V0"]:
        x.append(sqrt_diag_V0)
    if len(x) == 0:
        raise RuntimeError("No variable to estimate. Please set one element "
                           "of vars_to_estimate to True")
    optimizer = torch.optim.LBFGS(x, **optim_params)
    for i in range(len(x)):
        x[i].requires_grad = True

    def closure():
        optimizer.zero_grad()
        curEval = -log_likelihood_fn()
        print(f"in closure, ll={-curEval}")
        curEval.backward(retain_graph=True)
        return curEval

    termination_info = "success: reached maximum number of iterations"
    log_like = []
    elapsed_time = []
    start_time = time.time()
    for epoch in range(n_epochs):
        prev_x = copy.deepcopy(x)
        try:
            curEval = optimizer.step(closure)
        except RuntimeError:
            # begin backtracking: restore the parameters saved before this
            # epoch and stop the optimization
            if vars_to_estimate["sigma_a"]:
                sigma_a = prev_x.pop(0)
                sigma_a.requires_grad = False
            if vars_to_estimate["sqrt_diag_R"]:
                sqrt_diag_R = prev_x.pop(0)
                sqrt_diag_R.requires_grad = False
            if vars_to_estimate["m0"]:
                m0 = prev_x.pop(0)
                m0.requires_grad = False
            if vars_to_estimate["sqrt_diag_V0"]:
                sqrt_diag_V0 = prev_x.pop(0)
                sqrt_diag_V0.requires_grad = False
            # end backtracking
            termination_info = "nan generated"
            break
        log_like.append(-curEval.item())
        elapsed_time.append(time.time() - start_time)
        print("-" * 80)
        print(f"epoch: {epoch}")
        print(f"likelihood: {log_like[-1]}")
        if vars_to_estimate["sigma_a"]:
            print("sigma_a: ")
            print(sigma_a)
        if vars_to_estimate["sqrt_diag_R"]:
            print("sqrt_diag_R: ")
            print(sqrt_diag_R)
        if vars_to_estimate["m0"]:
            print("m0: ")
            print(m0)
        if vars_to_estimate["sqrt_diag_V0"]:
            print("sqrt_diag_V0: ")
            print(sqrt_diag_V0)
        if epoch > 0 and log_like[-1] - log_like[-2] < tol:
            termination_info = "success: converged"
            break
    for i in range(len(x)):
        x[i].requires_grad = False
    estimates = {}
    if vars_to_estimate["sigma_a"]:
        estimates["sigma_a"] = sigma_a
    if vars_to_estimate["sqrt_diag_R"]:
        estimates["sqrt_diag_R"] = sqrt_diag_R
    if vars_to_estimate["m0"]:
        estimates["m0"] = m0
    if vars_to_estimate["sqrt_diag_V0"]:
        estimates["sqrt_diag_V0"] = sqrt_diag_V0
    answer = {"estimates": estimates, "log_like": log_like,
              "elapsed_time": elapsed_time,
              "termination_info": termination_info}
    return answer


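def _example_torch_lbfgs_optimize_SS_tracking_diagV0():
    # minimal sketch: LBFGS estimation of all four parameter groups, with
    # every input converted to a double-precision torch tensor
    import torch
    B, Qe, Z = _example_build_DWPA_model()
    y = torch.from_numpy(np.random.normal(size=(2, 100)))
    res = torch_lbfgs_optimize_SS_tracking_diagV0(
        y=y, B=torch.from_numpy(B), sigma_a0=1.0, Qe=torch.from_numpy(Qe),
        Z=torch.from_numpy(Z),
        sqrt_diag_R_0=torch.ones(2, dtype=torch.double),
        m0_0=torch.zeros(6, dtype=torch.double),
        sqrt_diag_V0_0=torch.ones(6, dtype=torch.double),
        max_iter=4, n_epochs=2)
    print(res["estimates"])
    print(res["termination_info"])

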
def torch_adam_optimize_SS_tracking_diagV0(y, B, sigma_a0, Qe, Z,
                                           sqrt_diag_R_0, m0_0,
                                           sqrt_diag_V0_0, max_iter=50,
                                           lr=1e-3, eps=1e-8,
                                           vars_to_estimate={
                                               "sigma_a": True,
                                               "sqrt_diag_R": True,
                                               "m0": True,
                                               "sqrt_diag_V0": True}):
    import torch

    def log_likelihood_fn():
        V0 = torch.diag(sqrt_diag_V0**2)
        R = torch.diag(sqrt_diag_R**2)
        Q = Qe * sigma_a**2
        kf = inference.filterLDS_SS_withMissingValues_torch(y=y, B=B, Q=Q,
                                                            m0=m0, V0=V0,
                                                            Z=Z, R=R)
        log_like = kf["logLike"]
        return log_like

    optim_params = {"lr": lr, "eps": eps}
    sigma_a = torch.Tensor([sigma_a0])
    sqrt_diag_R = sqrt_diag_R_0
    m0 = m0_0
    sqrt_diag_V0 = sqrt_diag_V0_0
    x = []
    if vars_to_estimate["sigma_a"]:
        x.append(sigma_a)
    if vars_to_estimate["sqrt_diag_R"]:
        x.append(sqrt_diag_R)
    if vars_to_estimate["m0"]:
        x.append(m0)
    if vars_to_estimate["sqrt_diag_V0"]:
        x.append(sqrt_diag_V0)
    if len(x) == 0:
        raise RuntimeError("No variable to estimate. Please set one element "
                           "of vars_to_estimate to True")
    optimizer = torch.optim.Adam(x, **optim_params)
    for i in range(len(x)):
        x[i].requires_grad = True
    log_like = []
    elapsed_time = []
    start_time = time.time()
    for i in range(max_iter):
        optimizer.zero_grad()
        curEval = -log_likelihood_fn()
        curEval.backward()
        optimizer.step()
        log_like.append(-curEval.item())
        elapsed_time.append(time.time() - start_time)
        print("-" * 80)
        print(f"iteration: {i} ({max_iter})")
        print(f"logLike: {-curEval}")
        print("sigma_a: ")
        print(sigma_a)
        print("sqrt_diag_R: ")
        print(sqrt_diag_R)
        print("m0: ")
        print(m0)
        print("sqrt_diag_V0: ")
        print(sqrt_diag_V0)
    for i in range(len(x)):
        x[i].requires_grad = False
    estimates = {}
    if vars_to_estimate["sigma_a"]:
        estimates["sigma_a"] = x.pop(0)[0].item()
    if vars_to_estimate["sqrt_diag_R"]:
        estimates["sqrt_diag_R"] = x.pop(0)
    if vars_to_estimate["m0"]:
        estimates["m0"] = x.pop(0)
    if vars_to_estimate["sqrt_diag_V0"]:
        estimates["sqrt_diag_V0"] = x.pop(0).numpy()
    answer = {"estimates": estimates, "log_like": log_like,
              "elapsed_time": elapsed_time}
    return answer


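def _example_torch_adam_optimize_SS_tracking_diagV0():
    # minimal sketch: Adam variant of the example above; a larger learning
    # rate than the 1e-3 default is often needed for noticeable progress
    import torch
    B, Qe, Z = _example_build_DWPA_model()
    y = torch.from_numpy(np.random.normal(size=(2, 100)))
    res = torch_adam_optimize_SS_tracking_diagV0(
        y=y, B=torch.from_numpy(B), sigma_a0=1.0, Qe=torch.from_numpy(Qe),
        Z=torch.from_numpy(Z),
        sqrt_diag_R_0=torch.ones(2, dtype=torch.double),
        m0_0=torch.zeros(6, dtype=torch.double),
        sqrt_diag_V0_0=torch.ones(6, dtype=torch.double),
        max_iter=5, lr=1e-1)
    print(res["estimates"])

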
def em_SS_tracking(y, B, sigma_a0, Qe, Z, R_0, m0_0, V0_0,
                   vars_to_estimate={"sigma_a": True, "R": True,
                                     "m0": True, "V0": True},
                   max_iter=50, tolerance_change=1e-9):
    sigma_a = sigma_a0
    R = R_0
    m0 = m0_0
    V0 = V0_0

    Qe_inv = np.linalg.inv(Qe)
    N = y.shape[1]
    M = Qe.shape[0]
    termination_info = "success: reached maximum number of iterations"
    log_like = []
    elapsed_time = []
    start_time = time.time()
    for iter in range(max_iter):
        # E step
        Q = Qe * sigma_a**2
        kf = inference.filterLDS_SS_withMissingValues_np(y=y, B=B, Q=Q,
                                                         m0=m0, V0=V0, Z=Z,
                                                         R=R)
        print("LogLike[{:04d}]={:f}".format(iter, kf["logLike"].item()))
        print("sigma_a={:f}".format(sigma_a))
        print("R:")
        print(R)
        print("m0:")
        print(m0)
        print("V0:")
        print(V0)
        log_like.append(kf["logLike"].item())
        if math.isnan(log_like[-1]) or \
           (iter > 0 and log_like[-1] < log_like[-2]):
            if math.isnan(log_like[-1]):
                termination_info = "nan detected"
            else:
                termination_info = "log likelihood decreased"
            # begin backtrack: restore the parameters from the previous
            # iteration (only available after the first iteration)
            if iter > 0:
                if vars_to_estimate["sigma_a"]:
                    sigma_a = sigma_a_prev
                if vars_to_estimate["R"]:
                    R = R_prev
                if vars_to_estimate["m0"]:
                    m0 = m0_prev
                if vars_to_estimate["V0"]:
                    V0 = V0_prev
            del log_like[-1]
            # end backtrack
            break
        elif iter > 0 and log_like[-1] - log_like[-2] < tolerance_change:
            termination_info = "converged"
            break
        elapsed_time.append(time.time() - start_time)
        ks = inference.smoothLDS_SS(B=B, xnn=kf["xnn"], Vnn=kf["Vnn"],
                                    xnn1=kf["xnn1"], Vnn1=kf["Vnn1"],
                                    m0=m0, V0=V0)
        # M step
        if vars_to_estimate["sigma_a"]:
            sigma_a_prev = sigma_a
            S11, S10, S00 = posteriorCorrelationMatrices(
                Z=Z, B=B, KN=kf["KN"], Vnn=kf["Vnn"], xnN=ks["xnN"],
                VnN=ks["VnN"], x0N=ks["x0N"], V0N=ks["V0N"], Jn=ks["Jn"],
                J0=ks["J0"])
            # sigma_a
            W = S11 - S10 @ B.T - B @ S10.T + B @ S00 @ B.T
            U = W @ Qe_inv
            K = np.trace(U)
            sigma_a = np.sqrt(K/(N*M))
        # R
        if vars_to_estimate["R"]:
            R_prev = R
            u = y[:, 0] - (Z @ ks["xnN"][:, :, 0]).squeeze()
            R = np.outer(u, u) + Z @ ks["VnN"][:, :, 0] @ Z.T
            for i in range(1, N):
                u = y[:, i] - (Z @ ks["xnN"][:, :, i]).squeeze()
                R = R + np.outer(u, u) + Z @ ks["VnN"][:, :, i] @ Z.T
            R = R/N
        # m0, V0
        if vars_to_estimate["m0"]:
            m0_prev = m0
            m0 = ks["x0N"].squeeze()
        if vars_to_estimate["V0"]:
            V0_prev = V0
            V0 = ks["V0N"]
    estimates = {}
    if vars_to_estimate["sigma_a"]:
        estimates["sigma_a"] = sigma_a
    if vars_to_estimate["R"]:
        estimates["R"] = R
    if vars_to_estimate["m0"]:
        estimates["m0"] = m0
    if vars_to_estimate["V0"]:
        estimates["V0"] = V0
    optim_res = {"estimates": estimates, "log_like": log_like,
                 "elapsed_time": elapsed_time,
                 "termination_info": termination_info}
    return optim_res


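def _example_em_SS_tracking():
    # minimal sketch: EM estimation of sigma_a, R, m0 and V0 for the same
    # DWPA model, entirely in NumPy
    B, Qe, Z = _example_build_DWPA_model()
    y = np.random.normal(size=(2, 100))
    res = em_SS_tracking(y=y, B=B, sigma_a0=1.0, Qe=Qe, Z=Z, R_0=np.eye(2),
                         m0_0=np.zeros(6), V0_0=np.eye(6), max_iter=5)
    print(res["estimates"])
    print(res["termination_info"])

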
def em_SS_LDS(y, B0, Q0, Z0, R0, m0, V0, max_iter=50, tol=1e-4,
              varsToEstimate=dict(m0=True, V0=True, B=True, Q=True, Z=True,
                                  R=True)):
    B = B0
    Q = Q0
    Z = Z0
    R = R0
    M = B0.shape[0]
    N = y.shape[1]
    log_like = np.empty(max_iter)
    for iter in range(max_iter):
        # E step
        kf = inference.filterLDS_SS_withMissingValues_np(y=y, B=B, Q=Q,
                                                         m0=m0, V0=V0, Z=Z,
                                                         R=R)
        print("LogLike[{:04d}]={:f}".format(iter, kf["logLike"].item()))
        log_like[iter] = kf["logLike"]
        ks = inference.smoothLDS_SS(B=B, xnn=kf["xnn"], Vnn=kf["Vnn"],
                                    xnn1=kf["xnn1"], Vnn1=kf["Vnn1"],
                                    m0=m0, V0=V0)
        # M step
        S11, S10, S00 = posteriorCorrelationMatrices(
            Z=Z, B=B, KN=kf["KN"], Vnn=kf["Vnn"], xnN=ks["xnN"],
            VnN=ks["VnN"], x0N=ks["x0N"], V0N=ks["V0N"], Jn=ks["Jn"],
            J0=ks["J0"])
        # We estimate Z before R because the R update uses Z
        if varsToEstimate["Z"]:
            Z = np.outer(y[:, 0], ks["xnN"][:, :, 0])
            for i in range(1, N):
                Z = Z + np.outer(y[:, i], ks["xnN"][:, :, i])
            Z = Z @ np.linalg.inv(S11)
        if varsToEstimate["B"]:
            B = S10 @ np.linalg.inv(S00)
        if varsToEstimate["Q"]:
            Q = (S11 - S10 @ np.linalg.inv(S00) @ S10.T)/N
            Q = (Q.T + Q)/2
        # Now that Z is estimated, let's estimate R, if requested
        if varsToEstimate["R"]:
            u = y[:, 0] - (Z @ ks["xnN"][:, :, 0]).squeeze()
            R = np.outer(u, u) + Z @ ks["VnN"][:, :, 0] @ Z.T
            for i in range(1, N):
                u = y[:, i] - (Z @ ks["xnN"][:, :, i]).squeeze()
                R = R + np.outer(u, u) + Z @ ks["VnN"][:, :, i] @ Z.T
            R = R/N
        if varsToEstimate["m0"]:
            m0 = ks["x0N"]
        if varsToEstimate["V0"]:
            V0 = ks["V0N"]
    answer = dict(B=B, Q=Q, Z=Z, R=R, m0=m0, V0=V0,
                  log_like=log_like[:iter+1], niter=iter+1)
    return answer


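def _example_em_SS_LDS():
    # minimal sketch: full EM over B, Q, Z, R, m0 and V0, using the DWPA
    # model only as an initial condition; m0 is a column vector here because
    # em_SS_LDS assigns m0 = ks["x0N"] without squeezing
    B0, Qe, Z0 = _example_build_DWPA_model()
    y = np.random.normal(size=(2, 100))
    res = em_SS_LDS(y=y, B0=B0, Q0=Qe, Z0=Z0, R0=np.eye(2),
                    m0=np.zeros((6, 1)), V0=np.eye(6), max_iter=5)
    print(res["log_like"])

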
def posteriorCorrelationMatrices(Z, B, KN, Vnn, xnN, VnN, x0N, V0N, Jn, J0):
    # posterior second-moment matrices S11, S10 and S00 of the smoothed
    # states, used in the M steps above
    Vnn1N = lag1CovSmootherLDS_SS(Z=Z, KN=KN, B=B, Vnn=Vnn, Jn=Jn, J0=J0)
    S11 = np.outer(xnN[:, :, 0], xnN[:, :, 0]) + VnN[:, :, 0]
    S10 = np.outer(xnN[:, :, 0], x0N) + Vnn1N[:, :, 0]
    S00 = np.outer(x0N, x0N) + V0N
    N = xnN.shape[2]
    for i in range(1, N):
        S11 = S11 + np.outer(xnN[:, :, i], xnN[:, :, i]) + VnN[:, :, i]
        S10 = S10 + np.outer(xnN[:, :, i], xnN[:, :, i-1]) + Vnn1N[:, :, i]
        S00 = S00 + np.outer(xnN[:, :, i-1], xnN[:, :, i-1]) + VnN[:, :, i-1]
    return S11, S10, S00


def lag1CovSmootherLDS_SS(Z, KN, B, Vnn, Jn, J0):
    # lag-one covariance smoother; SS16 (Shumway & Stoffer, 2016),
    # Property 6.3
    M = KN.shape[0]
    N = Vnn.shape[2]
    Vnn1N = np.empty(shape=(M, M, N))
    eye = np.eye(M)
    Vnn1N[:, :, N-1] = (eye - KN @ Z) @ B @ Vnn[:, :, N-2]
    for k in range(N-1, 1, -1):
        Vnn1N[:, :, k-1] = Vnn[:, :, k-1] @ Jn[:, :, k-2].T + \
            Jn[:, :, k-1] @ \
            (Vnn1N[:, :, k] - B @ Vnn[:, :, k-1]) @ Jn[:, :, k-2].T
    Vnn1N[:, :, 0] = Vnn[:, :, 0] @ J0.T + \
        Jn[:, :, 0] @ (Vnn1N[:, :, 1] - B @ Vnn[:, :, 0]) @ J0.T
    return Vnn1N