carl.data module
This module implements generators for likelihood-free inference benchmarks.
""" This module implements generators for likelihood-free inference benchmarks. """ # Carl is free software; you can redistribute it and/or modify it # under the terms of the Revised BSD License; see LICENSE file for # more details. from .gk import GK from .ricker import Ricker __all__ = ("GK", "Ricker")
Classes
class GK
g-and-k distribution generator.
The g-and-k distribution is defined as
x = A + B * (1 + c * (1 - exp(-g * z)) / (1 + exp(-g * z))) * ((1 + z ** 2) ** k) * z,
where z ~ N(0, 1).
Reference
- Haynes, Michele A., H. L. MacGillivray, and K. L. Mengersen. "Robustness of ranking and selection rules using generalised g-and-k distributions." Journal of Statistical Planning and Inference 65.1 (1997): 45-66.
class GK(TheanoDistribution):
    """g-and-k distribution generator.

    The g-and-k distribution is defined as
    `x = A + B * (1 + c * (1 - exp(-g * z)) / (1 + exp(-g * z))) *
    ((1 + z ** 2) ** k) * z`, where `z ~ N(0, 1)`.

    Reference
    ---------
    * Haynes, Michele A., H. L. MacGillivray, and K. L. Mengersen.
      "Robustness of ranking and selection rules using generalised g-and-k
      distributions." Journal of Statistical Planning and Inference 65.1
      (1997): 45-66.
    """

    def __init__(self, A, B, g, k, c=0.8):
        """Constructor.

        Parameters
        ----------
        * `A` [float or theano shared variable]
        * `B` [float or theano shared variable]
        * `g` [float or theano shared variable]
        * `k` [float or theano shared variable]
        * `c` [float or theano shared variable, default=0.8]
        """
        super(GK, self).__init__(A=A, B=B, g=g, k=k, c=c)

    def rvs(self, n_samples, random_state=None, **kwargs):
        """Generate random samples.

        Parameters
        ----------
        * `n_samples` [int]:
            The number of samples to generate.

        * `random_state` [integer or RandomState object]:
            The random seed.
        """
        rng = check_random_state(random_state)
        A = self.A.eval()
        B = self.B.eval()
        g = self.g.eval()
        k = self.k.eval()
        c = self.c.eval()

        z = rng.randn(n_samples).reshape(-1, 1)
        Q = A + B * (1 + c * (1 - np.exp(-g * z)) /
                     (1 + np.exp(-g * z))) * ((1 + z ** 2) ** k) * z

        return Q
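For orientation, a minimal sampling sketch; the parameter values are illustrative only, while the constructor and rvs signatures follow the documentation below:

from carl.data import GK

# Illustrative parameters; floats or theano shared variables both work,
# per the constructor documentation below.
gk = GK(A=3.0, B=1.0, g=2.0, k=0.5)

X = gk.rvs(1000, random_state=0)   # samples of shape (1000, 1)
print(X.mean(), X.std())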
Ancestors (in MRO)
- GK
- carl.distributions.base.TheanoDistribution
- carl.distributions.base.DistributionMixin
- sklearn.base.BaseEstimator
- builtins.object
Class variables
var X
var p
Methods
def __init__(self, A, B, g, k, c=0.8)
Constructor.
Parameters
- A [float or theano shared variable]
- B [float or theano shared variable]
- g [float or theano shared variable]
- k [float or theano shared variable]
- c [float or theano shared variable, default=0.8]
def __init__(self, A, B, g, k, c=0.8):
    """Constructor.

    Parameters
    ----------
    * `A` [float or theano shared variable]
    * `B` [float or theano shared variable]
    * `g` [float or theano shared variable]
    * `k` [float or theano shared variable]
    * `c` [float or theano shared variable, default=0.8]
    """
    super(GK, self).__init__(A=A, B=B, g=g, k=k, c=c)
def cdf(self, X, **kwargs)
Cumulative distribution function.
This function returns the value of the cumulative distribution function F(x_i) = p(x <= x_i), at all x_i in X.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- cdf [array, shape=(n_samples,)]: cdf(x_i) for all x_i in X.
def cdf(self, X, **kwargs):
    """Cumulative distribution function.

    This function returns the value of the cumulative distribution
    function `F(x_i) = p(x <= x_i)`, at all `x_i` in `X`.

    Parameters
    ----------
    * `X` [array-like, shape=(n_samples, n_features)]:
        The samples.

    Returns
    -------
    * `cdf` [array, shape=(n_samples,)]:
        `cdf(x_i)` for all `x_i` in `X`.
    """
    raise NotImplementedError
def fit(self, X, bounds=None, constraints=None, use_gradient=True, optimizer=None, **kwargs)
Fit the distribution parameters to data by minimizing the negative log-likelihood of the data.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
- bounds [list of (parameter, (low, high))]: The parameter bounds.
- constraints: The constraints on the parameters.
- use_gradient [boolean, default=True]: Whether to use exact gradients (if True) or numerical gradients (if False).
- optimizer [string]: The optimization method.
Returns
- self [object]: self.
def fit(self, X, bounds=None, constraints=None, use_gradient=True,
        optimizer=None, **kwargs):
    """Fit the distribution parameters to data by minimizing
    the negative log-likelihood of the data.

    Parameters
    ----------
    * `X` [array-like, shape=(n_samples, n_features)]:
        The samples.

    * `bounds` [list of (parameter, (low, high))]:
        The parameter bounds.

    * `constraints`:
        The constraints on the parameters.

    * `use_gradient` [boolean, default=True]:
        Whether to use exact gradients (if `True`) or numerical gradients
        (if `False`).

    * `optimizer` [string]:
        The optimization method.

    Returns
    -------
    * `self` [object]:
        `self`.
    """
    # Map parameters to placeholders
    param_to_placeholder = []
    param_to_index = {}

    for i, v in enumerate(self.parameters_):
        w = T.TensorVariable(v.type)
        param_to_placeholder.append((v, w))
        param_to_index[v] = i

    # Build bounds
    mapped_bounds = None

    if bounds is not None:
        mapped_bounds = [(None, None) for v in param_to_placeholder]

        for b in bounds:
            mapped_bounds[param_to_index[b["param"]]] = b["bounds"]

    # Build constraints
    mapped_constraints = None

    if constraints is not None:
        mapped_constraints = []

        for c in constraints:
            args = c["param"]
            if isinstance(args, SharedVariable):
                args = (args, )

            m_c = {
                "type": c["type"],
                "fun": lambda x: c["fun"](*[x[param_to_index[a]]
                                            for a in args])
            }

            if "jac" in c:
                m_c["jac"] = lambda x: c["jac"](*[x[param_to_index[a]]
                                                  for a in args])

            mapped_constraints.append(m_c)

    # Derive objective and gradient
    objective_ = theano.function(
        [self.X] + [w for _, w in param_to_placeholder] +
        [theano.In(v, name=v.name) for v in self.observeds_],
        T.sum(self.nll_),
        givens=param_to_placeholder,
        allow_input_downcast=True)

    def objective(x):
        return objective_(X, *x, **kwargs) / len(X)

    if use_gradient:
        gradient_ = theano.function(
            [self.X] + [w for _, w in param_to_placeholder] +
            [theano.In(v, name=v.name) for v in self.observeds_],
            theano.grad(T.sum(self.nll_),
                        [v for v, _ in param_to_placeholder]),
            givens=param_to_placeholder,
            allow_input_downcast=True)

        def gradient(x):
            return np.array(gradient_(X, *x, **kwargs)) / len(X)

    # Solve!
    x0 = np.array([v.get_value() for v, _ in param_to_placeholder])
    r = minimize(objective,
                 jac=gradient if use_gradient else None,
                 x0=x0,
                 method=optimizer,
                 bounds=mapped_bounds,
                 constraints=mapped_constraints)

    if r.success:
        # Assign the solution
        for i, value in enumerate(r.x):
            param_to_placeholder[i][0].set_value(value)

    else:
        print("Parameter fitting failed!")
        print(r)

    return self
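Note that the docstring above describes bounds as (parameter, (low, high)) pairs, while the implementation actually reads dicts keyed by "param" and "bounds" (and constraints keyed by "param", "type", "fun", and optionally "jac"). A hedged sketch of that argument format, with illustrative parameter names and values:

import theano
from carl.data import GK

# Passing parameters as theano shared variables gives handles that the
# bounds/constraints dicts below can reference.
A = theano.shared(2.0, name="A")
B = theano.shared(1.0, name="B")
gk = GK(A=A, B=B, g=2.0, k=0.5)

# Argument format inferred from the fit() source above.
bounds = [{"param": A, "bounds": (0.0, 10.0)},
          {"param": B, "bounds": (0.0, 10.0)}]
constraints = [{"param": B, "type": "ineq", "fun": lambda b: b}]  # B >= 0

# gk.fit(X, bounds=bounds, constraints=constraints) would then run the
# optimizer, but only for distributions that define `nll_`; GK and Ricker
# leave nll unimplemented, so in practice they are sampled rather than fitted.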
def nll(self, X, **kwargs)
Negative log-likelihood.
This function returns the value of the negative log-likelihood -log(p(x_i)), at all x_i in X.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- nll [array, shape=(n_samples,)]: -log(p(x_i)) for all x_i in X.
def nll(self, X, **kwargs):
    """Negative log-likelihood.

    This function returns the value of the negative log-likelihood
    `- log(p(x_i))`, at all `x_i` in `X`.

    Parameters
    ----------
    * `X` [array-like, shape=(n_samples, n_features)]:
        The samples.

    Returns
    -------
    * `nll` [array, shape=(n_samples,)]:
        `-log(p(x_i))` for all `x_i` in `X`.
    """
    raise NotImplementedError
def pdf(self, X, **kwargs)
Probability density function.
This function returns the value of the probability density function p(x_i), at all x_i in X.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- pdf [array, shape=(n_samples,)]: p(x_i) for all x_i in X.
def pdf(self, X, **kwargs):
    """Probability density function.

    This function returns the value of the probability density function
    `p(x_i)`, at all `x_i` in `X`.

    Parameters
    ----------
    * `X` [array-like, shape=(n_samples, n_features)]:
        The samples.

    Returns
    -------
    * `pdf` [array, shape=(n_samples,)]:
        `p(x_i)` for all `x_i` in `X`.
    """
    raise NotImplementedError
def ppf(self, X, **kwargs)
Percent point function (inverse of cdf).
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- ppf [array, shape=(n_samples,)]: Percent point function for all x_i in X.
def ppf(self, X, **kwargs):
    """Percent point function (inverse of cdf).

    Parameters
    ----------
    * `X` [array-like, shape=(n_samples, n_features)]:
        The samples.

    Returns
    -------
    * `ppf` [array, shape=(n_samples,)]:
        Percent point function for all `x_i` in `X`.
    """
    raise NotImplementedError
def rvs(self, n_samples, random_state=None, **kwargs)
Generate random samples.
Parameters
- n_samples [int]: The number of samples to generate.
- random_state [integer or RandomState object]: The random seed.
def rvs(self, n_samples, random_state=None, **kwargs):
    """Generate random samples.

    Parameters
    ----------
    * `n_samples` [int]:
        The number of samples to generate.

    * `random_state` [integer or RandomState object]:
        The random seed.
    """
    rng = check_random_state(random_state)
    A = self.A.eval()
    B = self.B.eval()
    g = self.g.eval()
    k = self.k.eval()
    c = self.c.eval()

    z = rng.randn(n_samples).reshape(-1, 1)
    Q = A + B * (1 + c * (1 - np.exp(-g * z)) /
                 (1 + np.exp(-g * z))) * ((1 + z ** 2) ** k) * z

    return Q
def score(self, X, **kwargs)
Evaluate the log-likelihood -self.nll(X).sum() of X. The higher, the better.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- score [float]: The log-likelihood of X.
def score(self, X, **kwargs):
    """Evaluate the log-likelihood `-self.nll(X).sum()` of `X`.

    The higher, the better.

    Parameters
    ----------
    * `X` [array-like, shape=(n_samples, n_features)]:
        The samples.

    Returns
    -------
    * `score` [float]:
        The log-likelihood of `X`.
    """
    return -self.nll(X, **kwargs).sum()
def set_params(self, **params)
Set the parameters of this estimator.
The method works on simple estimators as well as on nested objects
(such as pipelines). The latter have parameters of the form
<component>__<parameter>
so that it's possible to update each
component of a nested object.
Returns
self
def set_params(self, **params):
    for name, value in params.items():
        var = getattr(self, name, None)

        if isinstance(var, SharedVariable):
            var.set_value(value)

        elif (isinstance(var, T.TensorVariable) or
              isinstance(var, T.TensorConstant)):
            raise ValueError("Only shared variables can be updated.")

        else:
            super(DistributionMixin, self).set_params(**{name: value})
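As the source above shows, set_params writes new values into theano shared variables in place. A small sketch, assuming (as the rvs source above implies via self.A.eval()) that constructor arguments are stored as attributes:

import theano
from carl.data import GK

A = theano.shared(3.0, name="A")
gk = GK(A=A, B=1.0, g=2.0, k=0.5)

gk.set_params(A=4.0)      # updates the shared variable in place
print(A.get_value())      # 4.0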
Instance variables
var ndim
The number of dimensions (or features) of the data.
class Ricker
Ricker model.
N_t+1 = r * N_t * exp(-N_t + e_t); Y_t+1 ~ Poisson(phi * N_t+1),
where e_t are iid normal deviates with zero mean and variance sigma^2.
Reference
- Ricker, William E. "Stock and recruitment." Journal of the Fisheries Board of Canada 11.5 (1954): 559-623.
class Ricker(TheanoDistribution):
    """Ricker model.

    `N_t+1 = r * N_t * exp(-N_t + e_t); Y_t+1 ~ Poisson(phi * N_t+1)`,
    where `e_t` are iid normal deviates with zero mean and variance
    `sigma^2`.

    Reference
    ---------
    * Ricker, William E. "Stock and recruitment." Journal of the Fisheries
      Board of Canada 11.5 (1954): 559-623.
    """

    def __init__(self, log_r=3.8, sigma=0.3, phi=10.0):
        """Constructor.

        Parameters
        ----------
        * `log_r` [float or theano shared variable, default=3.8]
        * `sigma` [float or theano shared variable, default=0.3]
        * `phi` [float or theano shared variable, default=10.0]
        """
        super(Ricker, self).__init__(log_r=log_r, sigma=sigma, phi=phi)

    def rvs(self, n_samples, random_state=None, **kwargs):
        """Generate random samples `Y_t`, for `t = 1:n_samples`.

        Parameters
        ----------
        * `n_samples` [int]:
            The number of samples to generate.

        * `random_state` [integer or RandomState object]:
            The random seed.
        """
        rng = check_random_state(random_state)
        log_r = self.log_r.eval()
        sigma = self.sigma.eval()
        phi = self.phi.eval()

        N_t = 1
        Y = []

        for t in range(1, n_samples + 1):
            e_t = sigma * rng.randn()
            N_t = np.exp(log_r) * N_t * np.exp(-N_t + e_t)
            Y_t = rng.poisson(phi * N_t)
            Y.append(Y_t)

        Y = np.array(Y).reshape(-1, 1)

        return Y
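A minimal simulation sketch using the default parameters documented below; the output is a column vector of Poisson-distributed counts:

from carl.data import Ricker

ricker = Ricker(log_r=3.8, sigma=0.3, phi=10.0)
Y = ricker.rvs(50, random_state=42)   # integer counts, shape (50, 1)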
Ancestors (in MRO)
- Ricker
- carl.distributions.base.TheanoDistribution
- carl.distributions.base.DistributionMixin
- sklearn.base.BaseEstimator
- builtins.object
Class variables
var X
var p
Methods
def __init__(self, log_r=3.8, sigma=0.3, phi=10.0)
Constructor.
Parameters
- log_r [float or theano shared variable, default=3.8]
- sigma [float or theano shared variable, default=0.3]
- phi [float or theano shared variable, default=10.0]
def __init__(self, log_r=3.8, sigma=0.3, phi=10.0):
    """Constructor.

    Parameters
    ----------
    * `log_r` [float or theano shared variable, default=3.8]
    * `sigma` [float or theano shared variable, default=0.3]
    * `phi` [float or theano shared variable, default=10.0]
    """
    super(Ricker, self).__init__(log_r=log_r, sigma=sigma, phi=phi)
def cdf(self, X, **kwargs)
Cumulative distribution function.
This function returns the value of the cumulative distribution function F(x_i) = p(x <= x_i), at all x_i in X.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- cdf [array, shape=(n_samples,)]: cdf(x_i) for all x_i in X.
def cdf(self, X, **kwargs):
    """Cumulative distribution function.

    This function returns the value of the cumulative distribution
    function `F(x_i) = p(x <= x_i)`, at all `x_i` in `X`.

    Parameters
    ----------
    * `X` [array-like, shape=(n_samples, n_features)]:
        The samples.

    Returns
    -------
    * `cdf` [array, shape=(n_samples,)]:
        `cdf(x_i)` for all `x_i` in `X`.
    """
    raise NotImplementedError
def fit(self, X, bounds=None, constraints=None, use_gradient=True, optimizer=None, **kwargs)
Fit the distribution parameters to data by minimizing the negative log-likelihood of the data.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
- bounds [list of (parameter, (low, high))]: The parameter bounds.
- constraints: The constraints on the parameters.
- use_gradient [boolean, default=True]: Whether to use exact gradients (if True) or numerical gradients (if False).
- optimizer [string]: The optimization method.
Returns
- self [object]: self.
def fit(self, X, bounds=None, constraints=None, use_gradient=True,
        optimizer=None, **kwargs):
    """Fit the distribution parameters to data by minimizing
    the negative log-likelihood of the data.

    Parameters
    ----------
    * `X` [array-like, shape=(n_samples, n_features)]:
        The samples.

    * `bounds` [list of (parameter, (low, high))]:
        The parameter bounds.

    * `constraints`:
        The constraints on the parameters.

    * `use_gradient` [boolean, default=True]:
        Whether to use exact gradients (if `True`) or numerical gradients
        (if `False`).

    * `optimizer` [string]:
        The optimization method.

    Returns
    -------
    * `self` [object]:
        `self`.
    """
    # Map parameters to placeholders
    param_to_placeholder = []
    param_to_index = {}

    for i, v in enumerate(self.parameters_):
        w = T.TensorVariable(v.type)
        param_to_placeholder.append((v, w))
        param_to_index[v] = i

    # Build bounds
    mapped_bounds = None

    if bounds is not None:
        mapped_bounds = [(None, None) for v in param_to_placeholder]

        for b in bounds:
            mapped_bounds[param_to_index[b["param"]]] = b["bounds"]

    # Build constraints
    mapped_constraints = None

    if constraints is not None:
        mapped_constraints = []

        for c in constraints:
            args = c["param"]
            if isinstance(args, SharedVariable):
                args = (args, )

            m_c = {
                "type": c["type"],
                "fun": lambda x: c["fun"](*[x[param_to_index[a]]
                                            for a in args])
            }

            if "jac" in c:
                m_c["jac"] = lambda x: c["jac"](*[x[param_to_index[a]]
                                                  for a in args])

            mapped_constraints.append(m_c)

    # Derive objective and gradient
    objective_ = theano.function(
        [self.X] + [w for _, w in param_to_placeholder] +
        [theano.In(v, name=v.name) for v in self.observeds_],
        T.sum(self.nll_),
        givens=param_to_placeholder,
        allow_input_downcast=True)

    def objective(x):
        return objective_(X, *x, **kwargs) / len(X)

    if use_gradient:
        gradient_ = theano.function(
            [self.X] + [w for _, w in param_to_placeholder] +
            [theano.In(v, name=v.name) for v in self.observeds_],
            theano.grad(T.sum(self.nll_),
                        [v for v, _ in param_to_placeholder]),
            givens=param_to_placeholder,
            allow_input_downcast=True)

        def gradient(x):
            return np.array(gradient_(X, *x, **kwargs)) / len(X)

    # Solve!
    x0 = np.array([v.get_value() for v, _ in param_to_placeholder])
    r = minimize(objective,
                 jac=gradient if use_gradient else None,
                 x0=x0,
                 method=optimizer,
                 bounds=mapped_bounds,
                 constraints=mapped_constraints)

    if r.success:
        # Assign the solution
        for i, value in enumerate(r.x):
            param_to_placeholder[i][0].set_value(value)

    else:
        print("Parameter fitting failed!")
        print(r)

    return self
def nll(self, X, **kwargs)
Negative log-likelihood.
This function returns the value of the negative log-likelihood -log(p(x_i)), at all x_i in X.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- nll [array, shape=(n_samples,)]: -log(p(x_i)) for all x_i in X.
def nll(self, X, **kwargs):
    """Negative log-likelihood.

    This function returns the value of the negative log-likelihood
    `- log(p(x_i))`, at all `x_i` in `X`.

    Parameters
    ----------
    * `X` [array-like, shape=(n_samples, n_features)]:
        The samples.

    Returns
    -------
    * `nll` [array, shape=(n_samples,)]:
        `-log(p(x_i))` for all `x_i` in `X`.
    """
    raise NotImplementedError
def pdf(self, X, **kwargs)
Probability density function.
This function returns the value of the probability density function p(x_i), at all x_i in X.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- pdf [array, shape=(n_samples,)]: p(x_i) for all x_i in X.
def pdf(self, X, **kwargs):
    """Probability density function.

    This function returns the value of the probability density function
    `p(x_i)`, at all `x_i` in `X`.

    Parameters
    ----------
    * `X` [array-like, shape=(n_samples, n_features)]:
        The samples.

    Returns
    -------
    * `pdf` [array, shape=(n_samples,)]:
        `p(x_i)` for all `x_i` in `X`.
    """
    raise NotImplementedError
def ppf(self, X, **kwargs)
Percent point function (inverse of cdf).
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- ppf [array, shape=(n_samples,)]: Percent point function for all x_i in X.
def ppf(self, X, **kwargs):
    """Percent point function (inverse of cdf).

    Parameters
    ----------
    * `X` [array-like, shape=(n_samples, n_features)]:
        The samples.

    Returns
    -------
    * `ppf` [array, shape=(n_samples,)]:
        Percent point function for all `x_i` in `X`.
    """
    raise NotImplementedError
def rvs(self, n_samples, random_state=None, **kwargs)
Generate random samples Y_t, for t = 1:n_samples.
Parameters
- n_samples [int]: The number of samples to generate.
- random_state [integer or RandomState object]: The random seed.
def rvs(self, n_samples, random_state=None, **kwargs):
    """Generate random samples `Y_t`, for `t = 1:n_samples`.

    Parameters
    ----------
    * `n_samples` [int]:
        The number of samples to generate.

    * `random_state` [integer or RandomState object]:
        The random seed.
    """
    rng = check_random_state(random_state)
    log_r = self.log_r.eval()
    sigma = self.sigma.eval()
    phi = self.phi.eval()

    N_t = 1
    Y = []

    for t in range(1, n_samples + 1):
        e_t = sigma * rng.randn()
        N_t = np.exp(log_r) * N_t * np.exp(-N_t + e_t)
        Y_t = rng.poisson(phi * N_t)
        Y.append(Y_t)

    Y = np.array(Y).reshape(-1, 1)

    return Y
def score(self, X, **kwargs)
Evaluate the log-likelihood -self.nll(X).sum() of X. The higher, the better.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- score [float]: The log-likelihood of X.
def score(self, X, **kwargs):
    """Evaluate the log-likelihood `-self.nll(X).sum()` of `X`.

    The higher, the better.

    Parameters
    ----------
    * `X` [array-like, shape=(n_samples, n_features)]:
        The samples.

    Returns
    -------
    * `score` [float]:
        The log-likelihood of `X`.
    """
    return -self.nll(X, **kwargs).sum()
def set_params(self, **params)
Set the parameters of this estimator.
The method works on simple estimators as well as on nested objects
(such as pipelines). The latter have parameters of the form
<component>__<parameter>
so that it's possible to update each
component of a nested object.
Returns
self
def set_params(self, **params):
    for name, value in params.items():
        var = getattr(self, name, None)

        if isinstance(var, SharedVariable):
            var.set_value(value)

        elif (isinstance(var, T.TensorVariable) or
              isinstance(var, T.TensorConstant)):
            raise ValueError("Only shared variables can be updated.")

        else:
            super(DistributionMixin, self).set_params(**{name: value})
Instance variables
var ndim
The number of dimensions (or features) of the data.