diff --git a/filterpy/kalman/multi-EKF.py b/filterpy/kalman/multi-EKF.py
new file mode 100644
index 00000000..719e4128
--- /dev/null
+++ b/filterpy/kalman/multi-EKF.py
@@ -0,0 +1,381 @@
+import numpy as np
+import sys
+
+from copy import deepcopy
+from filterpy.common import pretty_str, reshape_z
+from filterpy.stats import logpdf
+from math import log, exp, sqrt
+from numpy import dot, eye, linalg, zeros
+
+
+class MultiExtendedKalmanFilter:
+
+    """Implements a multi-input extended Kalman filter (EKF). You are
+    responsible for setting the various state variables to reasonable
+    values; the defaults will not give you a functional filter.
+
+    This is an adapted implementation by niclas.wesemann@motius.de that
+    accepts several input sources, so that more than one sensor can update
+    a single Kalman filter. It assumes you want to fuse the sensors by
+    performing one prediction followed by sequential updates, in this
+    manner:
+
+        prediction
+        update sensor 1
+        update sensor 2
+        update sensor 3
+        ...
+        prediction
+        update sensor 1
+        ...
+
+    Please note that all sensors have to measure the same quantities. This
+    is not meant to fuse sensors that measure different quantities, so all
+    sensors have to return, e.g., speed, or rotation, or position.
+
+    You will have to set the following attributes after constructing this
+    object for the filter to perform properly. Please note that there are
+    various checks in place to ensure that you have made everything the
+    'correct' size. However, it is possible to provide incorrectly sized
+    arrays such that the linear algebra cannot perform an operation.
+    It can also fail silently - you can end up with matrices of a size
+    that allows the linear algebra to work, but are the wrong shape for
+    the problem you are trying to solve.
+
+    Parameters
+    ----------
+
+    dim_x : int
+        Number of state variables for the Kalman filter. For example, if
+        you are tracking the position and velocity of an object in two
+        dimensions, dim_x would be 4.
+
+        This is used to set the default size of P, Q, and u
+
+    dim_z : int
+        Number of measurement inputs. For example, if the sensor
+        provides you with position in (x,y), dim_z would be 2.
+
+    dim_u : int (optional)
+        Size of the control input, if it is being used. Default value of
+        0 indicates it is not used.
+
+    sources : list
+        Identifiers of the sensors you are using to update this filter.
+        One measurement noise matrix R is kept per source.
+
+    Attributes
+    ----------
+    x : numpy.array(dim_x, 1)
+        State estimate vector
+
+    P : numpy.array(dim_x, dim_x)
+        Covariance matrix, shared by all sensors
+
+    x_prior : numpy.array(dim_x, 1)
+        Prior (predicted) state estimate. The *_prior and *_post
+        attributes are for convenience; they store the prior and
+        posterior of the current epoch. Read Only.
+
+    P_prior : numpy.array(dim_x, dim_x)
+        Prior (predicted) state covariance matrix. Read Only.
+
+    x_post : numpy.array(dim_x, 1)
+        Posterior (updated) state estimate. Read Only.
+
+    P_post : numpy.array(dim_x, dim_x)
+        Posterior (updated) state covariance matrix. Read Only.
+
+    R : dict of numpy.array(dim_z, dim_z)
+        Measurement noise matrix for each source, keyed by source id
+
+    Q : numpy.array(dim_x, dim_x)
+        Process noise matrix
+
+    F : numpy.array(dim_x, dim_x)
+        State Transition matrix
+
+    y : numpy.array
+        Residual of the most recent update step. Read only.
+
+    K : numpy.array(dim_x, dim_z)
+        Kalman gain of the most recent update step. Read only.
+
+    S : numpy.array
+        System uncertainty projected to measurement space. Read only.
+
+    z : numpy.array
+        Last measurement used in update(). Read only.
+
+    log_likelihood : float
+        log-likelihood of the last measurement. Read only.
+
+    likelihood : float
+        likelihood of the last measurement. Read only.
+
+        Computed from the log-likelihood. The log-likelihood can be very
+        small, meaning a large negative value such as -28000. Taking the
+        exp() of that results in 0.0, which can break typical algorithms
+        which multiply by this value, so by default we always return a
+        number >= sys.float_info.min.
+
+    mahalanobis : float
+        mahalanobis distance of the innovation. E.g. 3 means measurement
+        was 3 standard deviations away from the predicted value.
+
+        Read only.
+
+    Examples
+    --------
+
+    See my book Kalman and Bayesian Filters in Python
+    https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python
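+
+    A minimal usage sketch (hypothetical setup: two sources named 'radar'
+    and 'lidar', a four dimensional state [x, vx, y, vy], and placeholder
+    Jacobian/measurement functions and measurements you must supply):
+
+    .. code-block:: Python
+
+        ekf = MultiExtendedKalmanFilter(dim_x=4, dim_z=2,
+                                        sources=['radar', 'lidar'])
+        ekf.predict()
+        ekf.update(z_radar, HJacobian_radar, Hx_radar, 'radar')
+        ekf.update(z_lidar, HJacobian_lidar, Hx_lidar, 'lidar')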
+    """
+
+    def __init__(self, dim_x, dim_z, dim_u=0, sources=None):
+        self.dim_x = dim_x
+        self.dim_z = dim_z
+        self.dim_u = dim_u
+        self.sources = sources if sources is not None else []  # ids of the sensors to fuse
+
+        self.x = zeros((dim_x, 1))  # state estimate vector
+        self.P = eye(dim_x)  # uncertainty covariance, shared by all sensors
+        self.B = 0  # control transition matrix
+        self.F = eye(dim_x)  # state transition matrix
+        self.Q = eye(dim_x)  # process uncertainty
+
+        # assuming updates are performed sequentially, not simultaneously
+        # (the order is undefined)
+        z = np.array([None] * self.dim_z)
+        self.z = reshape_z(z, self.dim_z, self.x.ndim)
+
+        # identity matrix. Do not alter this.
+        self._I = np.eye(dim_x)
+
+        # gain and residual are computed during the innovation step. We
+        # save them in case you want to inspect them for various purposes
+        self.y = zeros((dim_z, 1))  # residual
+        self.K = zeros(self.x.shape)  # Kalman gain
+        self.S = zeros((dim_z, dim_z))  # system uncertainty
+        self.SI = zeros((dim_z, dim_z))  # inverse system uncertainty
+
+        self._log_likelihood = log(sys.float_info.min)
+        self._likelihood = sys.float_info.min
+        self._mahalanobis = None
+
+        # initialize one measurement noise matrix R per source
+        self.R = {}
+        for source in self.sources:
+            self.R[source] = eye(dim_z)
+
+        # these will always be a copy of x, P after predict() is called
+        self.x_prior = self.x.copy()
+        self.P_prior = self.P.copy()
+
+        # these will always be a copy of x, P after update() is called
+        self.x_post = self.x.copy()
+        self.P_post = self.P.copy()
+
+    def predict_update(self):
+        # This method is provided in the original implementation mentioned
+        # above, but it does not make sense to implement it for the multi-
+        # input case, since it would require all sensor data to be
+        # available at the same time, which is not always given.
+        raise NotImplementedError(
+            "Only implemented in ExtendedKalmanFilter class"
+        )
+
+    def update(
+        self,
+        z,
+        HJacobian,
+        Hx,
+        source,
+        R=None,
+        args=(),
+        hx_args=(),
+        residual=np.subtract,
+    ):
+        """Performs the update innovation of the extended Kalman filter
+        for one specific sensor. Make sure that z and the source id
+        correspond to each other!
+
+        Parameters
+        ----------
+
+        z : np.array
+            measurement for this step.
+            If `None`, posterior is not computed
+
+        HJacobian : function
+            function which computes the Jacobian of the H matrix
+            (measurement function). Takes state variable (self.x) as
+            input, returns H.
+
+        Hx : function
+            function which takes as input the state variable (self.x)
+            along with the optional arguments in hx_args, and returns the
+            measurement that would correspond to that state.
+
+        source : int or str
+            the id of the sensor you are updating. z has to come from the
+            same sensor so that the correct measurement noise matrix
+            self.R[source] is used.
+
+        R : np.array, scalar, or None
+            Optionally provide R to override the measurement noise for
+            this one call, otherwise self.R[source] will be used.
+
+        args : tuple, optional, default (,)
+            arguments to be passed into HJacobian after the required state
+            variable. For robot localization you might need to pass in
+            information about the map and time of day, so you might have
+            `args=(map_data, time)`, where the signature of HJacobian will
+            be `def HJacobian(x, map, t)`
+
+        hx_args : tuple, optional, default (,)
+            arguments to be passed into Hx function after the required
+            state variable.
+
+        residual : function (z, z2), optional
+            Optional function that computes the residual (difference)
+            between the two measurement vectors. If you do not provide
+            this, then the built-in minus operator will be used. You will
+            normally want to use the built-in version unless your residual
+            computation is nonlinear (for example, if the measurements are
+            angles)
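+
+        A sketch of what HJacobian and Hx might look like for a sensor
+        that measures the position (x, y) of a state [x, vx, y, vy]
+        (hypothetical example; adapt both to your measurement model):
+
+        .. code-block:: Python
+
+            def HJacobian(x):
+                return np.array([[1., 0., 0., 0.],
+                                 [0., 0., 1., 0.]])
+
+            def Hx(x):
+                return np.array([[x[0, 0]], [x[2, 0]]])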
+        """
+
+        if z is None:
+            self.z = np.array([[None] * self.dim_z]).T
+            self.x_post = self.x.copy()
+            self.P_post = self.P.copy()
+            return
+
+        if not isinstance(args, tuple):
+            args = (args,)
+
+        if not isinstance(hx_args, tuple):
+            hx_args = (hx_args,)
+
+        if R is None:
+            R = self.R[source]
+        elif np.isscalar(R):
+            R = eye(self.dim_z) * R
+
+        if np.isscalar(z) and self.dim_z == 1:
+            z = np.asarray([z], float)
+
+        H = HJacobian(self.x, *args)
+
+        PHT = dot(self.P, H.T)
+        self.S = dot(H, PHT) + R
+        self.SI = linalg.inv(self.S)
+        self.K = PHT.dot(self.SI)
+
+        hx = Hx(self.x, *hx_args)
+        self.y = residual(z, hx)
+        self.x = self.x + dot(self.K, self.y)
+
+        # P = (I-KH)P(I-KH)' + KRK' is more numerically stable
+        # and works for non-optimal K vs the equation
+        # P = (I-KH)P usually seen in the literature.
+        I_KH = self._I - dot(self.K, H)
+        self.P = dot(I_KH, self.P).dot(I_KH.T) + dot(self.K, R).dot(self.K.T)
+
+        # set to None to force recompute
+        self._log_likelihood = None
+        self._likelihood = None
+        self._mahalanobis = None
+
+        # save measurement and posterior state
+        self.z = deepcopy(z)
+        self.x_post = self.x.copy()
+        self.P_post = self.P.copy()
+
+    def predict_x(self, u=0):
+        """
+        Predicts the next state of x. If you need to
+        compute the next state yourself, override this function. You
+        would need to do this, for example, if the usual Taylor expansion
+        to generate F is not providing accurate results for you.
+        """
+        self.x = dot(self.F, self.x) + dot(self.B, u)
+
+    def predict(self, u=0):
+        """
+        Predict next state (prior) using the Kalman filter state
+        propagation equations. It predicts the next state and propagates
+        the covariance self.P, which is shared by all sensors.
+
+        Parameters
+        ----------
+
+        u : np.array
+            Optional control vector. If non-zero, it is multiplied by B
+            to create the control input into the system.
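+
+        A sketch of a possible setup before calling predict(): a constant
+        velocity transition matrix for a time step dt and a state
+        [x, vx, y, vy] (assumed model, not required by this class):
+
+        .. code-block:: Python
+
+            dt = 0.1
+            ekf.F = np.array([[1., dt, 0., 0.],
+                              [0., 1., 0., 0.],
+                              [0., 0., 1., dt],
+                              [0., 0., 0., 1.]])
+            ekf.predict()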
+        """
+
+        self.predict_x(u)
+        self.P = dot(self.F, self.P).dot(self.F.T) + self.Q
+
+        # save priors
+        self.x_prior = np.copy(self.x)
+        self.P_prior = self.P.copy()
+
+    @property
+    def log_likelihood(self):
+        """
+        log-likelihood of the last measurement.
+        """
+
+        if self._log_likelihood is None:
+            self._log_likelihood = logpdf(x=self.y, cov=self.S)
+        return self._log_likelihood
+
+    @property
+    def likelihood(self):
+        """
+        Computed from the log-likelihood. The log-likelihood can be very
+        small, meaning a large negative value such as -28000. Taking the
+        exp() of that results in 0.0, which can break typical algorithms
+        which multiply by this value, so by default we always return a
+        number >= sys.float_info.min.
+        """
+        if self._likelihood is None:
+            self._likelihood = exp(self.log_likelihood)
+            if self._likelihood == 0:
+                self._likelihood = sys.float_info.min
+        return self._likelihood
+
+    @property
+    def mahalanobis(self):
+        """
+        Mahalanobis distance of innovation. E.g. 3 means measurement
+        was 3 standard deviations away from the predicted value.
+
+        Returns
+        -------
+        mahalanobis : float
+        """
+        if self._mahalanobis is None:
+            self._mahalanobis = sqrt(
+                float(dot(dot(self.y.T, self.SI), self.y))
+            )
+        return self._mahalanobis
+
+    def __repr__(self):
+        return "\n".join(
+            [
+                "MultiExtendedKalmanFilter object",
+                pretty_str("x", self.x),
+                pretty_str("P", self.P),
+                pretty_str("x_prior", self.x_prior),
+                pretty_str("P_prior", self.P_prior),
+                pretty_str("F", self.F),
+                pretty_str("Q", self.Q),
+                pretty_str("R", self.R),
+                pretty_str("K", self.K),
+                pretty_str("y", self.y),
+                pretty_str("S", self.S),
+                pretty_str("likelihood", self.likelihood),
+                pretty_str("log-likelihood", self.log_likelihood),
+                pretty_str("mahalanobis", self.mahalanobis),
+            ]
+        )