diff --git a/docs/tutorials/11_LevyTransitionModels_&_MarginalisedParticleFilter.py b/docs/tutorials/11_LevyTransitionModels_&_MarginalisedParticleFilter.py
new file mode 100644
index 000000000..f61af94e1
--- /dev/null
+++ b/docs/tutorials/11_LevyTransitionModels_&_MarginalisedParticleFilter.py
@@ -0,0 +1,300 @@
+#!/usr/bin/env python
+# coding: utf-8
+"""
+=============================================================
+11 - Tracking linear Levy transition models with the MPF
+=============================================================
+In line with the tutorial examples of the Kalman and particle
+filters in Stone Soup, a simplified single-target tracking
+example without clutter is provided here to demonstrate the
+use of linear transition models driven by non-Gaussian Levy
+noise, as well as how to perform inference tasks on this class
+of models using the Marginalised Particle Filter (MPF).
+"""

+# Consider the scenario where the target evolves according to the Langevin model, driven by a normal sigma-mean mixture with the mixing distribution being the $\alpha$-stable distribution.

+# In[1]:


+import numpy as np
+from datetime import datetime, timedelta


+# The state of the target can be represented as 2D Cartesian coordinates, $\left[x, \dot x, y, \dot y\right]^{\top}$, modelling both its position and velocity. A simple truth path is created with a sampling rate of 1 Hz.

+# In[2]:


+from stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState
+from stonesoup.models.base_driver import NoiseCase
+from stonesoup.models.driver import AlphaStableNSMDriver
+from stonesoup.models.transition.levy_linear import LevyLangevin, CombinedLinearLevyTransitionModel

+# And the clock starts
+start_time = datetime.now().replace(microsecond=0)


+# The `LevyLangevin` class creates a one-dimensional Langevin model, driven by the $\alpha$-stable NSM mixture process defined in the `AlphaStableNSMDriver` class.
+#
+# \begin{equation}
+# d \dot{x}(t)=-\theta \dot{x}(t) d t+d W(t), \quad \theta>0
+# \end{equation}
+#
+# where $\theta$ is the damping factor and $W(t)$ is the non-Gaussian driving process.
+#
+# The noise samples $\mathbf{w}_n$ are drawn from the $\alpha$-stable distribution parameterized by the $\alpha$-stable law, $S_{\alpha}(\sigma, \beta, \mu)$.
+#
+# The input parameters to the `AlphaStableNSMDriver` class are the stability index $\alpha$, the expected number of jumps per unit time $c$, the conditional Gaussian mean $\mu_W$ and variance $\sigma_W^2$, and the type of residuals used for the truncated shot-noise representation, specified by `noise_case`.
+#
+# Without diving into technical details, the scaling factor $\sigma$, skewness parameter $\beta$ and location $\mu$ in the $\alpha$-stable law are functions of the conditional Gaussian parameters $\mu_W, \sigma_W^2$. In general, set $\mu_W=0$ for a symmetric target distribution ($\beta=0$), or $\mu_W \neq 0$ to model biased trajectories. In addition, the size of the resulting trajectories (and jumps) can be adjusted by varying $\sigma_W^2$.
+#
+# The available noise cases are:
+#
+# 1. `NoiseCase.TRUNCATED`, no residuals; the least expensive, but the drawn noise samples deviate furthest from the target distribution.
+# 2. `NoiseCase.GAUSSIAN_APPROX`, the most expensive, but the drawn noise samples are closest to the target distribution.
+# 3. `NoiseCase.PARTIAL_GAUSSIAN_APPROX`, a compromise between cases (1) and (2).
+#
+# For interested readers, refer to [1, 2] for more details.
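+#
+# As a quick illustrative sketch (not part of the tutorial flow), one driver
+# per noise case could be constructed for side-by-side comparison; the
+# parameter values here are arbitrary:
+#
+#     drivers = {
+#         case: AlphaStableNSMDriver(mu_W=0, sigma_W2=1, alpha=1.4, c=10,
+#                                    seed=1, noise_case=case)
+#         for case in (NoiseCase.TRUNCATED,
+#                      NoiseCase.GAUSSIAN_APPROX,
+#                      NoiseCase.PARTIAL_GAUSSIAN_APPROX)
+#     }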
+#
+# Here, we initialise an $\alpha$-stable driver with the parameters `mu_W=0, sigma_W2=4, alpha=1.4, noise_case=NoiseCase.GAUSSIAN_APPROX, c=10`.
+#
+# Then, the driver instance is injected into the Langevin model for each coordinate axis (i.e., x and y) during initialisation with the parameter `damping_coeff=0.15`.
+#
+# Note that we overwrite the default `mu_W` parameter in the $\alpha$-stable driver for the x-coordinate axis to bias our trajectories towards the left. This can be done by passing an additional argument `mu_W=-0.02` when injecting the driver into the Langevin model.
+#
+# Finally, the `CombinedLinearLevyTransitionModel` class takes a set of 1-D models and combines them into a linear transition model of arbitrary dimension, $D$ (in this case, $D=2$).
+#

+# In[3]:


+seed = 1  # Random seed for reproducibility

+# Driving process parameters
+mu_W = 0
+sigma_W2 = 4
+alpha = 1.4
+c = 10
+noise_case = NoiseCase.GAUSSIAN_APPROX


+# Model parameters
+theta = 0.15

+driver_x = AlphaStableNSMDriver(mu_W=mu_W, sigma_W2=sigma_W2, seed=seed, c=c, alpha=alpha, noise_case=noise_case)
+driver_y = driver_x  # Same driving process in both dimensions, sharing the same latents (jumps)
+langevin_x = LevyLangevin(driver=driver_x, damping_coeff=theta, mu_W=-0.02)
+langevin_y = LevyLangevin(driver=driver_y, damping_coeff=theta)
+transition_model = CombinedLinearLevyTransitionModel([langevin_x, langevin_y])


+# In[4]:


+print(transition_model.mu_W)
+print(transition_model.sigma_W2)


+# The ground truth is initialised from (0,0).

+# In[5]:


+timesteps = [start_time]
+truth = GroundTruthPath([GroundTruthState([0, 1, 0, 1], timestamp=timesteps[0])])

+num_steps = 40
+for k in range(1, num_steps + 1):
+    timesteps.append(start_time+timedelta(seconds=k))  # add next timestep to list of timesteps
+    truth.append(GroundTruthState(
+        transition_model.function(truth[k-1], noise=True, time_interval=timedelta(seconds=1)),
+        timestamp=timesteps[k]))


+# The simulated ground truth path can be plotted using the in-built plotting classes in Stone Soup.
+#
+# In addition to the ground truth, Stone Soup plotting tools allow measurements and predicted tracks (see later) to be plotted and synced together consistently.
+#
+# An animated plotter that uses Plotly graph objects can be accessed via the `AnimatedPlotterly` class from Stone Soup.
+#
+# Note that the animated plotter requires a list of timesteps as an input, and that `tail_length`
+# is set to 0.3. This means that each data point will be on display for 30% of the total
+# simulation time. The mapping argument is [0, 2] because those are the x and
+# y position indices in our state vector.
+#
+# If a static plotter is preferred, the `Plotterly` class can be used instead.
+#

+# In[6]:


+# from stonesoup.plotter import AnimatedPlotterly
+# plotter = AnimatedPlotterly(timesteps, tail_length=0.3, width=600, height=600)

+from stonesoup.plotter import Plotterly
+plotter = Plotterly(autosize=False, width=600, height=600)
+plotter.plot_ground_truths(truth, [0, 2])
+plotter.fig


+# ## Simulate measurements

+# Assume a 'linear' sensor which detects the
+# position, but not velocity, of a target, such that
+# $\mathbf{z}_k = H_k \mathbf{x}_k + \boldsymbol{\nu}_k$,
+# $\boldsymbol{\nu}_k \sim \mathcal{N}(0,R)$, with
+#
+# \begin{align}H_k &= \begin{bmatrix}
+# 1 & 0 & 0 & 0\\
+# 0 & 0 & 1 & 0\\
+# \end{bmatrix}\\
+# R &= \begin{bmatrix}
+# 1 & 0\\
+# 0 & 1\\
+# \end{bmatrix} \omega\end{align}
+#
+# where $\omega$ is set to 16.

+# In[7]:


+from stonesoup.types.detection import Detection
+from stonesoup.models.measurement.linear import LinearGaussian


+# The linear Gaussian measurement model is set up by indicating the number of dimensions in the
+# state vector and the dimensions that are measured (so specifying $H_k$), and the noise
+# covariance matrix $R$.

+# In[8]:


+measurement_model = LinearGaussian(
+    ndim_state=4,  # Number of state dimensions (position and velocity in 2D)
+    mapping=(0, 2),  # Mapping measurement vector index to state index
+    noise_covar=np.array([[16, 0],  # Covariance matrix for Gaussian PDF
+                          [0, 16]])
+    )


+# The measurements can now be generated and plotted accordingly.

+# In[9]:


+measurements = []
+for state in truth:
+    measurement = measurement_model.function(state, noise=True)
+    measurements.append(Detection(measurement,
+                                  timestamp=state.timestamp,
+                                  measurement_model=measurement_model))


+# In[10]:


+plotter.plot_measurements(measurements, [0, 2])
+plotter.fig


+# ## Marginalised Particle Filtering

+# The `MarginalisedParticlePredictor` and `MarginalisedParticleUpdater` classes correspond to the predict and update steps
+# respectively.
+# The predictor requires a `TransitionModel` instance, and the updater a `MeasurementModel` instance.
+# To avoid degenerate samples, the `SystematicResampler` is used, which is passed to the updater.
+# More resamplers that are included in Stone Soup are covered in the
+# [Resampler Tutorial](https://stonesoup.readthedocs.io/en/latest/auto_tutorials/sampling/ResamplingTutorial.html#sphx-glr-auto-tutorials-sampling-resamplingtutorial-py).

+# In[11]:


+from stonesoup.predictor.particle import MarginalisedParticlePredictor
+from stonesoup.resampler.particle import SystematicResampler
+from stonesoup.updater.particle import MarginalisedParticleUpdater

+predictor = MarginalisedParticlePredictor(transition_model=transition_model)
+resampler = SystematicResampler()
+updater = MarginalisedParticleUpdater(measurement_model, resampler)


+# To start, we create a prior estimate. This is a `MarginalisedParticleState` which describes the state as a distribution of particles.
+#
+# The prior means are randomly sampled from a Gaussian distribution centred on the true initial state, with identity covariance.
+#
+# Each prior covariance is initialised as a scalar multiple of the identity matrix.
+
+# In[12]:


+from scipy.stats import multivariate_normal
+from stonesoup.types.numeric import Probability  # Similar to a float type
+from stonesoup.types.state import MarginalisedParticleState
+from stonesoup.types.array import StateVectors

+number_particles = 100

+# Sample from the prior Gaussian distribution
+states = multivariate_normal.rvs(np.array([0, 1, 0, 1]),
+                                 np.diag([1., 1., 1., 1.]),
+                                 size=number_particles)
+covars = np.stack([np.eye(4) * 100 for i in range(number_particles)], axis=2)  # (M, M, N)

+# Create prior particle state.
+prior = MarginalisedParticleState(
+    state_vector=StateVectors(states.T),
+    covariance=covars,
+    weight=np.array([Probability(1/number_particles)]*number_particles),
+    timestamp=start_time-timedelta(seconds=1))


+# We now run the predict and update steps, propagating the collection of particles and resampling at each step.

+# In[13]:


+from stonesoup.types.hypothesis import SingleHypothesis
+from stonesoup.types.track import Track

+track = Track()
+for measurement in measurements:
+    prediction = predictor.predict(prior, timestamp=measurement.timestamp)
+    hypothesis = SingleHypothesis(prediction, measurement)
+    post = updater.update(hypothesis)
+    track.append(post)
+    prior = track[-1]


+# Plot the resulting track with the sample points at each iteration. `plot_history` can also
+# be set to True if the particle history is wanted.

+# In[14]:


+plotter.plot_tracks(track, [0, 2], particle=True, uncertainty=True)
+plotter.fig


+# ## References
+# [1] Lemke, Tatjana, and Simon J. Godsill, 'Inference for models with asymmetric α-stable noise processes', in Siem Jan Koopman, and Neil Shephard (eds), Unobserved Components and Time Series Econometrics (Oxford, 2015; online edn, Oxford Academic, 21 Jan. 2016)
+#
+# [2] S. Godsill, M. Riabiz, and I. Kontoyiannis, "The Lévy state space model," in 2019 53rd Asilomar Conference on Signals, Systems, and Computers, 2019, pp. 487–494.
+#

+# In[ ]:




diff --git a/stonesoup/models/__init__.py b/stonesoup/models/__init__.py
index f32e36448..d8c80af1e 100644
--- a/stonesoup/models/__init__.py
+++ b/stonesoup/models/__init__.py
@@ -1,3 +1,4 @@
 from .base import Model
+from .base_driver import Driver
 
-__all__ = ['Model']
+__all__ = ['Model', 'Driver']
diff --git a/stonesoup/models/base.py b/stonesoup/models/base.py
index 42cead3f6..e3907d73b 100644
--- a/stonesoup/models/base.py
+++ b/stonesoup/models/base.py
@@ -1,14 +1,18 @@
 from abc import abstractmethod
 from typing import TYPE_CHECKING, Union, Optional
-
+from collections import namedtuple
+from datetime import timedelta
 import numpy as np
 from scipy.stats import multivariate_normal
 
-from ..base import Base, Property
-from ..functions import jacobian as compute_jac
-from ..types.array import StateVector, StateVectors, CovarianceMatrix
-from ..types.numeric import Probability
-from ..types.state import State
+from stonesoup.models.base_driver import ConditionallyGaussianDriver
+from stonesoup.models.driver import GaussianDriver
+from stonesoup.base import Base, Property
+from stonesoup.functions import jacobian as compute_jac
+from stonesoup.types.array import StateVector, StateVectors, CovarianceMatrix, CovarianceMatrices
+from stonesoup.types.numeric import Probability
+from stonesoup.types.state import State
+from scipy.integrate import quad_vec
 
 if TYPE_CHECKING:
     from ..types.detection import Detection
@@ -342,3 +346,252 @@ def logpdf(self, state1: State, state2: State, **kwargs) -> Union[float, np.ndar
     @abstractmethod
     def covar(self, **kwargs) -> CovarianceMatrix:
         """Model covariance"""
+
+
+class Latents:
+    """Data class for handling sampled non-linear (and non-Gaussian)
+    latent variables.
+    """
+
+    def __init__(self, num_samples: int) -> None:
+        """Constructor
+
+        Args:
+            num_samples (int): Number of jumps sampled.
+        """
+        self.store: dict[ConditionallyGaussianDriver, namedtuple] = dict()
+        self.Data = namedtuple("Data", ["sizes", "times"])
+        self._num_samples = num_samples
+
+    def exists(self, driver: ConditionallyGaussianDriver) -> bool:
+        """Checks if the driver instance exists in the store.
+
+        Args:
+            driver (ConditionallyGaussianDriver): Driver instance.
+
+        Returns:
+            bool: True if the driver has already been added, else False.
+        """
+        return driver in self.store
+
+    def add(
+        self, driver: ConditionallyGaussianDriver, jsizes: np.ndarray, jtimes: np.ndarray
+    ) -> None:
+        """Adds the driver instance to the store alongside its sampled
+        latent variables.
+
+        Latent variables are non-linear and possibly non-Gaussian, in the form of Poisson
+        jumps.
+
+        The number of sampled jumps must match `self._num_samples`.
+
+        Args:
+            driver (ConditionallyGaussianDriver): Driver instance to add.
+            jsizes (np.ndarray): Sampled jump sizes.
+            jtimes (np.ndarray): Sampled jump times.
+        """
+        assert jsizes.shape == jtimes.shape
+        assert jsizes.shape[1] == self._num_samples
+        data = self.Data(jsizes, jtimes)
+        self.store[driver] = data
+
+    def sizes(self, driver: ConditionallyGaussianDriver) -> np.ndarray:
+        """Returns the sampled jump sizes generated by the specified driver instance.
+
+        Args:
+            driver (ConditionallyGaussianDriver): Driver instance.
+
+        Returns:
+            np.ndarray: Jump sizes sampled by the given driver instance.
+        """
+        assert driver in self.store
+        # dimensions of sizes are (n_jumps, n_samples)
+        return self.store[driver].sizes
+
+    def times(self, driver: ConditionallyGaussianDriver) -> np.ndarray:
+        """Returns the sampled jump times generated by the specified driver instance.
+
+        Args:
+            driver (ConditionallyGaussianDriver): Driver instance.
+
+        Returns:
+            np.ndarray: Jump times sampled by the given driver instance.
+        """
+        assert driver in self.store
+        # dimensions of times are (n_times, n_samples)
+        return self.store[driver].times
+
+    @property
+    def num_samples(self) -> int:
+        """Returns the number of expected jumps.
+
+        All latent samples generated by drivers that will be added to this store
+        must have a length equal to `self._num_samples`.
+
+        Returns:
+            int: Number of expected jumps.
+        """
+        return self._num_samples
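+
+# A minimal usage sketch for ``Latents`` (illustrative values; assumes an
+# ``AlphaStableNSMDriver`` as defined in stonesoup.models.driver):
+#
+#     from stonesoup.models.driver import AlphaStableNSMDriver
+#     driver = AlphaStableNSMDriver(mu_W=0., sigma_W2=1., alpha=1.4, c=10, seed=1)
+#     latents = Latents(num_samples=5)
+#     jsizes, jtimes = driver.sample_latents(dt=1.0, num_samples=5)
+#     latents.add(driver=driver, jsizes=jsizes, jtimes=jtimes)
+#     assert latents.sizes(driver).shape == latents.times(driver).shape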
+ """ + assert driver in self.store + # dimensions of sizes are (n_jumps, n_samples) + return self.store[driver].sizes + + def times(self, driver: ConditionallyGaussianDriver) -> np.ndarray: + """Returns the sampled jump times generated by the specified driver instance. + + Args: + driver (ConditionalGaussianDriver): Driver instance. + + Returns: + np.ndarray: Jump times sampled by the given driver instance. + """ + assert driver in self.store + # dimensions of times are (n_times, n_samples) + return self.store[driver].times + + @property + def num_samples(self) -> int: + """Returns the number of expected jumps. + + All latent samples generated by drivers that will be added to this store + must have a length equal to `self._num_samples` + + Returns: + int: No. expected jumps + """ + return self._num_samples + + +class LevyModel(Model): + """ + Class to be derived from for Levy models. + For now, we consider only conditionally Gaussian ones + """ + + driver: Union[ConditionallyGaussianDriver, GaussianDriver] = Property( + doc="Conditional Gaussian process noise driver" + ) + mu_W: Optional[float] = Property(default=None, doc="Condtional Gaussian mean") + sigma_W2: Optional[float] = Property(default=None, doc="Conditional Gaussian variance") + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + @abstractmethod + def _integrand(self, dt: float, jtimes: np.ndarray) -> np.ndarray: + pass + + def _integrate(self, func: np.ndarray, a: np.ndarray, b: np.ndarray) -> np.ndarray: + res, err = quad_vec(func, a=a, b=b) + return res + + def _integral(self, dt: float) -> np.ndarray: + def func(dt: int): + return self._integrand(dt, jtimes=np.zeros((1, 1)))[0, 0, :] # currying + return self._integrate(func, a=0, b=dt) + + def mean( + self, latents: Latents, time_interval: timedelta, **kwargs + ) -> Union[StateVector, StateVectors]: + """Model mean""" + assert latents is not None + dt = time_interval.total_seconds() + if latents.exists(self.driver): + jsizes = latents.sizes(self.driver) + jtimes = latents.times(self.driver) + else: + jsizes, jtimes = None, None + return self.driver.mean( + jsizes=jsizes, + jtimes=jtimes, + dt=dt, + e_ft_func=self._integral, + ft_func=self._integrand, + mu_W=self.mu_W, + num_samples=latents.num_samples, + ) + + def covar( + self, latents: Latents, time_interval: timedelta, **kwargs + ) -> Union[CovarianceMatrix, CovarianceMatrices]: + """Model covariance""" + assert latents is not None + dt = time_interval.total_seconds() + if latents.exists(self.driver): + jsizes = latents.sizes(self.driver) + jtimes = latents.times(self.driver) + else: + jsizes, jtimes = None, None + return self.driver.covar( + jsizes=jsizes, + jtimes=jtimes, + dt=dt, + e_ft_func=self._integral, + ft_func=self._integrand, + mu_W=self.mu_W, + sigma_W2=self.sigma_W2, + num_samples=latents.num_samples, + ) + + def sample_latents( + self, + time_interval: timedelta, + num_samples: int, + random_state: Optional[np.random.RandomState] = None, + ) -> Latents: + dt = time_interval.total_seconds() + latents = Latents(num_samples=num_samples) + if isinstance(self.driver, ConditionallyGaussianDriver): + jsizes, jtimes = self.driver.sample_latents( + dt=dt, num_samples=num_samples, random_state=random_state + ) + latents.add(driver=self.driver, jsizes=jsizes, jtimes=jtimes) + return latents + + def rvs( + self, + latents: Optional[Latents] = None, + n_rvs_samples_for_each_mean_covar_pair: int = 1, + random_state: Optional[np.random.RandomState] = None, + **kwargs + ) -> Union[StateVector, 
+
+    def rvs(
+        self,
+        latents: Optional[Latents] = None,
+        n_rvs_samples_for_each_mean_covar_pair: int = 1,
+        random_state: Optional[np.random.RandomState] = None,
+        **kwargs
+    ) -> Union[StateVector, StateVectors]:
+        noise = 0
+        n_mean_covar_pair = 1
+        if not latents:
+            latents = self.sample_latents(
+                num_samples=n_mean_covar_pair, random_state=random_state, **kwargs
+            )
+        mean = self.mean(latents=latents, **kwargs)
+        if mean is None or None in mean:
+            raise ValueError("Cannot generate rvs from None-type mean")
+        assert isinstance(mean, StateVector)
+
+        covar = self.covar(latents=latents, **kwargs)
+        if covar is None or None in covar:
+            raise ValueError("Cannot generate rvs from None-type covariance")
+        assert isinstance(covar, CovarianceMatrix)
+
+        noise += self.driver.rvs(
+            mean=mean,
+            covar=covar,
+            random_state=random_state,
+            num_samples=n_rvs_samples_for_each_mean_covar_pair,
+            **kwargs
+        )
+        return noise
+
+    def condpdf(
+        self, state1: State, state2: State, latents: Optional[Latents] = None, **kwargs
+    ) -> Union[Probability, np.ndarray]:
+        r"""Model conditional pdf/likelihood evaluation function"""
+        return Probability.from_log_ufunc(
+            self.logcondpdf(state1, state2, latents=latents, **kwargs)
+        )
+
+    def logcondpdf(
+        self, state1: State, state2: State, latents: Optional[Latents] = None, **kwargs
+    ) -> Union[float, np.ndarray]:
+        r"""Model log conditional pdf/likelihood evaluation function"""
+        if latents is None:
+            raise ValueError("Latents cannot be None.")
+
+        mean = self.mean(latents=latents, **kwargs)
+        if mean is None or None in mean:
+            raise ValueError("Cannot generate pdf from None-type mean")
+        assert isinstance(mean, StateVector)
+
+        covar = self.covar(latents=latents, **kwargs)
+        if covar is None or None in covar:
+            raise ValueError("Cannot generate pdf from None-type covariance")
+        assert isinstance(covar, CovarianceMatrix)
+
+        likelihood = np.atleast_1d(
+            multivariate_normal.logpdf(
+                (state1.state_vector - self.function(state2, **kwargs)).T, mean=mean, cov=covar
+            )
+        )
+
+        if len(likelihood) == 1:
+            likelihood = likelihood[0]
+
+        return likelihood
+
+    def logpdf(self, state1: State, state2: State, **kwargs) -> Union[float, np.ndarray]:
+        r"""Model log pdf/likelihood evaluation function"""
+        raise NotImplementedError
+
+    def pdf(self, state1: State, state2: State, **kwargs) -> Union[Probability, np.ndarray]:
+        r"""Model pdf/likelihood evaluation function"""
+        return Probability.from_log_ufunc(self.logpdf(state1, state2, **kwargs))
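+
+
+# A minimal usage sketch for ``LevyModel`` subclasses (illustrative; assumes a
+# concrete model such as ``LevyLangevin`` from
+# stonesoup.models.transition.levy_linear and known ``state1``/``state2``):
+#
+#     latents = model.sample_latents(time_interval=timedelta(seconds=1),
+#                                    num_samples=1)
+#     noise = model.rvs(latents=latents, time_interval=timedelta(seconds=1))
+#     logp = model.logcondpdf(state1, state2, latents=latents,
+#                             time_interval=timedelta(seconds=1))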
+ """ + + seed: Optional[int] = Property(default=None, doc="Seed for random number generation") + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.random_state = np.random.default_rng(self.seed) + + @abstractmethod + def characteristic_func(): + """Characteristic function for associated Levy distribution""" + + +class ConditionallyGaussianDriver(LevyDriver): + """Conditional Gaussian Levy noise driver. + + Noise samples are generated according to the Levy State-Space Model by Godsill et al. + """ + + c: np.double = Property(doc="Truncation parameter, expected no. jumps per unit time.") + mu_W: float = Property(default=0.0, doc="Default conditional Gaussian mean") + sigma_W2: float = Property(default=1.0, doc="Default conditional Gaussian variance") + noise_case: NoiseCase = Property( + default=NoiseCase.GAUSSIAN_APPROX, + doc="Cases for compensating residuals from series truncation", + ) + + def _thinning_probabilities(self, jsizes: np.ndarray) -> np.ndarray: + """Thinning probabilities for accept-reject sampling of latent variables.""" + # Accept all + return np.ones_like(jsizes) # (n_jumps, n_samples) + + def _accept_reject( + self, jsizes: np.ndarray, random_state: Optional[Generator] + ) -> np.ndarray: + """Accept reject sampling to thin out sampled latents (jumps). + + Args: + jsizes (np.ndarray): Jump sizes to apply accept-reject sampling + random_state (Generator, optional): Random state to use. Defaults to None. + + Returns: + np.ndarray: _description_ + """ + probabilities = self._thinning_probabilities(jsizes) + u = random_state.uniform(low=0.0, high=1.0, size=probabilities.shape) + jsizes = np.where(u <= probabilities, jsizes, 0) + return jsizes + + def sample_latents( + self, dt: float, num_samples: int, random_state: Optional[Generator] = None + ) -> Tuple[np.ndarray, np.ndarray]: + """Samples the non-linear and possible non-Gaussian latent variables. + + Args: + dt (float): _description_ + num_samples (int): Number of different jump sequences to sample. Each jump sequence + consist of a multiple jumps where the number of jumps depends + on the truncation parameter `self.c` + random_state (Optional[Generator], optional): Random state to use. Defaults to None. + + Returns: + Tuple[np.ndarray, np.ndarray]: A Tuple consisting of the jump sizes and jump times. + """ + if random_state is None: + random_state = self.random_state + # Sample latents pairs + # num_samples = 1 # TODO: constraned num_samples to 1 always. + epochs = random_state.exponential( + scale=1 / dt, size=(int(self.c * dt), num_samples) + ) + epochs = epochs.cumsum(axis=0) + # Accept reject sampling + jsizes = self._hfunc(epochs=epochs) + jsizes = self._accept_reject(jsizes=jsizes, random_state=random_state) + # Generate jump times + jtimes = random_state.uniform(low=0.0, high=dt, size=jsizes.shape) + return jsizes, jtimes + + @abstractmethod + def _hfunc(self, epochs: np.ndarray) -> np.ndarray: + """H function to be used an direct or indirect evaluation of the inverse upper tail + probability of the Levy density. For indirect approaches, accept reject sampling is + as an additional step is needed. + """ + + @abstractmethod + def _centering(self, e_ft: np.ndarray, truncation: float) -> StateVector: + """Compensation term for skewed Levy density. + + Args: + e_ft (np.ndarray): Expectation of Levy stochastic integral over a unit time axis. + truncation (float): Truncation parameter or no. expected + Possion jumps per unit time. 
+
+    @abstractmethod
+    def _centering(self, e_ft: np.ndarray, truncation: float, mu_W: float) -> StateVector:
+        """Compensation term for skewed Levy densities.
+
+        Args:
+            e_ft (np.ndarray): Expectation of the Levy stochastic integral over a unit
+                time axis.
+            truncation (float): Truncation parameter, or the number of expected
+                Poisson jumps per unit time.
+            mu_W (float): Conditional Gaussian mean.
+
+        Returns:
+            StateVector: Vectorised form of the compensation term.
+        """
+
+    @abstractmethod
+    def _jump_power(self, jsizes: np.ndarray) -> np.ndarray:
+        """Raises the latent jump sizes to the desired power.
+
+        Args:
+            jsizes (np.ndarray): Latent jump sizes to raise.
+
+        Returns:
+            np.ndarray: Latent jump sizes raised to the desired power.
+        """
+
+    @abstractmethod
+    def _first_moment(self, truncation: float) -> float:
+        """Computes the first moment of the underlying subordinator process up to
+        an upper limit defined by h(c), where h is the H function and c represents
+        the truncation parameter.
+
+        Args:
+            truncation (float): Truncation parameter which defines the upper limit
+                of the associated integral.
+
+        Returns:
+            float: First moment of the subordinator process up to limit h(c).
+        """
+
+    @abstractmethod
+    def _second_moment(self, truncation: float) -> float:
+        """Computes the second moment of the underlying subordinator process up to
+        an upper limit defined by h(c), where h is the H function and c represents
+        the truncation parameter.
+
+        Args:
+            truncation (float): Truncation parameter which defines the upper limit
+                of the associated integral.
+
+        Returns:
+            float: Second moment of the subordinator process up to limit h(c).
+        """
+
+    @abstractmethod
+    def _residual_covar(
+        self, e_ft: np.ndarray, truncation: float, mu_W: float, sigma_W2: float
+    ) -> CovarianceMatrix:
+        """Calculates the covariance of the Gaussian-approximated residuals. Residuals
+        arise from the truncated series representation of the Levy stochastic
+        integral.
+
+        Args:
+            e_ft (np.ndarray): Expectation of the Levy stochastic integral over a unit
+                time axis.
+            truncation (float): Truncation parameter.
+            mu_W (float): Gaussian mean of the Levy density when conditioned on the
+                latent variables (jumps).
+            sigma_W2 (float): Gaussian variance of the Levy density when conditioned
+                on the latent variables (jumps).
+
+        Returns:
+            CovarianceMatrix: Covariance matrix of the Gaussian-approximated
+                residuals.
+        """
+
+    def _residual_mean(
+        self, e_ft: np.ndarray, truncation: float, mu_W: float
+    ) -> StateVector:
+        """Calculates the mean of the Gaussian-approximated residuals. Residuals
+        arise from the truncated series representation of the Levy stochastic
+        integral.
+
+        Args:
+            e_ft (np.ndarray): Expectation of the Levy stochastic integral over a unit
+                time axis.
+            truncation (float): Truncation parameter.
+            mu_W (float): Gaussian mean of the Levy density when conditioned on the
+                latent variables (jumps).
+
+        Returns:
+            StateVector: Mean vector of the Gaussian-approximated residuals.
+        """
+        if self.noise_case == NoiseCase.TRUNCATED:
+            m = e_ft.shape[0]
+            r_mean = np.zeros((m, 1))
+        elif (
+            self.noise_case == NoiseCase.GAUSSIAN_APPROX
+            or self.noise_case == NoiseCase.PARTIAL_GAUSSIAN_APPROX
+        ):
+            r_mean = e_ft * mu_W  # (m, 1)
+        else:
+            raise AttributeError("Invalid noise case.")
+        return self._first_moment(truncation=truncation) * r_mean  # (m, 1)
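+
+    # Example (sketch, mirroring the unit tests): for a scalar state with
+    # f_t ≡ 1, conditional moments for sampled latents can be obtained as
+    #     def ft(dt, jtimes): return np.ones_like(jtimes)[..., None, None]
+    #     def e_ft(dt): return dt * np.ones((1, 1))
+    #     jsizes, jtimes = driver.sample_latents(dt=1, num_samples=10)
+    #     mean = driver.mean(jsizes=jsizes, jtimes=jtimes,
+    #                        ft_func=ft, e_ft_func=e_ft, dt=1)
+    #     covar = driver.covar(jsizes=jsizes, jtimes=jtimes,
+    #                          ft_func=ft, e_ft_func=e_ft, dt=1)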
+
+    def mean(
+        self,
+        jsizes: np.array,
+        jtimes: np.array,
+        ft_func: Callable[..., np.ndarray],
+        e_ft_func: Callable[..., np.ndarray],
+        dt: float,
+        mu_W: Optional[float] = None,
+        **kwargs
+    ) -> Union[StateVector, StateVectors]:
+        """Computes mean vectors. The number of mean vectors depends on the
+        number of samples in the jump sizes/times. Each jump sequence results in
+        a unique mean vector.
+
+        Args:
+            jsizes (np.array): Latents corresponding to jump sizes.
+            jtimes (np.array): Latents corresponding to jump times.
+            ft_func (Callable[..., np.ndarray]): The function f consisting of the
+                state transition matrix multiplied by the control matrix h, as denoted
+                by Godsill et al. (2020).
+            e_ft_func (Callable[..., np.ndarray]): The expectation of ft_func.
+            dt (float): The time interval.
+            mu_W (Optional[float], optional): The conditionally Gaussian mean vector.
+                Defaults to None, in which case the default mu_W specified during
+                initialisation is used.
+
+        Returns:
+            Union[StateVector, StateVectors]: The resulting mean vectors.
+        """
+        mu_W = np.atleast_2d(self.mu_W) if mu_W is None else np.atleast_2d(mu_W)
+        assert jsizes.shape == jtimes.shape
+        num_samples = jsizes.shape[1]
+        truncation = self.c * dt
+        ft = ft_func(dt=dt, jtimes=jtimes)  # (n_jumps, n_samples, m, 1)
+        series = np.sum(jsizes[..., None, None] * ft, axis=0)  # (n_samples, m, 1)
+        m = series * mu_W
+
+        e_ft = e_ft_func(dt=dt)  # (m, 1)
+        residual_mean = self._residual_mean(e_ft=e_ft, mu_W=mu_W, truncation=truncation)[
+            None, ...
+        ]
+        centering = (
+            dt * self._centering(e_ft=e_ft, mu_W=mu_W, truncation=truncation)[None, ...]
+        )
+        mean = m - centering + residual_mean
+        if num_samples == 1:
+            return mean[0].view(StateVector)
+        else:
+            return mean.view(StateVectors)
+ """ + mu_W = np.atleast_2d(self.mu_W) if mu_W is None else np.atleast_2d(mu_W) + sigma_W2 = ( + np.atleast_2d(self.sigma_W2) if sigma_W2 is None else np.atleast_2d(sigma_W2) + ) + + assert jsizes.shape == jtimes.shape + num_samples = jsizes.shape[1] + jsizes = self._jump_power(jsizes) # (n_jumps, n_samples) + truncation = self._hfunc(self.c * dt) + + ft = ft_func(dt=dt, jtimes=jtimes) # (n_jumps, n_samples, m, 1) + ft2 = np.einsum("ijkl, ijml -> ijkm", ft, ft) # (n_jumps, n_samples, m, m) + series = np.sum(jsizes[..., None, None] * ft2, axis=0) # (n_samples, m, m) + s = sigma_W2 * series + + e_ft = e_ft_func(dt=dt) # (m, 1) + residual_cov = self._residual_covar( + e_ft=e_ft, mu_W=mu_W, sigma_W2=sigma_W2, truncation=truncation + ) + covar = s + residual_cov + if num_samples == 1: + return covar[0].view(CovarianceMatrix) # (m, m) + else: + return covar.view(CovarianceMatrices) # (n_samples, m, m) + + def rvs( + self, + mean: StateVector, + covar: CovarianceMatrices, + random_state: Optional[np.random.RandomState] = None, + num_samples: int = 1, + **kwargs + ) -> Union[StateVector, StateVectors]: + """Computes the driving noise term given the mean and covariance matrix specified. + + + Args: + mean (StateVector): The mean vector. + covar (CovarianceMatrices): The covariance matrix. + random_state (Optional[np.random.RandomState], optional): RNG to use. Defaults to None. + num_samples (int, optional): Number of driving noise samples. Defaults to 1. + + Returns: + Union[StateVector, StateVectors]: Driving noise samples. + """ + assert isinstance(mean, StateVector) + assert isinstance(covar, CovarianceMatrix) + if random_state is None: + random_state = self.random_state + noise = random_state.multivariate_normal(mean.flatten(), covar, size=num_samples) + noise = noise.T + if num_samples == 1: + return noise.view(StateVector) + else: + return noise.view(StateVectors) + + +class NormalSigmaMeanDriver(ConditionallyGaussianDriver): + """Implements the class of Normal Sigma Mean (NSM) Levy models.""" + + def _jump_power(self, jsizes: np.ndarray) -> np.ndarray: + return jsizes**2 + + def _residual_covar( + self, e_ft: np.ndarray, truncation: float, mu_W: float, sigma_W2: float, **kwargs + ) -> CovarianceMatrix: + mu_W = mu_W + sigma_W2 = sigma_W2 + if self.noise_case == NoiseCase.TRUNCATED: + m = e_ft.shape[0] + r_cov = np.zeros((m, m)) + elif self.noise_case == NoiseCase.GAUSSIAN_APPROX: + r_cov = ( + e_ft + @ e_ft.T + * self._second_moment(truncation=truncation) + * (mu_W**2 + sigma_W2) + ) + elif self.noise_case == NoiseCase.PARTIAL_GAUSSIAN_APPROX: + r_cov = e_ft @ e_ft.T * self._second_moment(truncation=truncation) * sigma_W2 + else: + raise AttributeError("Invalid noise case.") + return r_cov # (m, m) + + +class NormalVarianceMeanDriver(ConditionallyGaussianDriver): + """Implements the Normal Variance Mean (NVM) Levy models.""" + + def _jump_power(self, jsizes: np.ndarray) -> np.ndarray: + return jsizes + + def _centering(self, e_ft: np.ndarray, truncation: float, mu_W: float) -> StateVector: + m = e_ft.shape[0] + return np.zeros((m, 1)) + + def _residual_covar( + self, e_ft: np.ndarray, truncation: float, mu_W: float, sigma_W2: float, **kwargs + ) -> CovarianceMatrix: + mu_W = mu_W + sigma_W2 = sigma_W2 + if self.noise_case == NoiseCase.TRUNCATED: + m = e_ft.shape[0] + r_cov = np.zeros((m, m)) + elif self.noise_case == NoiseCase.GAUSSIAN_APPROX: + r_cov = ( + e_ft + @ e_ft.T + * ( + self._second_moment(truncation=truncation) * mu_W**2 + + self._first_moment(truncation=truncation) * sigma_W2 + 
+
+
+class NormalVarianceMeanDriver(ConditionallyGaussianDriver):
+    """Implements the class of Normal Variance Mean (NVM) Levy models."""
+
+    def _jump_power(self, jsizes: np.ndarray) -> np.ndarray:
+        return jsizes
+
+    def _centering(self, e_ft: np.ndarray, truncation: float, mu_W: float) -> StateVector:
+        m = e_ft.shape[0]
+        return np.zeros((m, 1))
+
+    def _residual_covar(
+        self, e_ft: np.ndarray, truncation: float, mu_W: float, sigma_W2: float, **kwargs
+    ) -> CovarianceMatrix:
+        if self.noise_case == NoiseCase.TRUNCATED:
+            m = e_ft.shape[0]
+            r_cov = np.zeros((m, m))
+        elif self.noise_case == NoiseCase.GAUSSIAN_APPROX:
+            r_cov = (
+                e_ft
+                @ e_ft.T
+                * (
+                    self._second_moment(truncation=truncation) * mu_W**2
+                    + self._first_moment(truncation=truncation) * sigma_W2
+                )
+            )
+        elif self.noise_case == NoiseCase.PARTIAL_GAUSSIAN_APPROX:
+            r_cov = e_ft @ e_ft.T * self._first_moment(truncation=truncation) * sigma_W2
+        else:
+            raise AttributeError("Invalid noise case.")
+        return r_cov  # (m, m)
diff --git a/stonesoup/models/driver.py b/stonesoup/models/driver.py
new file mode 100644
index 000000000..43311af15
--- /dev/null
+++ b/stonesoup/models/driver.py
@@ -0,0 +1,251 @@
+from typing import Callable, Generator, Optional, Union
+
+import numpy as np
+from scipy.special import gamma as gammafnc
+from scipy.special import gammainc
+
+from stonesoup.base import Property
+from stonesoup.models.base_driver import (
+    LevyDriver,
+    NormalSigmaMeanDriver,
+    NormalVarianceMeanDriver,
+)
+from stonesoup.types.array import (
+    CovarianceMatrices,
+    CovarianceMatrix,
+    StateVector,
+    StateVectors,
+)
+
+
+def incgammal(s: float, x: float) -> float:
+    """Helper function: lower incomplete gamma function."""
+    return gammainc(s, x) * gammafnc(s)
+
+
+class GaussianDriver(LevyDriver):
+    """Implements a Gaussian noise driver to be used with :class:`~.LevyModel`."""
+
+    mu_W: float = Property(default=0.0, doc="Default Gaussian mean")
+    sigma_W2: float = Property(default=1.0, doc="Default Gaussian variance")
+
+    def characteristic_func(
+        self, mu_W: Optional[float] = None, sigma_W2: Optional[float] = None, **kwargs
+    ) -> Callable[[float], complex]:
+        if mu_W is None:
+            mu_W = self.mu_W
+        if sigma_W2 is None:
+            sigma_W2 = self.sigma_W2
+
+        def inner(w: float):
+            return np.exp(-1j * w * mu_W - 0.5 * sigma_W2 * w**2)
+
+        return inner
+
+    def mean(
+        self,
+        e_ft_func: Callable[..., np.ndarray],
+        dt: float,
+        mu_W: Optional[float] = None,
+        num_samples: int = 1,
+        **kwargs
+    ) -> Union[StateVector, StateVectors]:
+        """Computes mean vectors. As the driving process is purely Gaussian, there
+        are no latent jumps; `num_samples` copies of the same mean vector are returned.
+
+        Args:
+            e_ft_func (Callable[..., np.ndarray]): The expectation of ft_func.
+            dt (float): The time interval.
+            mu_W (Optional[float], optional): The Gaussian mean. Defaults to None,
+                in which case the default mu_W specified during initialisation
+                is used.
+            num_samples (int): Number of mean vectors to generate.
+
+        Returns:
+            Union[StateVector, StateVectors]: The resulting mean vectors.
+        """
+        mu_W = np.atleast_2d(self.mu_W) if mu_W is None else np.atleast_2d(mu_W)
+        e_ft = e_ft_func(dt=dt)
+        mean = mu_W * e_ft
+        if num_samples == 1:
+            return mean.view(StateVector)
+        else:
+            mean = np.tile(mean, (num_samples, 1, 1))
+            return mean.view(StateVectors)
+ """ + e_ft = e_ft_func(dt=dt) + sigma_W2 = ( + np.atleast_2d(self.sigma_W2) if sigma_W2 is None else np.atleast_2d(sigma_W2) + ) + covar = sigma_W2 * e_ft @ e_ft.T + if num_samples == 1: + return covar.view(CovarianceMatrix) + else: + covar = np.tile(covar, (num_samples, 1, 1)) + return covar.view(CovarianceMatrices) + + def rvs( + self, + mean: StateVector, + covar: CovarianceMatrix, + random_state: Optional[Generator] = None, + num_samples: int = 1, + **kwargs + ) -> Union[StateVector, StateVectors]: + """Computes the driving noise term given the mean and covariance matrix specified. + + + Args: + mean (StateVector): The Gaussian mean vector. + covar (CovarianceMatrices): The Gaussian covariance matrix. + random_state (Optional[np.random.RandomState], optional): RNG to use. Defaults to None. + num_samples (int, optional): Number of driving noise samples. Defaults to 1. + + Returns: + Union[StateVector, StateVectors]: Driving noise samples. + """ + if random_state is None: + random_state = self.random_state + noise = random_state.multivariate_normal(mean.flatten(), covar, size=num_samples) + noise = noise.T + if num_samples == 1: + return noise.view(StateVector) + else: + return noise.view(StateVectors) + + +class AlphaStableNSMDriver(NormalSigmaMeanDriver): + """Implements the Alpha Stable NSM noise driver to be used with :class:`~.LevyModel`.""" + + alpha: float = Property(doc="Alpha parameter.") + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + if ( + np.isclose(self.alpha, 0.0) + or np.isclose(self.alpha, 1.0) + or np.isclose(self.alpha, 2.0) + ): + raise AttributeError("alpha must be 0 < alpha < 1 or 1 < alpha < 2.") + + def _hfunc(self, epochs: np.ndarray) -> np.ndarray: + return np.power(epochs, -1.0 / self.alpha) + + def _first_moment(self, **kwargs) -> float: + return self.alpha / (1.0 - self.alpha) * np.power(self.c, 1.0 - 1.0 / self.alpha) + + def _second_moment(self, **kwargs) -> float: + return self.alpha / (2.0 - self.alpha) * np.power(self.c, 1.0 - 2.0 / self.alpha) + + def _residual_mean( + self, e_ft: np.ndarray, truncation: float, mu_W: float + ) -> StateVector: + if 1 < self.alpha < 2: + m = e_ft.shape[0] + r_mean = np.zeros((m, 1)) + return r_mean + return super()._residual_mean(e_ft=e_ft, mu_W=mu_W, truncation=truncation) + + def _centering(self, e_ft: np.ndarray, truncation: float, mu_W: float) -> StateVector: + if 1 < self.alpha < 2: + term = e_ft * mu_W # (m, 1) + return -self._first_moment(truncation=truncation) * term # (m, 1) + elif 0 < self.alpha < 1: + m = e_ft.shape[0] + return np.zeros((m, 1)) + else: + raise AttributeError("alpha must be 0 < alpha < 2") + + def characteristic_func(self): + # TODO + raise NotImplementedError + + +class GammaNVMDriver(NormalVarianceMeanDriver): + """Implements the Gamma NVM noise driver to be used with :class:`~.LevyModel`.""" + + nu: float = Property(doc="Scale parameter") + beta: float = Property(doc="Shape parameter") + + def _hfunc(self, epochs: np.ndarray) -> np.ndarray: + return 1.0 / (self.beta * (np.exp(epochs / self.nu) - 1.0)) + + def _first_moment(self, truncation: float) -> float: + return (self.nu / self.beta) * incgammal(1.0, self.beta * truncation) + + def _second_moment(self, truncation: float) -> float: + return (self.nu / self.beta**2) * incgammal(2.0, self.beta * truncation) + + def _thinning_probabilities(self, jsizes: np.ndarray) -> np.ndarray: + return (1.0 + self.beta * jsizes) * np.exp( + -self.beta * jsizes + ) # (n_jumps, n_samples) + + def _residual_covar( + 
+
+    def _residual_covar(
+        self, e_ft: np.ndarray, truncation: float, mu_W: float, sigma_W2: float
+    ) -> CovarianceMatrix:
+        m = e_ft.shape[0]
+        return np.zeros((m, m))
+
+    def _residual_mean(
+        self,
+        e_ft: np.ndarray,
+        truncation: float,
+        mu_W: float,
+    ) -> StateVector:
+        m = e_ft.shape[0]
+        return np.zeros((m, 1))
+
+    def characteristic_func(self):
+        # TODO
+        raise NotImplementedError
+
+
+class TemperedStableNVMDriver(NormalVarianceMeanDriver):
+    """Implements the Tempered Stable NVM noise driver to be used with :class:`~.LevyModel`."""
+
+    alpha: float = Property(doc="Alpha parameter")
+    beta: float = Property(doc="Shape parameter")
+
+    def _hfunc(self, epochs: np.ndarray) -> np.ndarray:
+        return np.power(epochs, -1.0 / self.alpha)
+
+    def _first_moment(self, truncation: float) -> float:
+        return (self.alpha * self.beta ** (self.alpha - 1.0)) * incgammal(
+            1.0 - self.alpha, self.beta * truncation
+        )
+
+    def _second_moment(self, truncation: float) -> float:
+        return (self.alpha * self.beta ** (self.alpha - 2.0)) * incgammal(
+            2.0 - self.alpha, self.beta * truncation
+        )
+
+    def _thinning_probabilities(self, jsizes: np.ndarray) -> np.ndarray:
+        return np.exp(-self.beta * jsizes)  # (n_jumps, n_samples)
+
+    def characteristic_func(self):
+        # TODO
+        raise NotImplementedError
diff --git a/stonesoup/models/tests/__init__.py b/stonesoup/models/tests/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/stonesoup/models/tests/conftest.py b/stonesoup/models/tests/conftest.py
new file mode 100644
index 000000000..2cd1b7351
--- /dev/null
+++ b/stonesoup/models/tests/conftest.py
@@ -0,0 +1,27 @@
+from typing import Callable
+
+import numpy as np
+import pytest
+
+
+@pytest.fixture
+def seed() -> int:
+    return 0
+
+
+@pytest.fixture
+def mu_W() -> float:
+    return 1.0
+
+
+@pytest.fixture
+def sigma_W2() -> float:
+    return 1.0
+
+
+@pytest.fixture
+def expected_jumps_per_sec() -> int:
+    return 2
+
+
+@pytest.fixture
+def mock_e_ft_func() -> Callable[..., np.ndarray]:
+    return lambda dt: np.array([[1.0], [2.0]])
diff --git a/stonesoup/models/tests/test_as_nsm.py b/stonesoup/models/tests/test_as_nsm.py
new file mode 100644
index 000000000..4acff9519
--- /dev/null
+++ b/stonesoup/models/tests/test_as_nsm.py
@@ -0,0 +1,186 @@
+import numpy as np
+import pytest
+from scipy.integrate import quad
+from scipy.special import gamma
+from scipy.stats import kstest, levy_stable, norm
+
+from stonesoup.models.base_driver import NoiseCase
+from stonesoup.models.driver import AlphaStableNSMDriver
+from stonesoup.types.array import (
+    CovarianceMatrices,
+    CovarianceMatrix,
+    StateVector,
+    StateVectors,
+)
+
+
+@pytest.fixture
+def alpha() -> float:
+    return 1.5
+
+
+@pytest.fixture(
+    scope="function",
+    params=[
+        NoiseCase.GAUSSIAN_APPROX,
+        NoiseCase.PARTIAL_GAUSSIAN_APPROX,
+        NoiseCase.TRUNCATED,
+    ],
+)
+def alpha_stable_nsm_driver(
+    request: pytest.FixtureRequest,
+    seed: int,
+    mu_W: float,
+    sigma_W2: float,
+    expected_jumps_per_sec: int,
+    alpha: float,
+):
+    return AlphaStableNSMDriver(
+        seed=seed,
+        mu_W=mu_W,
+        sigma_W2=sigma_W2,
+        c=expected_jumps_per_sec,
+        alpha=alpha,
+        noise_case=request.param,
+    )
+
+
+def test_mean_covar(alpha_stable_nsm_driver: AlphaStableNSMDriver):
+    def ft(dt: int, jtimes: np.array):
+        return np.ones_like(jtimes)[..., None, None]
+
+    def e_ft(dt: int):
+        return dt * np.ones((1, 1))
+
+    dt = 1
+    n_samples = 2
+    jsizes, jtimes = alpha_stable_nsm_driver.sample_latents(dt=dt, num_samples=n_samples)
+    mean = alpha_stable_nsm_driver.mean(
+        jsizes=jsizes, jtimes=jtimes, e_ft_func=e_ft, ft_func=ft, dt=dt
+    )
+    covar = alpha_stable_nsm_driver.covar(
+        jsizes=jsizes, jtimes=jtimes, e_ft_func=e_ft, ft_func=ft, dt=dt
+    )
+    expected_mean = StateVectors([[[-1.2177414]], [[-1.80693522]]])
+
+    assert isinstance(mean, StateVectors)
+    assert np.allclose(mean, expected_mean)
+
+    assert isinstance(covar, CovarianceMatrices)
+    if alpha_stable_nsm_driver.noise_case == NoiseCase.GAUSSIAN_APPROX:
+        expected_covar = CovarianceMatrices([[[8.04448152]], [[6.70822924]]])
+    elif alpha_stable_nsm_driver.noise_case == NoiseCase.PARTIAL_GAUSSIAN_APPROX:
+        expected_covar = CovarianceMatrices([[[5.66337994]], [[4.32712766]]])
+    elif alpha_stable_nsm_driver.noise_case == NoiseCase.TRUNCATED:
+        expected_covar = CovarianceMatrices([[[3.28227836]], [[1.94602608]]])
+    else:
+        raise RuntimeError(f"Noise case not tested {alpha_stable_nsm_driver.noise_case}")
+    assert np.allclose(covar, expected_covar)
+
+    n_samples = 1
+    jsizes, jtimes = alpha_stable_nsm_driver.sample_latents(dt=dt, num_samples=n_samples)
+    mean = alpha_stable_nsm_driver.mean(
+        jsizes=jsizes, jtimes=jtimes, e_ft_func=e_ft, ft_func=ft, dt=dt
+    )
+    covar = alpha_stable_nsm_driver.covar(
+        jsizes=jsizes, jtimes=jtimes, e_ft_func=e_ft, ft_func=ft, dt=dt
+    )
+    assert isinstance(mean, StateVector)
+    assert isinstance(covar, CovarianceMatrix)
+
+
+def test_sample_latents(
+    expected_jumps_per_sec, alpha_stable_nsm_driver: AlphaStableNSMDriver
+):
+    dt = 1
+    num_samples = 2
+    jsizes, jtimes = alpha_stable_nsm_driver.sample_latents(
+        dt=dt, num_samples=num_samples
+    )
+    expected_jsizes = np.array([[1.29327154, 0.98714497], [1.26875021, 0.98568295]])
+    expected_jtimes = np.array([[0.54362499, 0.93507242], [0.81585355, 0.0027385]])
+    assert jsizes.shape == jtimes.shape
+    assert jsizes.shape[0] == expected_jumps_per_sec
+    assert jsizes.shape[1] == num_samples
+    assert np.allclose(jsizes, expected_jsizes)
+    assert np.allclose(jtimes, expected_jtimes)
+
+
+def raw_abs_moment_gaussian(alpha, mu, sigma):
+    def func(x):
+        return (np.abs(x) ** alpha) * norm.pdf(x, loc=mu, scale=sigma)
+
+    return quad(func, -6, 6)[0]
+
+
+def signed_raw_abs_moment_gaussian(alpha, mu, sigma):
+    def func(x):
+        return (np.sign(x) * np.abs(x) ** alpha) * norm.pdf(x, loc=mu, scale=sigma)
+
+    return quad(func, -6, 6)[0]
+
+
+def alpha_stable_cdf(alpha, mu, sigma2):
+    # Compute scale
+    sigma = np.sqrt(sigma2)
+    raw_moments = raw_abs_moment_gaussian(alpha, mu, sigma)
+    C_alpha = (1 - alpha) / (gamma(2 - alpha) * np.cos(np.pi * alpha / 2))
+    scale = (raw_moments / C_alpha) ** (1 / alpha)
+
+    # Compute skew (beta)
+    signed_raw_moments = signed_raw_abs_moment_gaussian(alpha, mu, sigma)
+    raw_moments = raw_abs_moment_gaussian(alpha, mu, sigma)
+    skew = signed_raw_moments / raw_moments
+
+    dist = levy_stable(alpha=alpha, beta=skew, scale=scale)
+    return dist.cdf
+
+
+def test_rvs(alpha_stable_nsm_driver: AlphaStableNSMDriver):
+    """Tests if the resulting noise distribution is correct."""
+
+    def ft(dt: int, jtimes: np.array):
+        return np.ones_like(jtimes)[..., None, None]
+
+    def e_ft(dt: int):
+        return dt * np.ones((1, 1))
+
+    alpha_stable_nsm_driver.c = 100
+    num_samples = 1000
+    dt = 1
+    jsizes, jtimes = alpha_stable_nsm_driver.sample_latents(
+        dt=dt, num_samples=num_samples
+    )
+    assert jtimes.shape == jsizes.shape
+    assert jtimes.shape == (alpha_stable_nsm_driver.c, num_samples)
+    rvs_samples = []
+    for i in range(num_samples):
+        a = jsizes[:, i: i + 1]
+        b = jtimes[:, i: i + 1]
+        mean = alpha_stable_nsm_driver.mean(
+            jsizes=a, jtimes=b, e_ft_func=e_ft, ft_func=ft, dt=dt
+        )
+        covar = alpha_stable_nsm_driver.covar(
+            jsizes=a, jtimes=b, e_ft_func=e_ft, ft_func=ft, dt=dt
+        )
+        noise = alpha_stable_nsm_driver.rvs(mean=mean, covar=covar, num_samples=1)
+        rvs_samples.append(noise)
+    rvs_samples = np.array(rvs_samples)
+
+    threshold = 1e-2
+    cdf = alpha_stable_cdf(
+        alpha=alpha_stable_nsm_driver.alpha,
+        mu=alpha_stable_nsm_driver.mu_W,
+        sigma2=alpha_stable_nsm_driver.sigma_W2,
+    )
+    results = kstest(rvs_samples, cdf, N=num_samples)
+
+    if alpha_stable_nsm_driver.noise_case == NoiseCase.GAUSSIAN_APPROX:
+        assert results.pvalue >= threshold  # 99% CI
+    elif alpha_stable_nsm_driver.noise_case == NoiseCase.PARTIAL_GAUSSIAN_APPROX:
+        assert results.pvalue >= threshold
+    elif alpha_stable_nsm_driver.noise_case == NoiseCase.TRUNCATED:
+        assert results.pvalue < threshold
+    else:
+        raise RuntimeError(f"Noise case not tested {alpha_stable_nsm_driver.noise_case}")
diff --git a/stonesoup/models/tests/test_gaussian_driver.py b/stonesoup/models/tests/test_gaussian_driver.py
new file mode 100644
index 000000000..b25a0a943
--- /dev/null
+++ b/stonesoup/models/tests/test_gaussian_driver.py
@@ -0,0 +1,81 @@
+from typing import Callable
+
+import numpy as np
+import pytest
+
+from stonesoup.models.driver import GaussianDriver
+
+
+@pytest.fixture
+def std_normal_driver(seed) -> GaussianDriver:
+    return GaussianDriver(mu_W=0, sigma_W2=1, seed=seed)
+
+
+@pytest.fixture
+def gaussian_driver(seed) -> GaussianDriver:
+    return GaussianDriver(mu_W=1.0, sigma_W2=4.0, seed=seed)
+
+
+def test_seed(std_normal_driver: GaussianDriver, gaussian_driver: GaussianDriver) -> None:
+    assert std_normal_driver.seed == 0
+    assert gaussian_driver.seed == 0
+
+
+def test_characteristic_function(
+    std_normal_driver: GaussianDriver, gaussian_driver: GaussianDriver
+) -> None:
+    characteristic_func = std_normal_driver.characteristic_func()
+    w = 0.0
+    result = characteristic_func(w)
+    expected = np.exp(-0.5 * w**2)
+    assert pytest.approx(expected) == result
+
+    characteristic_func = gaussian_driver.characteristic_func()
+    mu = 1.0
+    sigma = 2.0
+    w = 0.5
+    result = characteristic_func(w)
+    expected = np.exp(-1j * w * mu - 0.5 * sigma**2 * w**2)
+    assert pytest.approx(expected) == result
+
+
+def test_mean(
+    std_normal_driver: GaussianDriver,
+    gaussian_driver: GaussianDriver,
+    mock_e_ft_func: Callable[..., np.ndarray],
+) -> None:
+    assert std_normal_driver.mu_W == 0.0
+    assert gaussian_driver.mu_W == 1.0
+    num_samples = 2
+    mean = std_normal_driver.mean(
+        mock_e_ft_func, dt=1, mu_W=std_normal_driver.mu_W, num_samples=num_samples
+    )
+    expected_mean = np.array([[[0.0], [0.0]], [[0.0], [0.0]]])
+    assert np.allclose(mean, expected_mean)
+
+    mean = std_normal_driver.mean(
+        mock_e_ft_func, dt=1, mu_W=gaussian_driver.mu_W, num_samples=num_samples
+    )
+    expected_mean = np.array([[[1.0], [2.0]], [[1.0], [2.0]]])
+    assert np.allclose(mean, expected_mean)
+
+
+def test_covar(
+    std_normal_driver: GaussianDriver,
+    gaussian_driver: GaussianDriver,
+    mock_e_ft_func: Callable[..., np.ndarray],
+) -> None:
+    assert std_normal_driver.sigma_W2 == 1.0
+    assert gaussian_driver.sigma_W2 == 4.0
+    num_samples = 1
+    covar = std_normal_driver.covar(
+        mock_e_ft_func, dt=1, sigma_W2=std_normal_driver.sigma_W2, num_samples=num_samples
+    )
+    expected_covar = np.array([[1.0, 2.0], [2.0, 4.0]])
+    assert np.allclose(covar, expected_covar)
+
+    covar = std_normal_driver.covar(
+        mock_e_ft_func, dt=1, sigma_W2=gaussian_driver.sigma_W2, num_samples=num_samples
+    )
+    expected_covar = np.array([[1.0, 2.0], [2.0, 4.0]]) * 4
+    assert np.allclose(covar, expected_covar)
diff --git a/stonesoup/models/transition/base.py b/stonesoup/models/transition/base.py
index a9a3e1fab..48c0491fe 100644
--- a/stonesoup/models/transition/base.py
+++ b/stonesoup/models/transition/base.py
@@ -1,13 +1,20 @@
-from abc import abstractmethod
 import copy
-from typing import Sequence
+from abc import abstractmethod
+from datetime import timedelta
+from typing import Iterable, List, Optional, Sequence, Union
 
-from scipy.linalg import block_diag
 import numpy as np
+from scipy.linalg import block_diag
 
-from ..base import Model, GaussianModel
-from ...base import Property
-from ...types.array import StateVector, StateVectors
+from stonesoup.base import Property
+from stonesoup.models.base import GaussianModel, Latents, LevyModel, Model
+from stonesoup.models.base_driver import ConditionallyGaussianDriver
+from stonesoup.types.array import (
+    CovarianceMatrices,
+    CovarianceMatrix,
+    StateVector,
+    StateVectors,
+)
 
 
 class TransitionModel(Model):
@@ -32,6 +39,7 @@ class CombinedGaussianTransitionModel(TransitionModel, GaussianModel):
     If any of the models are time variant the keyword argument "time_interval"
     must be supplied to all methods
     """
+
     model_list: Sequence[GaussianModel] = Property(doc="List of Transition Models.")
 
     def __init__(self, *args, **kwargs):
@@ -71,10 +79,12 @@ def function(self, state, noise=False, **kwargs) -> StateVector:
         else:
             noise_loop = False
         for model in self.model_list:
-            temp_state.state_vector =\
-                state.state_vector[ndim_count:model.ndim_state + ndim_count, :]
-            state_vector[ndim_count:model.ndim_state + ndim_count, :] += \
-                model.function(temp_state, noise=noise_loop, **kwargs)
+            temp_state.state_vector = state.state_vector[
+                ndim_count: model.ndim_state + ndim_count, :
+            ]
+            state_vector[ndim_count: model.ndim_state + ndim_count, :] += model.function(
+                temp_state, noise=noise_loop, **kwargs
+            )
             ndim_count += model.ndim_state
         if isinstance(noise, bool):
             noise = 0
@@ -98,8 +108,9 @@ def jacobian(self, state, **kwargs):
         ndim_count = 0
         J_list = []
         for model in self.model_list:
-            temp_state.state_vector =\
-                state.state_vector[ndim_count:model.ndim_state + ndim_count, :]
+            temp_state.state_vector = state.state_vector[
+                ndim_count: model.ndim_state + ndim_count, :
+            ]
             J_list.append(model.jacobian(temp_state, **kwargs))
             ndim_count += model.ndim_state
 
@@ -129,3 +140,164 @@ def covar(self, **kwargs):
         covar_list = [model.covar(**kwargs) for model in self.model_list]
 
         return block_diag(*covar_list)
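+
+
+# A minimal usage sketch for the combined Levy models defined below
+# (illustrative; mirrors the tutorial set-up):
+#
+#     from stonesoup.models.driver import AlphaStableNSMDriver
+#     from stonesoup.models.transition.levy_linear import (
+#         LevyLangevin, CombinedLinearLevyTransitionModel)
+#     driver = AlphaStableNSMDriver(mu_W=0., sigma_W2=1., alpha=1.4, c=10, seed=1)
+#     model_x = LevyLangevin(driver=driver, damping_coeff=0.15)
+#     model_y = LevyLangevin(driver=driver, damping_coeff=0.15)
+#     combined = CombinedLinearLevyTransitionModel([model_x, model_y])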
+
+
+class CombinedLevyTransitionModel(TransitionModel, LevyModel):
+    r"""Combine multiple models into a single model by stacking them.
+
+    The assumption is that all models are Levy.
+    Time-variant and time-invariant models can be combined together.
+    If any of the models are time variant the keyword argument "time_interval"
+    must be supplied to all methods.
+    """
+
+    model_list: Sequence[LevyModel] = Property(doc="List of Transition Models.")
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        assert len(self.model_list) != 0
+
+    def _integrand(self, dt: float, jtimes: np.ndarray):
+        raise NotImplementedError
+
+    @property
+    def driver(self) -> List:
+        return [model.driver for model in self.model_list]
+
+    @property
+    def mu_W(self):
+        mu = [m.mu_W if m.mu_W is not None else m.driver.mu_W for m in self.model_list]
+        return np.atleast_2d(mu).T
+
+    @property
+    def sigma_W2(self):
+        sigma2 = [
+            m.sigma_W2 if m.sigma_W2 is not None else m.driver.sigma_W2
+            for m in self.model_list
+        ]
+        return np.diag(sigma2)
+
+    @property
+    def ndim_state(self):
+        """ndim_state getter method
+
+        Returns
+        -------
+        : :class:`int`
+            The number of combined model state dimensions.
+        """
+        return sum(model.ndim_state for model in self.model_list)
+
+    def mean(self, **kwargs) -> Union[StateVector, StateVectors]:
+        """Returns the transition model noise mean vector.
+
+        Returns
+        -------
+        : :class:`stonesoup.types.state.StateVector` of shape\
+        (:py:attr:`~ndim_state`, 1)
+            The process noise mean.
+        """
+        mean_list = [model.mean(**kwargs) for model in self.model_list]
+        if len(mean_list[0].shape) == 2:
+            return np.vstack(mean_list).view(StateVector)
+        else:
+            return np.concatenate(mean_list, axis=1).view(StateVectors)
+
+    def covar(self, **kwargs) -> Union[CovarianceMatrix, CovarianceMatrices]:
+        """Returns the transition model noise covariance matrix.
+
+        Returns
+        -------
+        : :class:`stonesoup.types.state.CovarianceMatrix` of shape\
+        (:py:attr:`~ndim_state`, :py:attr:`~ndim_state`)
+            The process noise covariance.
+        """
+
+        covar_list = [model.covar(**kwargs) for model in self.model_list]
+        if len(covar_list[0].shape) == 2:
+            return block_diag(*covar_list).view(CovarianceMatrix)
+        else:
+            N = covar_list[0].shape[0]
+            ret = []
+            for n in range(N):
+                tmp = []
+                for tensor in covar_list:  # D
+                    tmp.append(tensor[n])
+                ret.append(block_diag(*tmp))
+            return np.array(ret).view(CovarianceMatrices)
+
+    def function(
+        self, state, time_interval: timedelta, noise=False, **kwargs
+    ) -> StateVector:
+        """Applies each transition model in :py:attr:`~model_list` in turn to the state's
+        corresponding state vector components.
+        For example, in a 3D state space, with :py:attr:`~model_list` = [modelA(ndim_state=2),
+        modelB(ndim_state=1)], this would apply modelA to the state vector's 1st and 2nd
+        elements, then modelB to the remaining 3rd element.
+
+        Parameters
+        ----------
+        state : :class:`stonesoup.state.State`
+            The state to be transitioned according to the models in :py:attr:`~model_list`.
+        time_interval : :class:`datetime.timedelta`
+            The time interval between two observations.
+        noise : :class:`bool`
+            Whether to include process noise in the transition.
+
+        Returns
+        -------
+        state_vector: :class:`stonesoup.types.array.StateVector`
+            of shape (:py:attr:`~ndim_state`, 1). The resultant state vector of the transition.
+        """
+
+        temp_state = copy.copy(state)
+        ndim_count = 0
+        if state.state_vector.shape[1] == 1:
+            state_vector = np.zeros(state.state_vector.shape).view(StateVector)
+        else:
+            state_vector = np.zeros(state.state_vector.shape).view(StateVectors)
+        # To handle explicit noise vector(s) passed in, we set the noise for the individual
+        # models to False and add the noise later. When noise is Boolean, we just pass in
+        # that value.
+        if noise is None:
+            noise = False
+        if isinstance(noise, bool):
+            noise_loop = noise
+        else:
+            noise_loop = False
+        latents = self.sample_latents(time_interval=time_interval, num_samples=1)
+        for model in self.model_list:
+            temp_state.state_vector = state.state_vector[
+                ndim_count: model.ndim_state + ndim_count, :
+            ]
+            state_vector[ndim_count: model.ndim_state + ndim_count, :] += model.function(
+                state=temp_state,
+                latents=latents,
+                time_interval=time_interval,
+                noise=noise_loop,
+                **kwargs
+            )
+            ndim_count += model.ndim_state
+
+        if isinstance(noise, bool):
+            noise = 0
+        return state_vector + noise
diff --git a/stonesoup/models/transition/levy_linear.py b/stonesoup/models/transition/levy_linear.py
new file mode 100644
index 000000000..932dfff25
--- /dev/null
+++ b/stonesoup/models/transition/levy_linear.py
@@ -0,0 +1,250 @@
+from datetime import timedelta
+from typing import Optional, Union
+
+import numpy as np
+from numpy.random import RandomState
+from scipy.linalg import block_diag, expm
+
+from stonesoup.base import Property
+from stonesoup.models.base import (
+    LinearModel,
+    TimeVariantModel,
+    Latents,
+    LevyModel,
+)
+from stonesoup.models.transition.base import CombinedLevyTransitionModel, TransitionModel
+from stonesoup.types.array import StateVector, StateVectors
+
+
+class LinearLevyTransitionModel(TransitionModel, LinearModel, LevyModel):
+
+    @property
+    def ndim_state(self):
+        """ndim_state getter method
+
+        Returns
+        -------
+        : :class:`int`
+            The number of model state dimensions.
+        """
+
+        return self.matrix().shape[0]
+
+
+class CombinedLinearLevyTransitionModel(CombinedLevyTransitionModel, LinearModel):
+    r"""Combine multiple models into a single model by stacking them.
+
+    The assumption is that all models are Linear and Levy.
+    Time Variant, and Time Invariant models can be combined together.
+    If any of the models are time variant the keyword argument "time_interval"
+    must be supplied to all methods
+    """
+
+    def matrix(self, **kwargs):
+        """Model matrix :math:`F`
+
+        Returns
+        -------
+        : :class:`numpy.ndarray` of shape\
+        (:py:attr:`~ndim_state`, :py:attr:`~ndim_state`)
+        """
+
+        transition_matrices = [model.matrix(**kwargs) for model in self.model_list]
+        return block_diag(*transition_matrices)
+
+
+class LevyConstantNthDerivative(LinearLevyTransitionModel, TimeVariantModel):
+    r"""Identical model to :class:`~.ConstantNthDerivative`, but driving white noise
+    is now replaced with a Levy driving process.
+ """ + + constant_derivative: int = Property( + doc=( + "The order of the derivative with respect to time to be kept constant, eg if 2 " + "identical to constant acceleration" + ) + ) + + noise_diff_coeff: Optional[float] = Property( + default=1.0, doc="The acceleration noise diffusion coefficient :math:`q`" + ) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.A = self._init_A() + self.h = self._init_h() + + def _init_A(self): + N = self.constant_derivative + mat = np.zeros((N + 1, N + 1)) + for i in range(N): + mat[i, i + 1] = 1 + return mat + + def _init_h(self): + N = self.constant_derivative + mat = np.zeros((N + 1, 1)) + mat[N, 0] = 1 + return mat + + @property + def ndim_state(self): + return self.constant_derivative + 1 + + def _integrand(self, dt: float, jtimes: np.ndarray) -> np.ndarray: + delta = dt - jtimes[..., np.newaxis, np.newaxis] + return expm(self.A * delta) @ self.h + + def matrix(self, time_interval, **kwargs): + dt = time_interval.total_seconds() + return expm(self.A * dt) + + def rvs( + self, + latents: Optional[Latents] = None, + num_samples: int = 1, + random_state: RandomState = None, + **kwargs + ) -> Union[StateVector, StateVectors]: + coeff = self.noise_diff_coeff + return super().rvs(latents, num_samples, random_state, **kwargs) * coeff + + +class LevyRandomWalk(LevyConstantNthDerivative): + r"""This is a class implementation of a discrete, time-variant 1D + Linear-Levy Random Walk Transition Model. + + The target is assumed to be (almost) stationary, where + target velocity is modelled as white noise. + """ + + @property + def constant_derivative(self): + """For random walk, this is 0.""" + return 0 + + +class LevyConstantVelocity(LevyConstantNthDerivative): + r"""This is a class implementation of a discrete, time-variant 1D + Linear-Levy Constant Velocity Transition Model. + + The target is assumed to move with (nearly) constant velocity, where + target acceleration is modelled as Levy noise. + """ + + @property + def constant_derivative(self): + """For constant velocity, this is 1.""" + return 1 + + +class LevyConstantAcceleration(LevyConstantNthDerivative): + r"""This is a class implementation of a discrete, time-variant 1D Constant + Acceleration Transition Model. + + The target acceleration is modeled as a Levy noise random process. + """ + + @property + def constant_derivative(self): + """For constant acceleration, this is 2.""" + return 2 + + +class LevyNthDerivativeDecay(LinearLevyTransitionModel, TimeVariantModel): + r"""Identical model to :class:`~.NthDerivativeDecay`, but driving white noise is + now replaced with a Levy driving process. 
+ """ + + decay_derivative: int = Property( + doc="The derivative with respect to time to decay exponentially, eg if 2 identical to " + "singer" + ) + noise_diff_coeff: Optional[float] = Property( + default=1.0, doc="The acceleration noise diffusion coefficient :math:`q`" + ) + damping_coeff: float = Property( + doc="The Nth derivative damping coefficient :math:`K`" + ) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.A = self._init_A() + self.h = self._init_h() + + def _integrand(self, dt: float, jtimes: np.ndarray) -> np.ndarray: + delta = dt - jtimes[..., np.newaxis, np.newaxis] + return expm(self.A * delta) @ self.h + + def _init_A(self): + N = self.decay_derivative + mat = np.zeros((N + 1, N + 1)) + for i in range(N): + mat[i, i + 1] = 1 + mat[N, N] = -self.damping_coeff + return mat + + def _init_h(self): + N = self.decay_derivative + mat = np.zeros((N + 1, 1)) + mat[N, 0] = 1 + return mat + + @property + def ndim_state(self): + return self.decay_derivative + 1 + + def matrix(self, time_interval: timedelta, **kwargs): + dt = time_interval.total_seconds() + return expm(self.A * dt) + + def rvs( + self, + latents: Optional[Latents] = None, + num_samples: int = 1, + random_state: RandomState = None, + **kwargs + ) -> Union[StateVector, StateVectors]: + coeff = self.noise_diff_coeff + return super().rvs(latents, num_samples, random_state, **kwargs) * coeff + + +class LevyLangevin(LevyNthDerivativeDecay): + r"""This is a class implementation of a discrete, time-variant 1D + Linear-Levy Ornstein Uhlenbeck Transition Model. + + The target is assumed to move with (nearly) constant velocity, which + exponentially decays to zero over time, and target acceleration is + modeled as a Levy process. + """ + + damping_coeff: float = Property(doc="The velocity damping coefficient :math:`K`") + + @property + def decay_derivative(self): + return 1 + + def matrix(self, time_interval: timedelta, **kwargs) -> np.ndarray: + """Closed form known, faster than matrix exponentiation""" + dt = time_interval.total_seconds() + eA0 = np.array([[0, 1.0 / -self.damping_coeff], [0.0, 1.0]]) + eA1 = np.array([[1, 1.0 / self.damping_coeff], [0.0, 0.0]]) + eAdt = np.exp(-self.damping_coeff * dt) * eA0 + eA1 + # exp_A_delta_t + return eAdt # (m, m) + + def _integrand(self, dt: float, jtimes: np.ndarray) -> np.ndarray: + v1 = np.array([[1.0 / -self.damping_coeff], [1.0]]) # (m, 1) + v2 = np.array([[1.0 / self.damping_coeff], [0.0]]) + term1 = np.exp(-self.damping_coeff * (dt - jtimes))[ + ..., None, None + ] # (n_jumps, n_samples, 1, 1) + term2 = np.ones_like(jtimes)[..., None, None] + return term1 * v1 + term2 * v2 # (n_jumps, n_samples, m, 1) + + def _integral(self, dt: float) -> np.ndarray: + v1 = np.array([[1.0 / -self.damping_coeff], [1.0]]) + v2 = np.array([[1.0 / self.damping_coeff], [0.0]]) + term1 = (np.exp(-self.damping_coeff * dt) - 1.0) / (-self.damping_coeff) * v1 + term2 = dt * v2 + return term1 + term2 # (m, 1) diff --git a/stonesoup/models/transition/tests/test_levy_linear.py b/stonesoup/models/transition/tests/test_levy_linear.py new file mode 100644 index 000000000..80b316971 --- /dev/null +++ b/stonesoup/models/transition/tests/test_levy_linear.py @@ -0,0 +1,193 @@ +import pytest +from stonesoup.models.transition.levy_linear import ( + LevyLangevin, + LevyRandomWalk, + LevyConstantAcceleration, + LevyConstantVelocity, +) +from stonesoup.models.driver import GaussianDriver, AlphaStableNSMDriver +from stonesoup.models.base_driver import NoiseCase, 
ConditionallyGaussianDriver +import numpy as np +from datetime import datetime, timedelta, timezone +from stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState +from stonesoup.types.array import StateVector +from typing import Union, List +from stonesoup.models.base import LevyModel + + +@pytest.fixture +def seed() -> int: + return 0 + + +@pytest.fixture(scope="function") +def gaussian_driver(seed): + mu_W = 1 + sigma_W2 = 1 + return GaussianDriver(mu_W=mu_W, sigma_W2=sigma_W2, seed=seed) + + +@pytest.fixture +def conditionally_gaussian_driver(seed): + mu_W = 1 + sigma_W2 = 1 + alpha = 1.5 + c = 10 + noise_case = NoiseCase.GAUSSIAN_APPROX + return AlphaStableNSMDriver( + mu_W=mu_W, sigma_W2=sigma_W2, seed=seed, alpha=alpha, c=c, noise_case=noise_case + ) + + +def compare_transition(model: LevyModel, expected_state_vectors: List[StateVector]): + start_time = datetime.now(tz=timezone.utc).replace(microsecond=0) + timesteps = [start_time] + truth = GroundTruthPath([GroundTruthState(expected_state_vectors[0], timestamp=timesteps[0])]) + + num_steps = 3 + for k in range(1, num_steps + 1): + timesteps.append( + start_time + timedelta(seconds=k) + ) # add next timestep to list of timesteps + truth.append( + GroundTruthState( + model.function(truth[k - 1], noise=True, time_interval=timedelta(seconds=1)), + timestamp=timesteps[k], + ) + ) + + for result, expected in zip(truth, expected_state_vectors): + assert np.allclose(result.state_vector, expected) + + +@pytest.mark.parametrize( + "driver_type, expected_state_vectors", + [ + ( + "gaussian_driver", + [ + StateVector([[0]]), + StateVector([[1.12573022]]), + StateVector([[1.99362536]]), + StateVector([[3.63404801]]), + ], + ), + ( + "conditionally_gaussian_driver", + [ + StateVector([[0]]), + StateVector([[-3.60737453]]), + StateVector([[-7.18177547]]), + StateVector([[-5.37123095]]), + ], + ), + ], +) +def test_levy_random_walk( + request: pytest.FixtureRequest, + driver_type: str, + expected_state_vectors: np.ndarray, +): + driver = request.getfixturevalue(driver_type) + transition_model = LevyRandomWalk(driver=driver) + compare_transition(transition_model, expected_state_vectors) + + +@pytest.mark.parametrize( + "driver_type, expected_state_vectors", + [ + ( + "gaussian_driver", + [ + StateVector([[0], [1]]), + StateVector([[1.31573952], [1.61112188]]), + StateVector([[2.94435263], [1.6449766]]), + StateVector([[5.15437707], [2.73864109]]), + ], + ), + ( + "conditionally_gaussian_driver", + [ + StateVector([[0], [1]]), + StateVector([[2.23700493], [2.13560881]]), + StateVector([[2.52813741], [-4.1708489]]), + StateVector([[-0.4501127], [-1.11469671]]), + ], + ), + ], +) +def test_levy_langevin( + request: pytest.FixtureRequest, + driver_type: Union[GaussianDriver, ConditionallyGaussianDriver], + expected_state_vectors: np.ndarray, +): + driver = request.getfixturevalue(driver_type) + theta = 0.2 + transition_model = LevyLangevin(damping_coeff=theta, driver=driver) + compare_transition(transition_model, expected_state_vectors) + + +@pytest.mark.parametrize( + "driver_type, expected_state_vectors", + [ + ( + "gaussian_driver", + [ + StateVector([[0], [1]]), + StateVector([[1.43713489], [1.87426978]]), + StateVector([[3.49119334], [2.23384713]]), + StateVector([[6.49287515], [3.7695165]]), + ], + ), + ( + "conditionally_gaussian_driver", + [ + StateVector([[0], [1]]), + StateVector([[2.47577405], [2.5539618]]), + StateVector([[3.21401715], [-3.7669634]]), + StateVector([[0.25101422], [-1.38010576]]), + ], + ), + ], +) +def 
test_levy_constant_velocity(
+    request: pytest.FixtureRequest,
+    driver_type: str,
+    expected_state_vectors: np.ndarray,
+):
+    driver = request.getfixturevalue(driver_type)
+    transition_model = LevyConstantVelocity(driver=driver)
+    compare_transition(transition_model, expected_state_vectors)
+
+
+@pytest.mark.parametrize(
+    "driver_type, expected_state_vectors",
+    [
+        (
+            "gaussian_driver",
+            [
+                StateVector([[0], [0], [1]]),
+                StateVector([[0.64571163], [1.43713489], [1.87426978]]),
+                StateVector([[3.16916472], [3.75895461], [2.76936966]]),
+                StateVector([[8.26213749], [6.37632425], [2.46536962]]),
+            ],
+        ),
+        (
+            "conditionally_gaussian_driver",
+            [
+                StateVector([[0], [0], [1]]),
+                StateVector([[1.26124374], [2.47364757], [2.53440017]]),
+                StateVector([[5.30179643], [5.30065052], [1.20670201]]),
+                StateVector([[10.28136979], [4.36069052], [0.81064657]]),
+            ],
+        ),
+    ],
+)
+def test_levy_constant_acceleration(
+    request: pytest.FixtureRequest,
+    driver_type: str,
+    expected_state_vectors: np.ndarray,
+):
+    driver = request.getfixturevalue(driver_type)
+    transition_model = LevyConstantAcceleration(driver=driver)
+    compare_transition(transition_model, expected_state_vectors)
diff --git a/stonesoup/predictor/particle.py b/stonesoup/predictor/particle.py
index 2c31208b1..ce9209a04 100644
--- a/stonesoup/predictor/particle.py
+++ b/stonesoup/predictor/particle.py
@@ -525,3 +525,50 @@ def predict(self, prior, timestamp=None, **kwargs):
                 transition_model=self.transition_model)
 
         return prediction
+
+
+class MarginalisedParticlePredictor(ParticlePredictor):
+    """Implementation of the Marginalised Particle Filter (MPF) predictor.
+
+    The MPF partitions the state into:
+    1. linear Gaussian
+    2. non-linear and possibly non-Gaussian
+
+    It uses a Kalman filter to handle the linear Gaussian states, and
+    a traditional particle filter to infer the non-linear states.
+
+    See Cappe, Godsill, Moulines (2007) for more details.
+    """
+
+    def predict(self, prior, timestamp=None, **kwargs):
+        try:
+            time_interval = timestamp - prior.timestamp
+        except TypeError:
+            time_interval = None
+
+        num_samples = len(prior)
+        model = self.transition_model
+
+        latents = model.sample_latents(time_interval=time_interval, num_samples=num_samples)
+        process_mean = model.mean(latents=latents, time_interval=time_interval)[:, :, 0]  # (N, M)
+        process_covar = model.covar(latents=latents, time_interval=time_interval)
+        process_mean = process_mean.T  # (M, N)
+        process_covar = process_covar.T  # (M, M, N)
+
+        F = model.matrix(time_interval=time_interval, **kwargs)  # (M, M)
+        new_state_vector = F @ prior.state_vector + process_mean
+        tmp = np.einsum("jik, li-> jlk", prior.covariance, F)  # P F^T, (M, M, N)
+        new_covariance = np.einsum("ij, jlk->ilk", F, tmp) + process_covar  # F P F^T + Q
+
+        ret = Prediction.from_state(
+            prior,
+            parent=prior,
+            state_vector=new_state_vector,
+            covariance=new_covariance,
+            timestamp=timestamp,
+            transition_model=self.transition_model,
+        )
+        return ret
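The predict step above is a per-particle Kalman time update, vectorised over the particle axis with `einsum`. The following is a self-contained numpy sketch of that algebra under the same `(M, N)` / `(M, M, N)` shape convention; all values here are made up.

```python
# Sketch of the vectorised Kalman time update in `predict` above: for each
# particle n, m_n <- F m_n + mu_n and P_n <- F P_n F^T + Q_n.
import numpy as np

M, N = 4, 5                                    # state dim, number of particles
rng = np.random.default_rng(0)

F = np.eye(M) + 0.1 * rng.standard_normal((M, M))
means = rng.standard_normal((M, N))            # particle means, (M, N)
covs = np.stack([np.eye(M)] * N, axis=-1)      # particle covariances, (M, M, N)
proc_mean = rng.standard_normal((M, N))        # conditionally Gaussian noise means
proc_cov = np.stack([0.5 * np.eye(M)] * N, axis=-1)

new_means = F @ means + proc_mean                          # (M, N)
tmp = np.einsum("jik,li->jlk", covs, F)                    # P F^T, (M, M, N)
new_covs = np.einsum("ij,jlk->ilk", F, tmp) + proc_cov     # F P F^T + Q, (M, M, N)

# Cross-check one particle against the plain matrix form
n = 2
assert np.allclose(new_covs[..., n], F @ covs[..., n] @ F.T + proc_cov[..., n])
```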
+ """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def predict(self, prior, timestamp=None, **kwargs): + try: + time_interval = timestamp - prior.timestamp + except TypeError: + time_interval = None + + num_samples = len(prior) + model = self.transition_model + + latents = model.sample_latents(time_interval=time_interval, num_samples=num_samples) + process_mean = model.mean(latents=latents, time_interval=time_interval)[:, :, 0] # (N, M) + process_covar = model.covar(latents=latents, time_interval=time_interval) + process_mean = process_mean.T # (M, N) + process_covar = process_covar.T # (M, M, N) + + F = model.matrix(time_interval=time_interval, **kwargs) # (M, M) + new_state_vector = F @ prior.state_vector + process_mean + tmp = np.einsum("jik, li-> jlk", prior.covariance, F) # () + new_covariance = np.einsum("ij, jlk->ilk", F, tmp) + process_covar + + ret = Prediction.from_state( + prior, + parent=prior, + state_vector=new_state_vector, + covariance=new_covariance, + timestamp=timestamp, + transition_model=self.transition_model, + ) + return ret diff --git a/stonesoup/predictor/tests/test_mpf_predictor.py b/stonesoup/predictor/tests/test_mpf_predictor.py new file mode 100644 index 000000000..f0301f51c --- /dev/null +++ b/stonesoup/predictor/tests/test_mpf_predictor.py @@ -0,0 +1,293 @@ +from datetime import timezone, datetime, timedelta + +import numpy as np +import pytest + +from stonesoup.models.base_driver import NoiseCase +from stonesoup.models.driver import AlphaStableNSMDriver, GaussianDriver +from stonesoup.models.transition.levy_linear import ( + CombinedLinearLevyTransitionModel, + LevyLangevin, + LevyRandomWalk, +) +from stonesoup.predictor.particle import MarginalisedParticlePredictor +from stonesoup.types.array import CovarianceMatrices, StateVectors +from stonesoup.types.numeric import Probability +from stonesoup.types.prediction import ( + MarginalisedParticleState, + MarginalisedParticleStatePrediction, +) + + +@pytest.fixture +def seed() -> int: + return 0 + + +@pytest.fixture(scope="function") +def alpha_stable_langevin_model_2d(seed): + mu_W = 1 + sigma_W2 = 25 + alpha = 1.4 + c = 10 + noise_case = NoiseCase.GAUSSIAN_APPROX + driver = AlphaStableNSMDriver( + mu_W=mu_W, sigma_W2=sigma_W2, seed=seed, alpha=alpha, c=c, noise_case=noise_case + ) + theta = 0.2 + transition_model_x = LevyLangevin(damping_coeff=theta, driver=driver) + transition_model_y = LevyLangevin(damping_coeff=theta, driver=driver) + return CombinedLinearLevyTransitionModel( + model_list=[transition_model_x, transition_model_y] + ) + + +@pytest.fixture(scope="function") +def gaussian_random_walk_model_1d(seed): + mu_W = 1 + sigma_W2 = 25 + driver = GaussianDriver(mu_W=mu_W, sigma_W2=sigma_W2, seed=seed) + transition_model = LevyRandomWalk(driver=driver) + return CombinedLinearLevyTransitionModel(model_list=[transition_model]) + + +@pytest.mark.parametrize( + "model_type, prior, expected_state, expected_covariance", + [ + ( + "alpha_stable_langevin_model_2d", + MarginalisedParticleState( + state_vector=StateVectors( + [ + [-0.62665397, -0.88888401, 0.26237158, 0.77061426, -0.22988814], + [-0.04712397, 1.50397703, 0.67290205, 0.83930833, 2.23985039], + [1.89306702, 1.4276931, 0.76814597, 0.53354089, 0.60444506], + [1.36980183, 0.10191372, -0.03253117, 1.40181662, -0.82502972], + ] + ), + covariance=np.array( + [ + [ + [100.0, 100.0, 100.0, 100.0, 100.0], + [0.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.0, 0.0], + ], + [ + [0.0, 0.0, 0.0, 0.0, 
0.0], + [100.0, 100.0, 100.0, 100.0, 100.0], + [0.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.0, 0.0], + ], + [ + [0.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.0, 0.0], + [100.0, 100.0, 100.0, 100.0, 100.0], + [0.0, 0.0, 0.0, 0.0, 0.0], + ], + [ + [0.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.0, 0.0], + [100.0, 100.0, 100.0, 100.0, 100.0], + ], + ] + ), + timestamp=None, + weight=np.array( + [ + Probability(0.2), + Probability(0.2), + Probability(0.2), + Probability(0.2), + Probability(0.2), + ], + dtype=object, + ), + parent=None, + particle_list=None, + fixed_covar=None, + ), + StateVectors( + [ + [-2.86082137, -0.60835831, 10.62216111, 33.9105639, 0.21075109], + [-2.58249062, -1.17297794, 11.94039787, 67.75687784, -1.38847372], + [0.94312498, 0.43746399, 10.48856876, 34.1833178, -1.73275825], + [-1.42240989, -2.32089029, 11.362838, 68.21742068, -3.89778532], + ] + ), + CovarianceMatrices( + [ + [ + [ + 1.91274417e02, + 2.08692049e02, + 3.35442281e03, + 2.92355908e04, + 2.05968615e02, + ], + [ + 9.61730879e01, + 1.07339161e02, + 4.09684328e03, + 6.02232891e04, + 1.14768907e02, + ], + [ + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + ], + [ + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + ], + ], + [ + [ + 9.61730879e01, + 1.07339161e02, + 4.09684328e03, + 6.02232891e04, + 1.14768907e02, + ], + [ + 1.43071683e02, + 1.35040268e02, + 5.18703675e03, + 1.24597076e05, + 1.39323271e02, + ], + [ + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + ], + [ + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + ], + ], + [ + [ + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + ], + [ + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + ], + [ + 1.91274417e02, + 2.08692049e02, + 3.35442281e03, + 2.92355908e04, + 2.05968615e02, + ], + [ + 9.61730879e01, + 1.07339161e02, + 4.09684328e03, + 6.02232891e04, + 1.14768907e02, + ], + ], + [ + [ + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + ], + [ + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + 0.00000000e00, + ], + [ + 9.61730879e01, + 1.07339161e02, + 4.09684328e03, + 6.02232891e04, + 1.14768907e02, + ], + [ + 1.43071683e02, + 1.35040268e02, + 5.18703675e03, + 1.24597076e05, + 1.39323271e02, + ], + ], + ] + ), + ), + ( + "gaussian_random_walk_model_1d", + MarginalisedParticleState( + state_vector=StateVectors( + StateVectors( + [[0.85778795, 0.81499289, -1.55124413, -1.34084576, 0.3580332]] + ) + ), + covariance=np.array([[[100.0, 100.0, 100.0, 100.0, 100.0]]]), + timestamp=None, + weight=np.array( + [ + Probability(0.2), + Probability(0.2), + Probability(0.2), + Probability(0.2), + Probability(0.2), + ], + dtype=object, + ), + parent=None, + particle_list=None, + fixed_covar=None, + ), + StateVectors([[1.85778795, 1.81499289, -0.55124413, -0.34084576, 1.3580332]]), + CovarianceMatrices([[[125.0, 125.0, 125.0, 125.0, 125.0]]]), + ), + ], +) +def test_marginalised_particle_filter_predict( + request: pytest.FixtureRequest, + model_type: str, + prior: MarginalisedParticleState, + expected_state: StateVectors, + expected_covariance: CovarianceMatrices, +): + start_time = datetime.now(tz=timezone.utc).replace(microsecond=0) + prior.timestamp = start_time + model = request.getfixturevalue(model_type) + predictor = 
MarginalisedParticlePredictor(transition_model=model)
+    prediction = predictor.predict(prior, timestamp=start_time + timedelta(seconds=1))
+    assert isinstance(prediction, MarginalisedParticleStatePrediction)
+    assert prediction.parent == prior
+    assert prediction.particle_list is None
+    assert prediction.particle_list == prior.particle_list
+    assert prediction.fixed_covar is None
+    assert prediction.fixed_covar == prior.fixed_covar
+    assert np.all(prediction.weight == prior.weight)
+    assert np.allclose(prediction.state_vector, expected_state)
+    assert np.allclose(prediction.covariance, expected_covariance)
diff --git a/stonesoup/types/array.py b/stonesoup/types/array.py
index b341e19c4..8960fe9d3 100644
--- a/stonesoup/types/array.py
+++ b/stonesoup/types/array.py
@@ -49,6 +49,25 @@ def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
         return self._cast(result)
 
 
+class Tensor(Matrix):
+    """Tensor wrapper for :class:`numpy.ndarray`
+
+    This class returns a view to a :class:`numpy.ndarray`. It's called same as
+    :func:`numpy.asarray`.
+    """
+
+    @classmethod
+    def _cast(cls, val):
+        # This tries to cast the result as either a CovarianceMatrix or Tensor type if applicable.
+        if isinstance(val, np.ndarray):
+            if val.ndim == 3 and val.shape[2] == 1:
+                return val.view(CovarianceMatrix)
+            else:
+                return val.view(Tensor)
+        else:
+            return val
+
+
 class StateVector(Matrix):
     r"""State vector wrapper for :class:`numpy.ndarray`
 
@@ -244,6 +263,90 @@ def __new__(cls, *args, **kwargs):
         return array.view(cls)
 
 
+class CovarianceMatrices(Tensor):
+    """Wrapper for :class:`numpy.ndarray` for multiple Covariance Matrices
+
+    This class returns a view to a :class:`numpy.ndarray` that is in shape
+    (num_dimensions, num_dimensions, num_components), customising some numpy functions to ensure
+    custom types are handled correctly. This can be initialised by a sequence
+    type (list, tuple; not array) that contains :class:`CovarianceMatrix`, otherwise
+    it's called same as :func:`numpy.asarray`.
+ """ + + def __new__(cls, covariances, *args, **kwargs): + if isinstance(covariances, Sequence) and not isinstance(covariances, np.ndarray): + if isinstance(covariances[0], CovarianceMatrix): + return np.asarray(covariances).T.view(cls) + array = np.asarray(covariances, *args, **kwargs) + if array.shape[0] == 1: + return array.view(CovarianceMatrix) + return array.view(cls) + + def __iter__(self): + covariancem_gen = super(CovarianceMatrices, self.T).__iter__() + for covariancematrix in covariancem_gen: + yield CovarianceMatrix(covariancematrix) + + @classmethod + def _cast(cls, val): + out = super()._cast(val) + if isinstance(out, Tensor) and out.ndim == 3: + return out.view(CovarianceMatrices) + else: + return out + + @staticmethod + def _mean(covariance_matrices, axis=None, dtype=None, out=None, keepdims=np._NoValue): + if covariance_matrices.dtype != np.object_: + # Can just use standard numpy mean if not using custom objects + return np.mean(np.asarray(covariance_matrices), axis, dtype, out, keepdims) + elif axis == 2 and out is None: + covariance_matrices = np.average(covariance_matrices, axis) + if dtype: + return covariance_matrices.astype(dtype) + else: + return covariance_matrices + else: + return NotImplemented + + @staticmethod + def _average(covariance_matrices, axis=None, weights=None, returned=False): + if covariance_matrices.dtype != np.object_: + # Can just use standard numpy averaging if not using custom objects + covariance_matrix = np.average( + np.asarray(covariance_matrices), axis=axis, weights=weights + ) + # Convert type as may have type of weights + covariance_matrix = CovarianceMatrix(covariance_matrix.astype(np.float64, copy=False)) + elif axis == 1: # Need to handle special cases of averaging potentially + dims = covariance_matrices.shape[0] + covariance_matrix = CovarianceMatrix( + np.empty((dims, dims, 1), dtype=covariance_matrices.dtype) + ) + for dim, row in enumerate(np.asarray(covariance_matrices)): + for dim_inner, col in enumerate(row): + type_ = type(col[0]) # Assume all the same type + if hasattr(type_, "average"): + # Check if type has custom average method + covariance_matrix[dim, dim_inner, 0] = type_.average(col, weights=weights) + else: + # Else use numpy built in, converting to float array + covariance_matrix[dim, dim_inner, 0] = type_( + np.average(np.asarray(col, dtype=np.float64), weights=weights) + ) + else: + return NotImplemented + + if returned: + return covariance_matrix, np.sum(weights) + else: + return covariance_matrix + + @staticmethod + def _cov(*args, **kwargs): + return NotImplemented + + class PrecisionMatrix(Matrix): """Precision matrix. This is the matrix inverse of a covariance matrix. 
diff --git a/stonesoup/types/prediction.py b/stonesoup/types/prediction.py
index 1fa2f9790..5e92ec775 100644
--- a/stonesoup/types/prediction.py
+++ b/stonesoup/types/prediction.py
@@ -2,13 +2,13 @@
 import datetime
 from typing import Sequence
 
-from .array import CovarianceMatrix
+from .array import CovarianceMatrix, CovarianceMatrices
 from .base import Type
 from .state import (State, GaussianState, EnsembleState, ParticleState,
                     MultiModelParticleState, RaoBlackwellisedParticleState,
                     SqrtGaussianState, InformationState, TaggedWeightedGaussianState,
                     WeightedGaussianState, CategoricalState, ASDGaussianState,
-                    BernoulliParticleState, KernelParticleState)
+                    BernoulliParticleState, KernelParticleState, MarginalisedParticleState)
 from ..base import Property
 from ..models.transition.base import TransitionModel
 from ..types.state import CreatableFromState, CompositeState
@@ -232,3 +232,36 @@ class CompositeMeasurementPrediction(MeasurementPrediction, CompositeState):
 
 
 MeasurementPrediction.register(CompositeState)  # noqa: E305
+
+
+class MarginalisedParticleStatePrediction(Prediction, MarginalisedParticleState):
+    """Marginalised particle state prediction type
+
+    This is a simple MarginalisedParticle state prediction object.
+    """
+
+    pass
+
+
+class MarginalisedParticleMeasurementPrediction(
+    MeasurementPrediction, MarginalisedParticleState
+):
+    """Marginalised particle state measurement prediction type"""
+
+    cross_covar: CovarianceMatrices = Property(
+        default=None, doc="The state-measurement cross covariance matrix"
+    )
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        if (
+            self.cross_covar is not None
+            and self.cross_covar.shape[1] != self.state_vector.shape[0]
+        ):
+            raise ValueError(
+                "cross_covar should have the same number of "
+                "columns as the number of rows in state_vector"
+            )
+
+
+MeasurementPrediction.register(MarginalisedParticleState)
diff --git a/stonesoup/types/state.py b/stonesoup/types/state.py
index 2e5e0ee5a..9521da491 100644
--- a/stonesoup/types/state.py
+++ b/stonesoup/types/state.py
@@ -1187,3 +1187,55 @@ def __len__(self):
 
 
 State.register(CompositeState)  # noqa: E305
+
+
+class MarginalisedParticleState(ParticleState):
+    """Marginalised particle state type
+
+    This is a particle state object which describes the state as a
+    distribution of particles, used by the marginalised particle filter.
+ """ + + # need to do dimensionality check on this covariance, need it to be 3-d + covariance: np.ndarray = Property( + doc="Three dimensional array (m, m, n) of covariance matrices" + ) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + if self.fixed_covar is not None: + raise AttributeError("Fixed covariance should be none") + + @clearable_cached_property("state_vector", "weight", "covariance") + def covar(self): + """Sample covariance matrix for particles""" + covariance = self.covariance + mu = self.state_vector + weighted_covar = np.sum( + self.weight[np.newaxis, np.newaxis, :] * covariance, + axis=len(covariance.shape) - 1, + ) + + mu_bar = np.sum(self.weight[np.newaxis, :] * mu, axis=1) + tmp = mu - mu_bar[:, np.newaxis] + weighted_mean = np.sum( + self.weight[np.newaxis, np.newaxis, :] * (np.einsum("ik,jk->ijk", tmp, tmp)), + axis=len(covariance.shape) - 1, + ) + return np.array(weighted_mean + weighted_covar, dtype=float) + + def particles(self): + """Sequence of individual :class:`~.Particle` objects.""" + if self.particle_list is not None: + return self.particle_list + return tuple(particle for particle in self) + + @clearable_cached_property("state_vector", "log_weight", "covariance") + def mean(self): + """Sample mean for particles""" + if len(self) == 1: # No need to calculate mean + return self.state_vector + return np.average(self.state_vector, axis=1, weights=np.exp(self.log_weight)) + + +State.register(MarginalisedParticleState) diff --git a/stonesoup/types/tests/test_array.py b/stonesoup/types/tests/test_array.py index af1e90616..d8353d205 100644 --- a/stonesoup/types/tests/test_array.py +++ b/stonesoup/types/tests/test_array.py @@ -1,7 +1,14 @@ import numpy as np import pytest -from ..array import Matrix, StateVector, StateVectors, CovarianceMatrix, PrecisionMatrix +from ..array import ( + Matrix, + StateVector, + StateVectors, + CovarianceMatrix, + PrecisionMatrix, + CovarianceMatrices, +) def test_statevector(): @@ -21,16 +28,16 @@ def test_statevector(): def test_statevectors(): - vec1 = np.array([[1.], [2.], [3.]]) - vec2 = np.array([[2.], [3.], [4.]]) + vec1 = np.array([[1.0], [2.0], [3.0]]) # 3x1 + vec2 = np.array([[2.0], [3.0], [4.0]]) # 3x1 sv1 = StateVector(vec1) sv2 = StateVector(vec2) - vecs1 = np.concatenate((vec1, vec2), axis=1) + vecs1 = np.concatenate((vec1, vec2), axis=1) # 3x2 svs1 = StateVectors([sv1, sv2]) svs2 = StateVectors(vecs1) - svs3 = StateVectors([vec1, vec2]) # Creates 3dim array + svs3 = StateVectors([vec1, vec2]) # Creates 3dim array, # 2x3x1 assert np.array_equal(svs1, vecs1) assert np.array_equal(svs2, vecs1) assert svs3.shape != vecs1.shape @@ -40,8 +47,8 @@ def test_statevectors(): def test_statevectors_mean(): - svs = StateVectors([[1., 2., 3.], [4., 5., 6.]]) - mean = StateVector([[2., 5.]]) + svs = StateVectors([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]) + mean = StateVector([[2.0, 5.0]]) assert np.allclose(np.average(svs, axis=1), mean) assert np.allclose(np.mean(svs, axis=1, keepdims=True), mean) @@ -101,42 +108,55 @@ def test_setting(): def test_covariancematrix(): - """ CovarianceMatrix Type test """ + """CovarianceMatrix Type test""" with pytest.raises(ValueError): CovarianceMatrix(np.array([0])) - covar_nparray = np.array([[2.2128, 0, 0, 0], - [0.0002, 2.2130, 0, 0], - [0.3897, -0.00004, 0.0128, 0], - [0, 0.3897, 0.0013, 0.0135]]) * 1e3 + covar_nparray = ( + np.array( + [ + [2.2128, 0, 0, 0], + [0.0002, 2.2130, 0, 0], + [0.3897, -0.00004, 0.0128, 0], + [0, 0.3897, 0.0013, 0.0135], + ] + ) + * 1e3 + ) 
covar_matrix = CovarianceMatrix(covar_nparray) assert np.array_equal(covar_matrix, covar_nparray) def test_precisionmatrix(): - """ CovarianceMatrix Type test """ + """CovarianceMatrix Type test""" with pytest.raises(ValueError): PrecisionMatrix(np.array([0])) - prec_nparray = np.array([[7, 1, 0.5, 0], - [1, 4, 2, 0.4], - [0.5, 2, 6, 0.3], - [0, 0.4, 0.3, 5]]) + prec_nparray = np.array( + [[7, 1, 0.5, 0], [1, 4, 2, 0.4], [0.5, 2, 6, 0.3], [0, 0.4, 0.3, 5]] + ) prec_matrix = PrecisionMatrix(prec_nparray) assert np.array_equal(prec_matrix, prec_nparray) def test_matrix(): - """ Matrix Type test """ - - covar_nparray = np.array([[2.2128, 0, 0, 0], - [0.0002, 2.2130, 0, 0], - [0.3897, -0.00004, 0.0128, 0], - [0, 0.3897, 0.0013, 0.0135]]) * 1e3 + """Matrix Type test""" + + covar_nparray = ( + np.array( + [ + [2.2128, 0, 0, 0], + [0.0002, 2.2130, 0, 0], + [0.3897, -0.00004, 0.0128, 0], + [0, 0.3897, 0.0013, 0.0135], + ] + ) + * 1e3 + ) matrix = Matrix(covar_nparray) assert np.array_equal(matrix, covar_nparray) @@ -145,38 +165,37 @@ def test_matrix(): def test_multiplication(): vector = np.array([[1, 1, 1, 1]]).T state_vector = StateVector(vector) - array = np.array([[1., 2., 3., 4.], - [5., 6., 7., 8.]]) + array = np.array([[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]]) covar = CovarianceMatrix(array) Mtype = Matrix Vtype = StateVector - assert np.array_equal(covar@state_vector, array@vector) - assert np.array_equal(covar@vector, array@vector) - assert np.array_equal(array@state_vector, array@vector) - assert np.array_equal(state_vector.T@covar.T, vector.T@array.T) - assert np.array_equal(vector.T@covar.T, vector.T@array.T) - assert np.array_equal(state_vector.T@array.T, vector.T@array.T) + assert np.array_equal(covar @ state_vector, array @ vector) + assert np.array_equal(covar @ vector, array @ vector) + assert np.array_equal(array @ state_vector, array @ vector) + assert np.array_equal(state_vector.T @ covar.T, vector.T @ array.T) + assert np.array_equal(vector.T @ covar.T, vector.T @ array.T) + assert np.array_equal(state_vector.T @ array.T, vector.T @ array.T) - assert type(array@state_vector) == Vtype # noqa: E721 - assert type(state_vector.T@array.T) == Mtype # noqa: E721 - assert type(covar@vector) == Vtype # noqa: E721 - assert type(vector.T@covar.T) == Mtype # noqa: E721 + assert type(array @ state_vector) == Vtype # noqa: E721 + assert type(state_vector.T @ array.T) == Mtype # noqa: E721 + assert type(covar @ vector) == Vtype # noqa: E721 + assert type(vector.T @ covar.T) == Mtype # noqa: E721 def test_array_ops(): vector = np.array([[1, 1, 1, 1]]).T - vector2 = vector + 2. + vector2 = vector + 2.0 sv = StateVector(vector) - array = np.array([[1., 2., 3., 4.], [2., 3., 4., 5.]]).T + array = np.array([[1.0, 2.0, 3.0, 4.0], [2.0, 3.0, 4.0, 5.0]]).T covar = CovarianceMatrix(array) Mtype = Matrix Vtype = type(sv) assert np.array_equal(covar - vector, array - vector) - assert type(covar-vector) == Mtype # noqa: E721 + assert type(covar - vector) == Mtype # noqa: E721 assert np.array_equal(covar + vector, array + vector) - assert type(covar+vector) == Mtype # noqa: E721 + assert type(covar + vector) == Mtype # noqa: E721 assert np.array_equal(vector - covar, vector - array) assert type(vector - covar) == Mtype # noqa: E721 assert np.array_equal(vector + covar, vector + array) @@ -190,8 +209,8 @@ def test_array_ops(): assert type(vector2 + sv) == Vtype # noqa: E721 assert np.array_equal(sv + vector2, vector + vector2) assert type(sv + vector2) == Vtype # noqa: E721 - assert type(sv+2.) 
== Vtype # noqa: E721 - assert type(sv*2.) == Vtype # noqa: E721 + assert type(sv + 2.0) == Vtype # noqa: E721 + assert type(sv * 2.0) == Vtype # noqa: E721 assert np.array_equal(array - sv, array - vector) assert type(array - sv) == Mtype # noqa: E721 @@ -201,5 +220,41 @@ def test_array_ops(): assert type(array + sv) == Mtype # noqa: E721 assert np.array_equal(sv + array, vector + array) assert type(sv + array) == Mtype # noqa: E721 - assert type(covar+2.) == Mtype # noqa: E721 - assert type(covar*2.) == Mtype # noqa: E721 + assert type(covar + 2.0) == Mtype # noqa: E721 + assert type(covar * 2.0) == Mtype # noqa: E721 + + +def test_covariancematrices(): + cov1 = np.array([[1.0, 0.0, 0.0], [0.0, 2.0, 0.0], [0.0, 0.0, 3.0]]) + cov2 = np.array([[4.0, 0.0, 0.0], [0.0, 5.0, 0.0], [0.0, 0.0, 6.0]]) + + cm1 = CovarianceMatrix(cov1) + cm2 = CovarianceMatrix(cov2) + cms0 = CovarianceMatrices([cov1]) + assert isinstance(cms0, CovarianceMatrix) + + tensor1 = np.stack((cov1, cov2), axis=-1) + cms1 = CovarianceMatrices([cm1, cm2]) # 3x3x2 + assert np.array_equal(cms1, tensor1) + cms2 = CovarianceMatrices(tensor1) # 3x3x2 + assert np.array_equal(cms2, tensor1) + cms3 = CovarianceMatrices([cov1, cov2]) # 2x3x3 + assert cms3.shape != tensor1.shape + for cm in cms2: + assert isinstance(cm, CovarianceMatrix) + + +def test_covariancematrices_mean(): + cov1 = np.array([[1.0, 0.0, 0.0], [0.0, 2.0, 0.0], [0.0, 0.0, 3.0]]) + cov2 = np.array([[4.0, 0.0, 0.0], [0.0, 5.0, 0.0], [0.0, 0.0, 6.0]]) + cm1 = CovarianceMatrix(cov1) + cm2 = CovarianceMatrix(cov2) + cms1 = CovarianceMatrices([cm1, cm2]) + cms2 = CovarianceMatrices([cov1, cov2]) + mean1 = CovarianceMatrix([[2.5, 0.0, 0.0], [0.0, 3.5, 0.0], [0.0, 0.0, 4.5]]) + mean2 = CovarianceMatrices( + [[[2.5], [0.0], [0.0]], [[0.0], [3.5], [0.0]], [[0.0], [0.0], [4.5]]] + ) + assert np.allclose(np.average(cms1, axis=2), mean1) + assert np.allclose(np.average(cms2, axis=0), mean1) + assert np.allclose(np.mean(cms1, axis=2, keepdims=True), mean2) diff --git a/stonesoup/types/update.py b/stonesoup/types/update.py index b8973a136..2dafe5813 100644 --- a/stonesoup/types/update.py +++ b/stonesoup/types/update.py @@ -5,15 +5,27 @@ from .hypothesis import Hypothesis, CompositeHypothesis from .mixture import GaussianMixture from .state import CreatableFromState, CompositeState, KernelParticleState -from .state import State, GaussianState, ParticleState, EnsembleState, \ - SqrtGaussianState, InformationState, CategoricalState, ASDGaussianState, \ - WeightedGaussianState, TaggedWeightedGaussianState, \ - MultiModelParticleState, RaoBlackwellisedParticleState, BernoulliParticleState +from .state import ( + State, + GaussianState, + ParticleState, + EnsembleState, + SqrtGaussianState, + InformationState, + CategoricalState, + ASDGaussianState, + WeightedGaussianState, + TaggedWeightedGaussianState, + MultiModelParticleState, + RaoBlackwellisedParticleState, + BernoulliParticleState, + MarginalisedParticleState, +) from ..base import Property class Update(Type, CreatableFromState): - """ Update type + """Update type The base update class. Updates are returned by :class:'~.Updater' objects and contain the information that was used to perform the updating""" @@ -22,7 +34,7 @@ class Update(Type, CreatableFromState): class StateUpdate(Update, State): - """ StateUpdate type + """StateUpdate type Most simple state update type, where everything only has time and a state vector. 
Requires a prior state that was updated,
@@ -31,7 +43,7 @@ class StateUpdate(Update, State):
 
 
 class GaussianStateUpdate(Update, GaussianState):
-    """ GaussianStateUpdate type
+    """GaussianStateUpdate type
 
     This is a simple Gaussian state update object, which, as the name
     suggests, is described by a Gaussian distribution.
@@ -39,7 +51,7 @@ class GaussianStateUpdate(Update, GaussianState):
 
 
 class SqrtGaussianStateUpdate(Update, SqrtGaussianState):
-    """ SqrtGaussianStateUpdate type
+    """SqrtGaussianStateUpdate type
 
     This is equivalent to a Gaussian state update object, but with the
     covariance of the Gaussian distribution stored in matrix square root
@@ -48,7 +60,7 @@ class SqrtGaussianStateUpdate(Update, SqrtGaussianState):
 
 
 class WeightedGaussianStateUpdate(Update, WeightedGaussianState):
-    """ WeightedGaussianStateUpdate type
+    """WeightedGaussianStateUpdate type
 
     This is a simple Gaussian state update object, which, as the name
     suggests, is described by a Gaussian distribution with an associated weight.
@@ -56,7 +68,7 @@ class WeightedGaussianStateUpdate(Update, WeightedGaussianState):
 
 
 class TaggedWeightedGaussianStateUpdate(Update, TaggedWeightedGaussianState):
-    """ TaggedWeightedGaussianStateUpdate type
+    """TaggedWeightedGaussianStateUpdate type
 
     This is a simple Gaussian state update object, which, as the name
     suggests, is described by a Gaussian distribution, with an associated
     weight and unique tag.
@@ -64,7 +76,7 @@ class TaggedWeightedGaussianStateUpdate(Update, TaggedWeightedGaussianState):
 
 
 class GaussianMixtureUpdate(Update, GaussianMixture):
-    """ GaussianMixtureUpdate type
+    """GaussianMixtureUpdate type
 
     This is a Gaussian mixture update object, which, as the name
     suggests, is described by a Gaussian mixture.
@@ -72,7 +84,7 @@ class GaussianMixtureUpdate(Update, GaussianMixture):
 
 
 class ASDGaussianStateUpdate(Update, ASDGaussianState):
-    """ ASDGaussianStateUpdate type
+    """ASDGaussianStateUpdate type
 
     This is a simple ASD Gaussian state update object, which, as the name
     suggests, is described by a Gaussian distribution.
@@ -112,8 +124,10 @@ class KernelParticleStateUpdate(Update, KernelParticleState):
 
     This is a Kernel Particle state update object.
     """
-    proposal: StateVectors = Property(default=None,
-                                      doc='Kernel covariance value. Default `None`.')
+
+    proposal: StateVectors = Property(
+        default=None, doc="Kernel covariance value. Default `None`."
+    )
 
 
 class EnsembleStateUpdate(Update, EnsembleState):
@@ -124,7 +138,7 @@ class EnsembleStateUpdate(Update, EnsembleState):
 
 
 class InformationStateUpdate(Update, InformationState):
-    """ InformationUpdate type
+    """InformationUpdate type
 
     This is a simple Information state update object, which, as the name
     suggests, is described by a precision matrix and its corresponding state vector.
@@ -143,5 +157,28 @@ class CompositeUpdate(Update, CompositeState):
 
     sub_states: Sequence[Update] = Property(
         doc="Sequence of sub-updates comprising the composite update. All sub-updates must have "
-            "matching timestamp. Must not be empty.")
+        "matching timestamp. Must not be empty."
+    )
 
     hypothesis: CompositeHypothesis = Property(doc="Hypothesis used for updating")
+
+
+class MarginalisedParticleStateUpdate(Update, MarginalisedParticleState):
+    """MarginalisedParticleStateUpdate type
+
+    This is a simple MarginalisedParticle state update object.
+    """
+
+    pass
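The updater that follows performs a batched, per-particle Kalman measurement update with `einsum`, then reweights and resamples the particles. A self-contained numpy sketch of the gain and covariance algebra, under the same `(M, M, N)` shape convention used elsewhere in this patch (all values made up):

```python
# Sketch of the batched per-particle Kalman measurement update implemented by
# `MarginalisedParticleUpdater` below: for each particle n,
# K_n = P_n H^T S_n^{-1} and P_n <- P_n - K_n S_n K_n^T.
import numpy as np

M, D, N = 4, 2, 3                              # state dim, measurement dim, particles

H = np.zeros((D, M))
H[0, 0] = H[1, 2] = 1.0                        # observe positions only
R = 25.0 * np.eye(D)
P = np.stack([np.eye(M)] * N, axis=-1) * 100.0 # prior covariances, (M, M, N)

PHt = np.einsum("ijk,jm->imk", P, H.T)                     # P H^T, (M, D, N)
S = np.einsum("ij,jkl->ikl", H, PHt) + R[..., np.newaxis]  # H P H^T + R, (D, D, N)
K = np.einsum("jki,lki->jli", PHt, np.linalg.inv(S.T).T)   # gain, (M, D, N)
post = P - np.einsum("jli,lki->jki", K, np.einsum("jki,lki->jli", S, K))

n = 1                                          # cross-check one particle
Sn = H @ P[..., n] @ H.T + R
Kn = P[..., n] @ H.T @ np.linalg.inv(Sn)
assert np.allclose(post[..., n], P[..., n] - Kn @ Sn @ Kn.T)
```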
diff --git a/stonesoup/updater/particle.py b/stonesoup/updater/particle.py
index ea2055cec..961350bf5 100644
--- a/stonesoup/updater/particle.py
+++ b/stonesoup/updater/particle.py
@@ -15,6 +15,7 @@
 from ..resampler import Resampler
 from ..regulariser import Regulariser
 from ..types.numeric import Probability
+from ..types.array import StateVectors, CovarianceMatrices
 from ..types.prediction import (
     Prediction, ParticleMeasurementPrediction, GaussianStatePrediction, MeasurementPrediction)
 from ..types.update import ParticleStateUpdate, Update
@@ -651,3 +652,121 @@ def _get_measurement_loglikelihoods(self, prediction, detections):
             measurement_model = self._check_measurement_model(detection.measurement_model)
             g[:, i] = measurement_model.logpdf(detection, prediction, noise=True)
         return g
+
+
+class MarginalisedParticleUpdater(ParticleUpdater):
+    """Implementation of the Marginalised Particle Filter (MPF) updater
+
+    The MPF partitions the state into:
+    1. linear Gaussian
+    2. non-linear and possibly non-Gaussian
+
+    It uses a Kalman filter to handle the linear Gaussian states, and
+    a traditional particle filter to infer the non-linear states.
+
+    See Cappe, Godsill, Moulines (2007) for more details.
+    """
+
+    def _measurement_matrix(self, predicted_state=None, measurement_model=None, **kwargs):
+        # H
+        return self._check_measurement_model(measurement_model).matrix(**kwargs)
+
+    def _measurement_cross_covariance(self, predicted_state, measurement_matrix):
+        # P H^T
+        return np.einsum("ijk,jm->imk", predicted_state.covariance, measurement_matrix.T)
+
+    def _innovation_covariance(self, m_cross_cov, meas_mat, meas_mod):
+        meas_covar = meas_mod.covar()  # R
+        # H P H^T + R
+        return np.einsum("ij,jkl->ikl", meas_mat, m_cross_cov) + meas_covar[..., np.newaxis]
+
+    def _posterior_mean(self, predicted_state, kalman_gain, measurement, measurement_prediction):
+        tmp = np.einsum(
+            "jkl,kl->jl",
+            kalman_gain,
+            (measurement.state_vector - measurement_prediction.state_vector),
+        )
+        post_mean = predicted_state.state_vector + tmp
+        return post_mean.view(StateVectors)
+
+    def _posterior_covariance(self, hypothesis):
+        predicted_covar = hypothesis.prediction.covariance
+        mp_covar = hypothesis.measurement_prediction.covariance  # M x M x N
+        mp_cross_covar = hypothesis.measurement_prediction.cross_covar
+        inv = np.linalg.inv(mp_covar.T).T  # M x M x N
+        kalman_gain = np.einsum("jki, lki -> jli", mp_cross_covar, inv)  # M x M x N
+        tmp = np.einsum("jki, lki -> jli", mp_covar, kalman_gain)
+        post_cov = predicted_covar - np.einsum("jli, lki -> jki", kalman_gain, tmp)
+
+        return post_cov.view(CovarianceMatrices), kalman_gain
+
+    def update(self, hypothesis, **kwargs):
+        predicted_state = hypothesis.prediction
+
+        if hypothesis.measurement.measurement_model is None:
+            measurement_model = self.measurement_model
+        else:
+            measurement_model = hypothesis.measurement.measurement_model
+
+        if hypothesis.measurement_prediction is None:
+            # Attach the measurement prediction to the hypothesis
+            hypothesis.measurement_prediction = self.predict_measurement(
+                predicted_state, measurement_model=measurement_model, **kwargs
+            )
+
+        posterior_covariance, kalman_gain = self._posterior_covariance(hypothesis)
+
+        # Posterior mean
+        posterior_mean = self._posterior_mean(
+            predicted_state, kalman_gain, hypothesis.measurement, hypothesis.measurement_prediction
+        )
+
+        posterior = 
Update.from_state( + state_vector=posterior_mean, + covariance=posterior_covariance, + state=hypothesis.prediction, + hypothesis=hypothesis, + timestamp=hypothesis.prediction.timestamp, + ) + + new_weight = posterior.log_weight + measurement_model.logpdf( + hypothesis.measurement, posterior, **kwargs + ) + + # Normalise the weights + new_weight -= logsumexp(new_weight) + + posterior.log_weight = new_weight + + # Resample + resample_flag = True + if self.resampler is not None: + resampled_state = self.resampler.resample(posterior) + if resampled_state == posterior: + resample_flag = False + posterior = resampled_state + + if self.regulariser is not None and resample_flag: + prior = hypothesis.prediction.parent + posterior = self.regulariser.regularise(prior, posterior) + return posterior + + @lru_cache() + def predict_measurement(self, predicted_state, measurement_model=None, **kwargs): + # If a measurement model is not specified then use the one that's + # native to the updater + measurement_model = self._check_measurement_model(measurement_model) + + pred_meas = measurement_model.function(predicted_state, **kwargs) + hh = self._measurement_matrix( + predicted_state=predicted_state, measurement_model=measurement_model, **kwargs + ) + + # The measurement cross covariance and innovation covariance + meas_cross_cov = self._measurement_cross_covariance(predicted_state, hh) + innov_cov = self._innovation_covariance(meas_cross_cov, hh, measurement_model) + + return MeasurementPrediction.from_state( + predicted_state, pred_meas, innov_cov, cross_covar=meas_cross_cov + ) diff --git a/stonesoup/updater/tests/test_mpf_updater.py b/stonesoup/updater/tests/test_mpf_updater.py new file mode 100644 index 000000000..96ef1de6f --- /dev/null +++ b/stonesoup/updater/tests/test_mpf_updater.py @@ -0,0 +1,234 @@ +from datetime import datetime, timezone + +import numpy as np +import pytest + +from stonesoup.models.base_driver import NoiseCase +from stonesoup.models.driver import AlphaStableNSMDriver, GaussianDriver +from stonesoup.models.measurement.linear import LinearGaussian +from stonesoup.models.transition.levy_linear import ( + CombinedLinearLevyTransitionModel, + LevyLangevin, + LevyRandomWalk, +) +from stonesoup.resampler.particle import SystematicResampler +from stonesoup.types.array import ( + CovarianceMatrices, + CovarianceMatrix, + StateVector, + StateVectors, +) +from stonesoup.types.detection import Detection +from stonesoup.types.hypothesis import SingleHypothesis +from stonesoup.types.numeric import Probability +from stonesoup.types.prediction import MarginalisedParticleStatePrediction +from stonesoup.types.update import MarginalisedParticleStateUpdate +from stonesoup.updater.particle import MarginalisedParticleUpdater + + +@pytest.fixture +def seed() -> int: + return 0 + + +@pytest.fixture(scope="function") +def alpha_stable_langevin_model_2d(seed): + mu_W = 1 + sigma_W2 = 25 + alpha = 1.4 + c = 10 + noise_case = NoiseCase.GAUSSIAN_APPROX + driver = AlphaStableNSMDriver( + mu_W=mu_W, sigma_W2=sigma_W2, seed=seed, alpha=alpha, c=c, noise_case=noise_case + ) + theta = 0.2 + transition_model_x = LevyLangevin(damping_coeff=theta, driver=driver) + transition_model_y = LevyLangevin(damping_coeff=theta, driver=driver) + return CombinedLinearLevyTransitionModel(model_list=[transition_model_x, transition_model_y]) + + +@pytest.fixture(scope="function") +def gaussian_random_walk_model_1d(seed): + mu_W = 1 + sigma_W2 = 25 + driver = GaussianDriver(mu_W=mu_W, sigma_W2=sigma_W2, seed=seed) + 
transition_model = LevyRandomWalk(driver=driver) + return CombinedLinearLevyTransitionModel(model_list=[transition_model]) + + +@pytest.mark.parametrize( + "model_type, hypothesis, expected_state, expected_covariance", + [ + ( + "alpha_stable_langevin_model_2d", + SingleHypothesis( + prediction=MarginalisedParticleStatePrediction( + state_vector=StateVectors( + [ + [0.85798578, 0.12636091, -0.0676282, 0.03570828, -0.42422919], + [-3.02711197, -2.48877242, -0.72049429, -2.42812059, -2.70124928], + [-0.40956703, -1.00294167, 0.70939368, -1.58729029, -1.17903268], + [-3.01473378, -4.1861108, 0.13154929, -3.38822607, -2.68388992], + ] + ), + covariance=CovarianceMatrices( + [ + [ + [ + 192.29655805, + 191.51680052, + 199.54518545, + 190.1203207, + 192.02971017, + ], + [90.9730478, 89.65483754, 110.50326036, 90.29556339, 92.33711585], + [0.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.0, 0.0], + ], + [ + [90.9730478, 89.65483754, 110.50326036, 90.29556339, 92.33711585], + [ + 98.55203117, + 98.78987551, + 152.07003477, + 106.72740959, + 106.23263394, + ], + [0.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.0, 0.0], + ], + [ + [0.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.0, 0.0], + [ + 192.29655805, + 191.51680052, + 199.54518545, + 190.1203207, + 192.02971017, + ], + [90.9730478, 89.65483754, 110.50326036, 90.29556339, 92.33711585], + ], + [ + [0.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.0, 0.0], + [90.9730478, 89.65483754, 110.50326036, 90.29556339, 92.33711585], + [ + 98.55203117, + 98.78987551, + 152.07003477, + 106.72740959, + 106.23263394, + ], + ], + ] + ), + weight=np.array( + [ + Probability(0.2), + Probability(0.2), + Probability(0.2), + Probability(0.2), + Probability(0.2), + ], + ), + ), + measurement=Detection( + state_vector=StateVector([[-5.43254207], [-2.04992048]]), + measurement_model=LinearGaussian( + ndim_state=4, + mapping=(0, 2), + noise_covar=CovarianceMatrix([[25.0, 0.0], [0.0, 25.0]]), + ), + ), + ), + StateVectors( + [ + [-4.70881594, -4.83523313, -4.79068622, -4.79705464, -4.85562649], + [-5.66069483, -3.3606777, -4.79059169, -4.72338839, -4.83207812], + [-1.86119758, -1.74270902, -1.92903162, -1.99615636, -1.94960152], + [-3.70148167, -1.22636542, -4.61964166, -3.58241253, -3.05441646], + ] + ), + CovarianceMatrices( + [ + [ + [22.12374643, 22.11338797, 22.21659585, 22.09464918, 22.12020995], + [10.46646213, 10.35194929, 12.30300932, 10.49361156, 10.6364603], + [0.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.0, 0.0], + ], + [ + [10.46646213, 10.35194929, 12.30300932, 10.49361156, 10.6364603], + [60.4653928, 61.66578225, 97.68912909, 68.82634687, 66.94703126], + [0.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.0, 0.0], + ], + [ + [0.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.0, 0.0], + [22.12374643, 22.11338797, 22.21659585, 22.09464918, 22.12020995], + [10.46646213, 10.35194929, 12.30300932, 10.49361156, 10.6364603], + ], + [ + [0.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.0, 0.0], + [10.46646213, 10.35194929, 12.30300932, 10.49361156, 10.6364603], + [60.4653928, 61.66578225, 97.68912909, 68.82634687, 66.94703126], + ], + ] + ), + ), + ( + "gaussian_random_walk_model_1d", + SingleHypothesis( + prediction=MarginalisedParticleStatePrediction( + state_vector=StateVectors( + [[1.48184682, 1.96629969, 1.58480781, 1.07186231, 1.81230203]] + ), + covariance=CovarianceMatrices([[[125.0, 125.0, 125.0, 125.0, 125.0]]]), + weight=np.array( + [ + Probability(0.2), + Probability(0.2), + Probability(0.2), + Probability(0.2), + Probability(0.2), + ], + ), + ), + measurement=Detection( 
+ state_vector=StateVector([[0.35420189]]), + measurement_model=LinearGaussian( + ndim_state=1, + mapping=(0,), + noise_covar=CovarianceMatrix([[25.0]]), + ), + ), + ), + StateVectors([[0.62288486, 0.59721858, 0.55930288, 0.54214271, 0.47381196]]), + CovarianceMatrices( + [[[20.83333333, 20.83333333, 20.83333333, 20.83333333, 20.83333333]]] + ), + ), + ], +) +def test_marginalised_particle_filter_update( + request: pytest.FixtureRequest, + model_type: str, + hypothesis: SingleHypothesis, + expected_state: StateVectors, + expected_covariance: CovarianceMatrices, +): + start_time = datetime.now(tz=timezone.utc).replace(microsecond=0) + hypothesis.prediction.timestamp = start_time + hypothesis.measurement.timestamp = start_time + + model = request.getfixturevalue(model_type) + resampler = SystematicResampler() + updater = MarginalisedParticleUpdater(model, resampler) + posterior = updater.update(hypothesis) + assert isinstance(posterior, MarginalisedParticleStateUpdate) + assert posterior.hypothesis == hypothesis + assert np.allclose(posterior.state_vector, expected_state) + assert np.allclose(posterior.covariance, expected_covariance)
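To close, here is a minimal end-to-end sketch of one MPF predict-update cycle, wired the same way as the test fixtures in this patch. It uses only classes and call signatures that the tests above exercise; the parameter values and the single 1-D random-walk model are arbitrary choices for illustration, not a recommended configuration.

```python
# One predict-update cycle with the new MPF classes; values are arbitrary.
from datetime import datetime, timedelta, timezone

import numpy as np

from stonesoup.models.driver import GaussianDriver
from stonesoup.models.measurement.linear import LinearGaussian
from stonesoup.models.transition.levy_linear import (
    CombinedLinearLevyTransitionModel, LevyRandomWalk)
from stonesoup.predictor.particle import MarginalisedParticlePredictor
from stonesoup.resampler.particle import SystematicResampler
from stonesoup.types.array import CovarianceMatrix, StateVector, StateVectors
from stonesoup.types.detection import Detection
from stonesoup.types.hypothesis import SingleHypothesis
from stonesoup.types.numeric import Probability
from stonesoup.types.state import MarginalisedParticleState
from stonesoup.updater.particle import MarginalisedParticleUpdater

start = datetime.now(tz=timezone.utc).replace(microsecond=0)
num_particles = 5

driver = GaussianDriver(mu_W=0, sigma_W2=1, seed=0)
transition_model = CombinedLinearLevyTransitionModel(
    model_list=[LevyRandomWalk(driver=driver)])
measurement_model = LinearGaussian(
    ndim_state=1, mapping=(0,), noise_covar=CovarianceMatrix([[1.0]]))

# Equally weighted particles with per-particle (1, 1) covariances
prior = MarginalisedParticleState(
    state_vector=StateVectors(np.zeros((1, num_particles))),
    covariance=np.full((1, 1, num_particles), 10.0),
    weight=np.array([Probability(1 / num_particles)] * num_particles, dtype=object),
    timestamp=start)

predictor = MarginalisedParticlePredictor(transition_model=transition_model)
updater = MarginalisedParticleUpdater(measurement_model, SystematicResampler())

prediction = predictor.predict(prior, timestamp=start + timedelta(seconds=1))
detection = Detection(StateVector([[0.5]]), timestamp=start + timedelta(seconds=1),
                      measurement_model=measurement_model)
posterior = updater.update(SingleHypothesis(prediction, detection))

print(posterior.mean)   # weighted posterior mean over particles
```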