carl.distributions module
This module implements tools for composing and fitting statistical distributions.
Note
This module is only meant to be a minimally working prototype for composing and fitting distributions. It is not meant to be a full-fledged replacement of RooFit or the like.
""" This module implements tools for composing and fitting statistical distributions. Note ---- This module is only meant to be a minimally working prototype for composing and fitting distributions. It is not meant to be a full fledged replacement of RooFit or alikes. """ # Carl is free software; you can redistribute it and/or modify it # under the terms of the Revised BSD License; see LICENSE file for # more details. from .base import DistributionMixin from .base import TheanoDistribution from .exponential import Exponential from .normal import Normal from .normal import MultivariateNormal from .uniform import Uniform from .join import Join from .mixture import Mixture from .transforms import LinearTransform from .histogram import Histogram from .kde import KernelDensity from .sampler import Sampler __all__ = ("DistributionMixin", "TheanoDistribution", "Exponential", "Normal", "MultivariateNormal", "Uniform", "Join", "Mixture", "LinearTransform", "Histogram", "KernelDensity", "Sampler")
Classes
class DistributionMixin
Distribution mixin.
This class defines the common API for distribution objects.
class DistributionMixin(BaseEstimator): """Distribution mixin. This class defines the common API for distribution objects. """ def pdf(self, X, **kwargs): """Probability density function. This function returns the value of the probability density function `p(x_i)`, at all `x_i` in `X`. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `pdf` [array, shape=(n_samples,)]: `p(x_i)` for all `x_i` in `X`. """ raise NotImplementedError def nll(self, X, **kwargs): """Negative log-likelihood. This function returns the value of the negative log-likelihood `- log(p(x_i))`, at all `x_i` in `X`. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `nll` [array, shape=(n_samples,)]: `-log(p(x_i))` for all `x_i` in `X`. """ raise NotImplementedError def cdf(self, X, **kwargs): """Cumulative distribution function. This function returns the value of the cumulative distribution function `F(x_i) = p(x <= x_i)`, at all `x_i` in `X`. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `cdf` [array, shape=(n_samples,)]: `cdf(x_i)` for all `x_i` in `X`. """ raise NotImplementedError def ppf(self, X, **kwargs): """Percent point function (inverse of cdf). Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `ppf` [array, shape=(n_samples,)]: Percent point function for all `x_i` in `X`. """ raise NotImplementedError def rvs(self, n_samples, random_state=None, **kwargs): """Draw samples. Parameters ---------- * `n_samples` [int]: The number of samples to draw. * `random_state` [integer or RandomState object]: The random seed. Returns ------- * `X` [array, shape=(n_samples, n_features)]: The generated samples. """ raise NotImplementedError def fit(self, X, **kwargs): """Fit the distribution parameters to data. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `self` [object]: `self`. """ return self def score(self, X, **kwargs): """Evaluate the goodness of fit of the distribution w.r.t. `X`. The higher, the better. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `score` [float]: The goodness of fit. """ return NotImplementedError @property def ndim(self): """The number of dimensions (or features) of the data.""" return 1
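The whole API defined by this mixin can be exercised on any concrete subclass. A hedged sketch using Exponential (documented below); exact numerical outputs are not shown.

import numpy as np
from carl.distributions import Exponential

p = Exponential(inverse_scale=2.0)
X = p.rvs(500, random_state=1)          # samples, shape (500, 1)

densities = p.pdf(X)                    # p(x_i)
neg_logs = p.nll(X)                     # -log p(x_i)
cumulative = p.cdf(X)                   # F(x_i) = p(x <= x_i)
median = p.ppf(np.array([[0.5]]))       # inverse cdf at probability 0.5
total = p.score(X)                      # sum of log-likelihoods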
Ancestors (in MRO)
- DistributionMixin
- sklearn.base.BaseEstimator
- builtins.object
Static methods
def cdf(
self, X, **kwargs)
Cumulative distribution function.
This function returns the value of the cumulative distribution function F(x_i) = p(x <= x_i), at all x_i in X.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
cdf
[array, shape=(n_samples,)]: cdf(x_i) for all x_i in X.
def cdf(self, X, **kwargs): """Cumulative distribution function. This function returns the value of the cumulative distribution function `F(x_i) = p(x <= x_i)`, at all `x_i` in `X`. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `cdf` [array, shape=(n_samples,)]: `cdf(x_i)` for all `x_i` in `X`. """ raise NotImplementedError
def fit(
self, X, **kwargs)
Fit the distribution parameters to data.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
self
[object]: self.
def fit(self, X, **kwargs): """Fit the distribution parameters to data. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `self` [object]: `self`. """ return self
def nll(
self, X, **kwargs)
Negative log-likelihood.
This function returns the value of the negative log-likelihood -log(p(x_i)), at all x_i in X.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
nll
[array, shape=(n_samples,)]: -log(p(x_i)) for all x_i in X.
def nll(self, X, **kwargs): """Negative log-likelihood. This function returns the value of the negative log-likelihood `- log(p(x_i))`, at all `x_i` in `X`. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `nll` [array, shape=(n_samples,)]: `-log(p(x_i))` for all `x_i` in `X`. """ raise NotImplementedError
def pdf(
self, X, **kwargs)
Probability density function.
This function returns the value of the probability density function p(x_i), at all x_i in X.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
pdf
[array, shape=(n_samples,)]: p(x_i) for all x_i in X.
def pdf(self, X, **kwargs): """Probability density function. This function returns the value of the probability density function `p(x_i)`, at all `x_i` in `X`. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `pdf` [array, shape=(n_samples,)]: `p(x_i)` for all `x_i` in `X`. """ raise NotImplementedError
def ppf(
self, X, **kwargs)
Percent point function (inverse of cdf).
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
ppf
[array, shape=(n_samples,)]: Percent point function for all x_i in X.
def ppf(self, X, **kwargs): """Percent point function (inverse of cdf). Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `ppf` [array, shape=(n_samples,)]: Percent point function for all `x_i` in `X`. """ raise NotImplementedError
def rvs(
self, n_samples, random_state=None, **kwargs)
Draw samples.
Parameters
- n_samples [int]: The number of samples to draw.
- random_state [integer or RandomState object]: The random seed.
Returns
X
[array, shape=(n_samples, n_features)]: The generated samples.
def rvs(self, n_samples, random_state=None, **kwargs): """Draw samples. Parameters ---------- * `n_samples` [int]: The number of samples to draw. * `random_state` [integer or RandomState object]: The random seed. Returns ------- * `X` [array, shape=(n_samples, n_features)]: The generated samples. """ raise NotImplementedError
def score(
self, X, **kwargs)
Evaluate the goodness of fit of the distribution w.r.t. X.
The higher, the better.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
score
[float]: The goodness of fit.
def score(self, X, **kwargs):
    """Evaluate the goodness of fit of the distribution w.r.t. `X`.

    The higher, the better.

    Parameters
    ----------
    * `X` [array-like, shape=(n_samples, n_features)]:
        The samples.

    Returns
    -------
    * `score` [float]:
        The goodness of fit.
    """
    # No generic goodness-of-fit measure is defined for the base class.
    raise NotImplementedError
Instance variables
var ndim
The number of dimensions (or features) of the data.
class Exponential
Exponential distribution.
This distribution supports 1D data only.
class Exponential(TheanoDistribution): """Exponential distribution. This distribution supports 1D data only. """ def __init__(self, inverse_scale=1.0): """Constructor. Parameters ---------- * `inverse_scale` [float]: The inverse scale. """ super(Exponential, self).__init__(inverse_scale=inverse_scale) # pdf self.pdf_ = T.switch( T.lt(self.X, 0.), 0., self.inverse_scale * T.exp(-self.inverse_scale * self.X)).ravel() self._make(self.pdf_, "pdf") # -log pdf self.nll_ = bound( -T.log(self.inverse_scale) + self.inverse_scale * self.X, np.inf, self.inverse_scale > 0.).ravel() self._make(self.nll_, "nll") # cdf self.cdf_ = (1. - T.exp(-self.inverse_scale * self.X)).ravel() self._make(self.cdf_, "cdf") # ppf self.ppf_ = -T.log(1. - self.p) / self.inverse_scale self._make(self.ppf_, "ppf", args=[self.p])
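For illustration, a minimal sketch of using the class above; the evaluation points are arbitrary.

import numpy as np
from carl.distributions import Exponential

p = Exponential(inverse_scale=2.0)

X = p.rvs(1000, random_state=0)           # inverse-transform sampling via ppf
p.pdf(np.array([[0.0], [0.5], [1.0]]))    # densities at chosen points
p.cdf(np.array([[0.5]]))                  # equals 1 - exp(-2 * 0.5)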
Ancestors (in MRO)
- Exponential
- TheanoDistribution
- DistributionMixin
- sklearn.base.BaseEstimator
- builtins.object
Class variables
var X
var p
Static methods
def __init__(
self, inverse_scale=1.0)
Constructor.
Parameters
inverse_scale
[float]: The inverse scale.
def __init__(self, inverse_scale=1.0): """Constructor. Parameters ---------- * `inverse_scale` [float]: The inverse scale. """ super(Exponential, self).__init__(inverse_scale=inverse_scale) # pdf self.pdf_ = T.switch( T.lt(self.X, 0.), 0., self.inverse_scale * T.exp(-self.inverse_scale * self.X)).ravel() self._make(self.pdf_, "pdf") # -log pdf self.nll_ = bound( -T.log(self.inverse_scale) + self.inverse_scale * self.X, np.inf, self.inverse_scale > 0.).ravel() self._make(self.nll_, "nll") # cdf self.cdf_ = (1. - T.exp(-self.inverse_scale * self.X)).ravel() self._make(self.cdf_, "cdf") # ppf self.ppf_ = -T.log(1. - self.p) / self.inverse_scale self._make(self.ppf_, "ppf", args=[self.p])
def cdf(
self, X, **kwargs)
Cumulative distribution function.
This function returns the value of the cumulative distribution function F(x_i) = p(x <= x_i), at all x_i in X.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
cdf
[array, shape=(n_samples,)]: cdf(x_i) for all x_i in X.
def cdf(self, X, **kwargs): """Cumulative distribution function. This function returns the value of the cumulative distribution function `F(x_i) = p(x <= x_i)`, at all `x_i` in `X`. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `cdf` [array, shape=(n_samples,)]: `cdf(x_i)` for all `x_i` in `X`. """ raise NotImplementedError
def fit(
self, X, bounds=None, constraints=None, use_gradient=True, optimizer=None, **kwargs)
Fit the distribution parameters to data by minimizing the negative log-likelihood of the data.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
- bounds [list of (parameter, (low, high))]: The parameter bounds.
- constraints: The constraints on the parameters.
- use_gradient [boolean, default=True]: Whether to use exact gradients (if True) or numerical gradients (if False).
- optimizer [string]: The optimization method.
Returns
self
[object]: self.
def fit(self, X, bounds=None, constraints=None, use_gradient=True, optimizer=None, **kwargs): """Fit the distribution parameters to data by minimizing the negative log-likelihood of the data. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. * `bounds` [list of (parameter, (low, high))]: The parameter bounds. * `constraints`: The constraints on the parameters. * `use_gradient` [boolean, default=True]: Whether to use exact gradients (if `True`) or numerical gradients (if `False`). * `optimizer` [string]: The optimization method. Returns ------- * `self` [object]: `self`. """ # Map parameters to placeholders param_to_placeholder = [] param_to_index = {} for i, v in enumerate(self.parameters_): w = T.TensorVariable(v.type) param_to_placeholder.append((v, w)) param_to_index[v] = i # Build bounds mapped_bounds = None if bounds is not None: mapped_bounds = [(None, None) for v in param_to_placeholder] for b in bounds: mapped_bounds[param_to_index[b["param"]]] = b["bounds"] # Build constraints mapped_constraints = None if constraints is not None: mapped_constraints = [] for c in constraints: args = c["param"] if isinstance(args, SharedVariable): args = (args, ) m_c = { "type": c["type"], "fun": lambda x: c["fun"](*[x[param_to_index[a]] for a in args]) } if "jac" in c: m_c["jac"] = lambda x: c["jac"](*[x[param_to_index[a]] for a in args]) mapped_constraints.append(m_c) # Derive objective and gradient objective_ = theano.function( [self.X] + [w for _, w in param_to_placeholder] + [theano.In(v, name=v.name) for v in self.observeds_], T.sum(self.nll_), givens=param_to_placeholder, allow_input_downcast=True) def objective(x): return objective_(X, *x, **kwargs) / len(X) if use_gradient: gradient_ = theano.function( [self.X] + [w for _, w in param_to_placeholder] + [theano.In(v, name=v.name) for v in self.observeds_], theano.grad(T.sum(self.nll_), [v for v, _ in param_to_placeholder]), givens=param_to_placeholder, allow_input_downcast=True) def gradient(x): return np.array(gradient_(X, *x, **kwargs)) / len(X) # Solve! x0 = np.array([v.get_value() for v, _ in param_to_placeholder]) r = minimize(objective, jac=gradient if use_gradient else None, x0=x0, method=optimizer, bounds=mapped_bounds, constraints=mapped_constraints) if r.success: # Assign the solution for i, value in enumerate(r.x): param_to_placeholder[i][0].set_value(value) else: print("Parameter fitting failed!") print(r) return self
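Reading the source above, bounds and constraints are passed as dictionaries keyed by the shared parameter variables (b["param"], b["bounds"]; c["type"], c["fun"]). A hedged sketch of a bounded maximum-likelihood fit, assuming the constructor exposes the fitted parameter as the shared-variable attribute inverse_scale:

from carl.distributions import Exponential

X = Exponential(inverse_scale=2.0).rvs(10000, random_state=0)

p = Exponential(inverse_scale=0.5)                 # deliberately wrong starting value
p.fit(X, bounds=[{"param": p.inverse_scale,        # assumed shared-variable attribute
                  "bounds": (1e-3, 100.0)}])
print(p.inverse_scale.get_value())                 # should move towards 2.0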
def nll(
self, X, **kwargs)
Negative log-likelihood.
This function returns the value of the negative log-likelihood -log(p(x_i)), at all x_i in X.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
nll
[array, shape=(n_samples,)]: -log(p(x_i)) for all x_i in X.
def nll(self, X, **kwargs): """Negative log-likelihood. This function returns the value of the negative log-likelihood `- log(p(x_i))`, at all `x_i` in `X`. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `nll` [array, shape=(n_samples,)]: `-log(p(x_i))` for all `x_i` in `X`. """ raise NotImplementedError
def pdf(
self, X, **kwargs)
Probability density function.
This function returns the value of the probability density function p(x_i), at all x_i in X.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
pdf
[array, shape=(n_samples,)]: p(x_i) for all x_i in X.
def pdf(self, X, **kwargs): """Probability density function. This function returns the value of the probability density function `p(x_i)`, at all `x_i` in `X`. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `pdf` [array, shape=(n_samples,)]: `p(x_i)` for all `x_i` in `X`. """ raise NotImplementedError
def ppf(
self, X, **kwargs)
Percent point function (inverse of cdf).
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
ppf
[array, shape=(n_samples,)]: Percent point function for all x_i in X.
def ppf(self, X, **kwargs): """Percent point function (inverse of cdf). Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `ppf` [array, shape=(n_samples,)]: Percent point function for all `x_i` in `X`. """ raise NotImplementedError
def rvs(
self, n_samples, random_state=None, **kwargs)
Draw samples.
Parameters
- n_samples [int]: The number of samples to draw.
- random_state [integer or RandomState object]: The random seed.
Returns
X
[array, shape=(n_samples, n_features)]: The generated samples.
def rvs(self, n_samples, random_state=None, **kwargs):
    rng = check_random_state(random_state)
    p = rng.rand(n_samples, 1)
    return self.ppf(p, **kwargs)
def score(
self, X, **kwargs)
Evaluate the log-likelihood -self.nll(X).sum() of X.
The higher, the better.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
score
[float]: The log-likelihood of X.
def score(self, X, **kwargs): """Evaluate the log-likelihood `-self.nll(X).sum()` of `X`. The higher, the better. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `score` [float]: The log-likelihood of `X`. """ return -self.nll(X, **kwargs).sum()
def set_params(
self, **params)
Set the parameters of this estimator.
The method works on simple estimators as well as on nested objects
(such as pipelines). The latter have parameters of the form
<component>__<parameter>
so that it's possible to update each
component of a nested object.
Returns
self
def set_params(self, **params):
    for name, value in params.items():
        var = getattr(self, name, None)

        if isinstance(var, SharedVariable):
            var.set_value(value)
        elif (isinstance(var, T.TensorVariable) or
              isinstance(var, T.TensorConstant)):
            raise ValueError("Only shared variables can be updated.")
        else:
            super(DistributionMixin, self).set_params(**{name: value})
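As the source shows, set_params writes directly into the underlying Theano shared variables, so parameters can be updated in place after construction; a small sketch:

from carl.distributions import Exponential

p = Exponential(inverse_scale=1.0)
p.set_params(inverse_scale=3.0)      # updates the shared variable in place
p.inverse_scale.get_value()          # -> 3.0 (assuming the attribute is the shared variable)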
Instance variables
var cdf_
var ndim
The number of dimensions (or features) of the data.
var nll_
var pdf_
var ppf_
class Histogram
N-dimensional histogram.
class Histogram(DistributionMixin): """N-dimensional histogram.""" def __init__(self, bins=10, range=None, interpolation=None, variable_width=False): """Constructor. Parameters ---------- * `bins` [int or string]: The number of bins or `"blocks"` for bayesian blocks. * `range` [list of bounds (low, high)]: The boundaries. If `None`, bounds are inferred from data. * `interpolation` [string, optional] Specifies the kind of interpolation between bins as a string (`"linear"`, `"nearest"`, `"zero"`, `"slinear"`, `"quadratic"`, `"cubic"`). * `variable_width` [boolean, optional] If True use equal probability variable length bins. """ self.bins = bins self.range = range self.interpolation = interpolation self.variable_width = variable_width def pdf(self, X, **kwargs): X = check_array(X) if self.ndim_ == 1 and self.interpolation: return self.interpolation_(X[:, 0]) all_indices = [] for j in range(X.shape[1]): indices = np.searchsorted(self.edges_[j], X[:, j], side="right") - 1 # For the last bin, the upper is inclusive indices[X[:, j] == self.edges_[j][-2]] -= 1 all_indices.append(indices) return self.histogram_[all_indices] def nll(self, X, **kwargs): return -np.log(self.pdf(X, **kwargs)) def rvs(self, n_samples, random_state=None, **kwargs): rng = check_random_state(random_state) # Draw random bins with respect to their densities h = (self.histogram_ / self.histogram_.sum()).ravel() flat_indices = np.searchsorted(np.cumsum(h), rng.rand(n_samples), side="right") # Build bin corners indices = np.unravel_index(flat_indices, self.histogram_.shape) indices_end = [a + 1 for a in indices] shape = [len(d) for d in self.edges_] + [len(self.edges_)] corners = np.array(list(product(*self.edges_))).reshape(shape) # Draw uniformly within bins low = corners[indices] high = corners[indices_end] u = rng.rand(*low.shape) return low + u * (high - low) def fit(self, X, sample_weight=None, **kwargs): # Checks X = check_array(X) if sample_weight is not None and len(sample_weight) != len(X): raise ValueError # Compute histogram and edges if self.bins == "blocks": bins = bayesian_blocks(X.ravel(), fitness="events", p0=0.0001) range_ = self.range[0] if self.range else None h, e = np.histogram(X.ravel(), bins=bins, range=range_, weights=sample_weight, normed=False) e = [e] elif self.variable_width: ticks = [np.percentile(X.ravel(), 100. * k / self.bins) for k in range(self.bins + 1)] ticks[-1] += 1e-5 range_ = self.range[0] if self.range else None h, e = np.histogram(X.ravel(), bins=ticks, range=range_, normed=False, weights=sample_weight) h, e = h.astype(float), e.astype(float) widths = e[1:] - e[:-1] h = h / widths / h.sum() e = [e] else: bins = self.bins h, e = np.histogramdd(X, bins=bins, range=self.range, weights=sample_weight, normed=True) # Add empty bins for out of bound samples for j in range(X.shape[1]): h = np.insert(h, 0, 0., axis=j) h = np.insert(h, h.shape[j], 0., axis=j) e[j] = np.insert(e[j], 0, -np.inf) e[j] = np.insert(e[j], len(e[j]), np.inf) if X.shape[1] == 1 and self.interpolation: inputs = e[0][2:-1] - (e[0][2] - e[0][1]) / 2. inputs[0] = e[0][1] inputs[-1] = e[0][-2] outputs = h[1:-1] self.interpolation_ = interp1d(inputs, outputs, kind=self.interpolation, bounds_error=False, fill_value=0.) self.histogram_ = h self.edges_ = e self.ndim_ = X.shape[1] return self def cdf(self, X, **kwargs): """Not supported.""" raise NotImplementedError def ppf(self, X, **kwargs): """Not supported.""" raise NotImplementedError @property def ndim(self): return self.ndim_
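A minimal sketch of fitting and using the histogram; note that fit relies on the normed keyword of the NumPy histogram functions, so an older NumPy release may be required.

import numpy as np
from carl.distributions import Histogram

X = np.random.RandomState(0).normal(size=(5000, 1))

h = Histogram(bins=30, range=[(-5.0, 5.0)])
h.fit(X)

h.pdf(X)                     # piecewise-constant density estimate
h.rvs(10, random_state=0)    # resample uniformly within randomly chosen bins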
Ancestors (in MRO)
- Histogram
- DistributionMixin
- sklearn.base.BaseEstimator
- builtins.object
Static methods
def __init__(
self, bins=10, range=None, interpolation=None, variable_width=False)
Constructor.
Parameters
- bins [int or string]: The number of bins, or "blocks" for Bayesian blocks.
- range [list of bounds (low, high)]: The boundaries. If None, bounds are inferred from data.
- interpolation [string, optional]: Specifies the kind of interpolation between bins as a string ("linear", "nearest", "zero", "slinear", "quadratic", "cubic").
- variable_width [boolean, optional]: If True, use equal-probability variable-width bins.
def __init__(self, bins=10, range=None, interpolation=None, variable_width=False): """Constructor. Parameters ---------- * `bins` [int or string]: The number of bins or `"blocks"` for bayesian blocks. * `range` [list of bounds (low, high)]: The boundaries. If `None`, bounds are inferred from data. * `interpolation` [string, optional] Specifies the kind of interpolation between bins as a string (`"linear"`, `"nearest"`, `"zero"`, `"slinear"`, `"quadratic"`, `"cubic"`). * `variable_width` [boolean, optional] If True use equal probability variable length bins. """ self.bins = bins self.range = range self.interpolation = interpolation self.variable_width = variable_width
def cdf(
self, X, **kwargs)
Not supported.
def cdf(self, X, **kwargs):
    """Not supported."""
    raise NotImplementedError
def fit(
self, X, sample_weight=None, **kwargs)
Fit the distribution parameters to data.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
self
[object]: self.
def fit(self, X, sample_weight=None, **kwargs): # Checks X = check_array(X) if sample_weight is not None and len(sample_weight) != len(X): raise ValueError # Compute histogram and edges if self.bins == "blocks": bins = bayesian_blocks(X.ravel(), fitness="events", p0=0.0001) range_ = self.range[0] if self.range else None h, e = np.histogram(X.ravel(), bins=bins, range=range_, weights=sample_weight, normed=False) e = [e] elif self.variable_width: ticks = [np.percentile(X.ravel(), 100. * k / self.bins) for k in range(self.bins + 1)] ticks[-1] += 1e-5 range_ = self.range[0] if self.range else None h, e = np.histogram(X.ravel(), bins=ticks, range=range_, normed=False, weights=sample_weight) h, e = h.astype(float), e.astype(float) widths = e[1:] - e[:-1] h = h / widths / h.sum() e = [e] else: bins = self.bins h, e = np.histogramdd(X, bins=bins, range=self.range, weights=sample_weight, normed=True) # Add empty bins for out of bound samples for j in range(X.shape[1]): h = np.insert(h, 0, 0., axis=j) h = np.insert(h, h.shape[j], 0., axis=j) e[j] = np.insert(e[j], 0, -np.inf) e[j] = np.insert(e[j], len(e[j]), np.inf) if X.shape[1] == 1 and self.interpolation: inputs = e[0][2:-1] - (e[0][2] - e[0][1]) / 2. inputs[0] = e[0][1] inputs[-1] = e[0][-2] outputs = h[1:-1] self.interpolation_ = interp1d(inputs, outputs, kind=self.interpolation, bounds_error=False, fill_value=0.) self.histogram_ = h self.edges_ = e self.ndim_ = X.shape[1] return self
def nll(
self, X, **kwargs)
Negative log-likelihood.
This function returns the value of the negative log-likelihood -log(p(x_i)), at all x_i in X.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
nll
[array, shape=(n_samples,)]: -log(p(x_i)) for all x_i in X.
def nll(self, X, **kwargs):
    return -np.log(self.pdf(X, **kwargs))
def pdf(
self, X, **kwargs)
Probability density function.
This function returns the value of the probability density function p(x_i), at all x_i in X.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
pdf
[array, shape=(n_samples,)]: p(x_i) for all x_i in X.
def pdf(self, X, **kwargs):
    X = check_array(X)

    if self.ndim_ == 1 and self.interpolation:
        return self.interpolation_(X[:, 0])

    all_indices = []

    for j in range(X.shape[1]):
        indices = np.searchsorted(self.edges_[j],
                                  X[:, j],
                                  side="right") - 1
        # For the last bin, the upper is inclusive
        indices[X[:, j] == self.edges_[j][-2]] -= 1
        all_indices.append(indices)

    return self.histogram_[all_indices]
def ppf(
self, X, **kwargs)
Not supported.
def ppf(self, X, **kwargs):
    """Not supported."""
    raise NotImplementedError
def rvs(
self, n_samples, random_state=None, **kwargs)
Draw samples.
Parameters
- n_samples [int]: The number of samples to draw.
- random_state [integer or RandomState object]: The random seed.
Returns
X
[array, shape=(n_samples, n_features)]: The generated samples.
def rvs(self, n_samples, random_state=None, **kwargs):
    rng = check_random_state(random_state)

    # Draw random bins with respect to their densities
    h = (self.histogram_ / self.histogram_.sum()).ravel()
    flat_indices = np.searchsorted(np.cumsum(h),
                                   rng.rand(n_samples),
                                   side="right")

    # Build bin corners
    indices = np.unravel_index(flat_indices, self.histogram_.shape)
    indices_end = [a + 1 for a in indices]
    shape = [len(d) for d in self.edges_] + [len(self.edges_)]
    corners = np.array(list(product(*self.edges_))).reshape(shape)

    # Draw uniformly within bins
    low = corners[indices]
    high = corners[indices_end]
    u = rng.rand(*low.shape)

    return low + u * (high - low)
def score(
self, X, **kwargs)
Evaluate the goodness of fit of the distribution w.r.t. X.
The higher, the better.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
score
[float]: The goodness of fit.
def score(self, X, **kwargs):
    """Evaluate the goodness of fit of the distribution w.r.t. `X`.

    The higher, the better.

    Parameters
    ----------
    * `X` [array-like, shape=(n_samples, n_features)]:
        The samples.

    Returns
    -------
    * `score` [float]:
        The goodness of fit.
    """
    # No generic goodness-of-fit measure is defined for this class.
    raise NotImplementedError
Instance variables
var bins
var interpolation
var ndim
var range
var variable_width
class Join
Joint distribution.
This class can be used to define a joint distribution p(x, y, z, ...) = p_0(x) * p_1(y) * p_2(z) * ..., where p_i are themselves distributions.
class Join(TheanoDistribution): """Joint distribution. This class can be used to define a joint distribution `p(x, y, z, ...) = p_0(x) * p_1(y) * p_2(z) * ...`, where `p_i` are themselves distributions. """ def __init__(self, components): """Constructor. Parameters ---------- * `components` [list of `DistributionMixin`]: The components to join together. """ super(Join, self).__init__() self.components = components for i, component in enumerate(components): # Add component parameters, constants and observeds if isinstance(component, TheanoDistribution): for p_i in component.parameters_: self.parameters_.add(p_i) for c_i in component.constants_: self.constants_.add(c_i) for o_i in component.observeds_: self.observeds_.add(o_i) # Derive and overide pdf and nll analytically if possible if all([hasattr(c, "pdf_") for c in self.components]): # pdf c0 = self.components[0] self.pdf_ = theano.clone(c0.pdf_, {c0.X: self.X[:, 0:c0.ndim]}) start = c0.ndim for c in self.components[1:]: self.pdf_ *= theano.clone( c.pdf_, {c.X: self.X[:, start:start+c.ndim]}) start += c.ndim self._make(self.pdf_, "pdf") if all([hasattr(c, "nll_") for c in self.components]): # nll c0 = self.components[0] self.nll_ = theano.clone(c0.nll_, {c0.X: self.X[:, 0:c0.ndim]}) start = c0.ndim for c in self.components[1:]: self.nll_ += theano.clone( c.nll_, {c.X: self.X[:, start:start+c.ndim]}) start += c.ndim self._make(self.nll_, "nll") def pdf(self, X, **kwargs): out = np.ones(len(X)) start = 0 for i, component in enumerate(self.components): out *= component.pdf(X[:, start:start+component.ndim], **kwargs) start += component.ndim return out def nll(self, X, **kwargs): out = np.zeros(len(X)) start = 0 for i, component in enumerate(self.components): out += component.nll(X[:, start:start+component.ndim], **kwargs) start += component.ndim return out def rvs(self, n_samples, random_state=None, **kwargs): rng = check_random_state(random_state) out = np.zeros((n_samples, self.ndim)) start = 0 for i, component in enumerate(self.components): out[:, start:start+component.ndim] = component.rvs( n_samples, random_state=rng, **kwargs) start += component.ndim return out def fit(self, X, **kwargs): if hasattr(self, "nll_"): return super(Join, self).fit(X, **kwargs) else: raise NotImplementedError def cdf(self, X, **kwargs): """Not supported.""" raise NotImplementedError def ppf(self, X, **kwargs): """Not supported.""" raise NotImplementedError @property def ndim(self): return sum([c.ndim for c in self.components])
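For illustration, a sketch of a two-dimensional joint density built from two one-dimensional components documented elsewhere in this module:

from carl.distributions import Exponential, Join

# p(x, y) = p_0(x) * p_1(y)
p = Join(components=[Exponential(inverse_scale=1.0),
                     Exponential(inverse_scale=0.5)])

X = p.rvs(1000, random_state=0)   # shape (1000, 2), one block of columns per component
p.pdf(X)                          # product of the component densities
p.ndim                            # 2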
Ancestors (in MRO)
- Join
- TheanoDistribution
- DistributionMixin
- sklearn.base.BaseEstimator
- builtins.object
Class variables
var X
var p
Static methods
def __init__(
self, components)
Constructor.
Parameters
components
[list of DistributionMixin]: The components to join together.
def __init__(self, components): """Constructor. Parameters ---------- * `components` [list of `DistributionMixin`]: The components to join together. """ super(Join, self).__init__() self.components = components for i, component in enumerate(components): # Add component parameters, constants and observeds if isinstance(component, TheanoDistribution): for p_i in component.parameters_: self.parameters_.add(p_i) for c_i in component.constants_: self.constants_.add(c_i) for o_i in component.observeds_: self.observeds_.add(o_i) # Derive and overide pdf and nll analytically if possible if all([hasattr(c, "pdf_") for c in self.components]): # pdf c0 = self.components[0] self.pdf_ = theano.clone(c0.pdf_, {c0.X: self.X[:, 0:c0.ndim]}) start = c0.ndim for c in self.components[1:]: self.pdf_ *= theano.clone( c.pdf_, {c.X: self.X[:, start:start+c.ndim]}) start += c.ndim self._make(self.pdf_, "pdf") if all([hasattr(c, "nll_") for c in self.components]): # nll c0 = self.components[0] self.nll_ = theano.clone(c0.nll_, {c0.X: self.X[:, 0:c0.ndim]}) start = c0.ndim for c in self.components[1:]: self.nll_ += theano.clone( c.nll_, {c.X: self.X[:, start:start+c.ndim]}) start += c.ndim self._make(self.nll_, "nll")
def cdf(
self, X, **kwargs)
Not supported.
def cdf(self, X, **kwargs):
    """Not supported."""
    raise NotImplementedError
def fit(
self, X, **kwargs)
Fit the distribution parameters to data by minimizing the negative log-likelihood of the data.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
- bounds [list of (parameter, (low, high))]: The parameter bounds.
- constraints: The constraints on the parameters.
- use_gradient [boolean, default=True]: Whether to use exact gradients (if True) or numerical gradients (if False).
- optimizer [string]: The optimization method.
Returns
self
[object]: self.
def fit(self, X, **kwargs):
    if hasattr(self, "nll_"):
        return super(Join, self).fit(X, **kwargs)
    else:
        raise NotImplementedError
def nll(
self, X, **kwargs)
Negative log-likelihood.
This function returns the value of the negative log-likelihood -log(p(x_i)), at all x_i in X.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
nll
[array, shape=(n_samples,)]: -log(p(x_i)) for all x_i in X.
def nll(self, X, **kwargs):
    out = np.zeros(len(X))
    start = 0

    for i, component in enumerate(self.components):
        out += component.nll(X[:, start:start + component.ndim], **kwargs)
        start += component.ndim

    return out
def pdf(
self, X, **kwargs)
Probability density function.
This function returns the value of the probability density function p(x_i), at all x_i in X.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
pdf
[array, shape=(n_samples,)]: p(x_i) for all x_i in X.
def pdf(self, X, **kwargs):
    out = np.ones(len(X))
    start = 0

    for i, component in enumerate(self.components):
        out *= component.pdf(X[:, start:start + component.ndim], **kwargs)
        start += component.ndim

    return out
def ppf(
self, X, **kwargs)
Not supported.
def ppf(self, X, **kwargs):
    """Not supported."""
    raise NotImplementedError
def rvs(
self, n_samples, random_state=None, **kwargs)
Draw samples.
Parameters
- n_samples [int]: The number of samples to draw.
- random_state [integer or RandomState object]: The random seed.
Returns
X
[array, shape=(n_samples, n_features)]: The generated samples.
def rvs(self, n_samples, random_state=None, **kwargs):
    rng = check_random_state(random_state)
    out = np.zeros((n_samples, self.ndim))
    start = 0

    for i, component in enumerate(self.components):
        out[:, start:start + component.ndim] = component.rvs(
            n_samples, random_state=rng, **kwargs)
        start += component.ndim

    return out
def score(
self, X, **kwargs)
Evaluate the log-likelihood -self.nll(X).sum() of X.
The higher, the better.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
score
[float]: The log-likelihood of X.
def score(self, X, **kwargs): """Evaluate the log-likelihood `-self.nll(X).sum()` of `X`. The higher, the better. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `score` [float]: The log-likelihood of `X`. """ return -self.nll(X, **kwargs).sum()
def set_params(
self, **params)
Set the parameters of this estimator.
The method works on simple estimators as well as on nested objects
(such as pipelines). The latter have parameters of the form
<component>__<parameter>
so that it's possible to update each
component of a nested object.
Returns
self
def set_params(self, **params):
    for name, value in params.items():
        var = getattr(self, name, None)

        if isinstance(var, SharedVariable):
            var.set_value(value)
        elif (isinstance(var, T.TensorVariable) or
              isinstance(var, T.TensorConstant)):
            raise ValueError("Only shared variables can be updated.")
        else:
            super(DistributionMixin, self).set_params(**{name: value})
Instance variables
var components
var ndim
class KernelDensity
Kernel density estimation.
This distribution supports 1D data only.
class KernelDensity(DistributionMixin): """Kernel density estimation. This distribution supports 1D data only. """ def __init__(self, bandwidth=None): """Constructor. Parameters ---------- * `bandwidth` [string or float, optional]: The method used to calculate the estimator bandwidth. """ self.bandwidth = bandwidth def pdf(self, X, **kwargs): X = check_array(X) return self.kde_.pdf(X.T) def nll(self, X, **kwargs): X = check_array(X) return -self.kde_.logpdf(X.T) def rvs(self, n_samples, random_state=None, **kwargs): # XXX gaussian_kde uses Numpy global random state... return self.kde_.resample(n_samples).T def fit(self, X, **kwargs): """Fit the KDE estimator to data. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `self` [object]: `self`. """ X = check_array(X).T self.kde_ = gaussian_kde(X, bw_method=self.bandwidth) return self def cdf(self, X, **kwargs): """Not supported.""" raise NotImplementedError def ppf(self, X, **kwargs): """Not supported.""" raise NotImplementedError
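A minimal sketch; the bandwidth argument is forwarded to scipy's gaussian_kde as bw_method, so strings such as "scott" or "silverman", a scalar, or a callable should be accepted.

import numpy as np
from carl.distributions import KernelDensity

X = np.random.RandomState(0).exponential(size=(1000, 1))

k = KernelDensity(bandwidth="scott")
k.fit(X)

k.pdf(X)     # kernel density estimates at the training points
k.nll(X)     # -log p(x_i)
k.rvs(5)     # note: resampling uses NumPy's global random state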
Ancestors (in MRO)
- KernelDensity
- DistributionMixin
- sklearn.base.BaseEstimator
- builtins.object
Static methods
def __init__(
self, bandwidth=None)
Constructor.
Parameters
bandwidth
[string or float, optional]: The method used to calculate the estimator bandwidth.
def __init__(self, bandwidth=None):
    """Constructor.

    Parameters
    ----------
    * `bandwidth` [string or float, optional]:
        The method used to calculate the estimator bandwidth.
    """
    self.bandwidth = bandwidth
def cdf(
self, X, **kwargs)
Not supported.
def cdf(self, X, **kwargs):
    """Not supported."""
    raise NotImplementedError
def fit(
self, X, **kwargs)
Fit the KDE estimator to data.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
self
[object]: self.
def fit(self, X, **kwargs):
    """Fit the KDE estimator to data.

    Parameters
    ----------
    * `X` [array-like, shape=(n_samples, n_features)]:
        The samples.

    Returns
    -------
    * `self` [object]:
        `self`.
    """
    X = check_array(X).T
    self.kde_ = gaussian_kde(X, bw_method=self.bandwidth)
    return self
def nll(
self, X, **kwargs)
Negative log-likelihood.
This function returns the value of the negative log-likelihood -log(p(x_i)), at all x_i in X.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
nll
[array, shape=(n_samples,)]: -log(p(x_i)) for all x_i in X.
def nll(self, X, **kwargs):
    X = check_array(X)
    return -self.kde_.logpdf(X.T)
def pdf(
self, X, **kwargs)
Probability density function.
This function returns the value of the probability density function p(x_i), at all x_i in X.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
pdf
[array, shape=(n_samples,)]: p(x_i) for all x_i in X.
def pdf(self, X, **kwargs):
    X = check_array(X)
    return self.kde_.pdf(X.T)
def ppf(
self, X, **kwargs)
Not supported.
def ppf(self, X, **kwargs):
    """Not supported."""
    raise NotImplementedError
def rvs(
self, n_samples, random_state=None, **kwargs)
Draw samples.
Parameters
- n_samples [int]: The number of samples to draw.
- random_state [integer or RandomState object]: The random seed.
Returns
X
[array, shape=(n_samples, n_features)]: The generated samples.
def rvs(self, n_samples, random_state=None, **kwargs):
    # XXX gaussian_kde uses Numpy global random state...
    return self.kde_.resample(n_samples).T
def score(
self, X, **kwargs)
Evaluate the goodness of fit of the distribution w.r.t. X.
The higher, the better.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
score
[float]: The goodness of fit.
def score(self, X, **kwargs):
    """Evaluate the goodness of fit of the distribution w.r.t. `X`.

    The higher, the better.

    Parameters
    ----------
    * `X` [array-like, shape=(n_samples, n_features)]:
        The samples.

    Returns
    -------
    * `score` [float]:
        The goodness of fit.
    """
    # No generic goodness-of-fit measure is defined for this class.
    raise NotImplementedError
Instance variables
var bandwidth
var ndim
The number of dimensions (or features) of the data.
class LinearTransform
Apply a linear transformation u = A*x to x ~ p.
class LinearTransform(TheanoDistribution): """Apply a linear transformation `u = A*x` to `x ~ p`.""" def __init__(self, p, A): """Constructor. Parameters ---------- * `p` [`DistributionMixin`]: The base distribution. * `A` [array, shape=(p.ndim, p.ndim)]: The linear operator. """ super(LinearTransform, self).__init__() self.p = p self.A = A self.inv_A = np.linalg.inv(A) if isinstance(p, TheanoDistribution): for p_i in p.parameters_: self.parameters_.add(p_i) for c_i in p.constants_: self.constants_.add(c_i) for o_i in p.observeds_: self.observeds_.add(o_i) # Derive and overide pdf, nll and cdf analytically if possible # XXX todo def pdf(self, X, **kwargs): return self.p.pdf(np.dot(self.inv_A, X.T).T, **kwargs) def nll(self, X, **kwargs): return self.p.nll(np.dot(self.inv_A, X.T).T, **kwargs) def ppf(self, X, **kwargs): """Not supported.""" raise NotImplementedError def cdf(self, X, **kwargs): """Not supported.""" raise NotImplementedError def rvs(self, n_samples, random_state=None, **kwargs): rng = check_random_state(random_state) out = self.p.rvs(n_samples, random_state=rng, **kwargs) return np.dot(self.A, out.T).T @property def ndim(self): return self.p.ndim
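A hedged sketch, assuming the MultivariateNormal constructor documented below accepts plain NumPy arrays; A must be invertible since the constructor computes inv(A).

import numpy as np
from carl.distributions import LinearTransform, MultivariateNormal

base = MultivariateNormal(mu=np.zeros(2), sigma=np.eye(2))
A = np.array([[2.0, 0.0],
              [1.0, 0.5]])
q = LinearTransform(base, A)

U = q.rvs(500, random_state=0)   # samples of u = A x
q.pdf(U)                         # base density evaluated at inv(A) u, per the source above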
Ancestors (in MRO)
- LinearTransform
- TheanoDistribution
- DistributionMixin
- sklearn.base.BaseEstimator
- builtins.object
Class variables
var X
Static methods
def __init__(
self, p, A)
Constructor.
Parameters
- p [DistributionMixin]: The base distribution.
- A [array, shape=(p.ndim, p.ndim)]: The linear operator.
def __init__(self, p, A): """Constructor. Parameters ---------- * `p` [`DistributionMixin`]: The base distribution. * `A` [array, shape=(p.ndim, p.ndim)]: The linear operator. """ super(LinearTransform, self).__init__() self.p = p self.A = A self.inv_A = np.linalg.inv(A) if isinstance(p, TheanoDistribution): for p_i in p.parameters_: self.parameters_.add(p_i) for c_i in p.constants_: self.constants_.add(c_i) for o_i in p.observeds_: self.observeds_.add(o_i)
def cdf(
self, X, **kwargs)
Not supported.
def cdf(self, X, **kwargs):
    """Not supported."""
    raise NotImplementedError
def fit(
self, X, bounds=None, constraints=None, use_gradient=True, optimizer=None, **kwargs)
Fit the distribution parameters to data by minimizing the negative log-likelihood of the data.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
- bounds [list of (parameter, (low, high))]: The parameter bounds.
- constraints: The constraints on the parameters.
- use_gradient [boolean, default=True]: Whether to use exact gradients (if True) or numerical gradients (if False).
- optimizer [string]: The optimization method.
Returns
self
[object]: self.
def fit(self, X, bounds=None, constraints=None, use_gradient=True, optimizer=None, **kwargs): """Fit the distribution parameters to data by minimizing the negative log-likelihood of the data. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. * `bounds` [list of (parameter, (low, high))]: The parameter bounds. * `constraints`: The constraints on the parameters. * `use_gradient` [boolean, default=True]: Whether to use exact gradients (if `True`) or numerical gradients (if `False`). * `optimizer` [string]: The optimization method. Returns ------- * `self` [object]: `self`. """ # Map parameters to placeholders param_to_placeholder = [] param_to_index = {} for i, v in enumerate(self.parameters_): w = T.TensorVariable(v.type) param_to_placeholder.append((v, w)) param_to_index[v] = i # Build bounds mapped_bounds = None if bounds is not None: mapped_bounds = [(None, None) for v in param_to_placeholder] for b in bounds: mapped_bounds[param_to_index[b["param"]]] = b["bounds"] # Build constraints mapped_constraints = None if constraints is not None: mapped_constraints = [] for c in constraints: args = c["param"] if isinstance(args, SharedVariable): args = (args, ) m_c = { "type": c["type"], "fun": lambda x: c["fun"](*[x[param_to_index[a]] for a in args]) } if "jac" in c: m_c["jac"] = lambda x: c["jac"](*[x[param_to_index[a]] for a in args]) mapped_constraints.append(m_c) # Derive objective and gradient objective_ = theano.function( [self.X] + [w for _, w in param_to_placeholder] + [theano.In(v, name=v.name) for v in self.observeds_], T.sum(self.nll_), givens=param_to_placeholder, allow_input_downcast=True) def objective(x): return objective_(X, *x, **kwargs) / len(X) if use_gradient: gradient_ = theano.function( [self.X] + [w for _, w in param_to_placeholder] + [theano.In(v, name=v.name) for v in self.observeds_], theano.grad(T.sum(self.nll_), [v for v, _ in param_to_placeholder]), givens=param_to_placeholder, allow_input_downcast=True) def gradient(x): return np.array(gradient_(X, *x, **kwargs)) / len(X) # Solve! x0 = np.array([v.get_value() for v, _ in param_to_placeholder]) r = minimize(objective, jac=gradient if use_gradient else None, x0=x0, method=optimizer, bounds=mapped_bounds, constraints=mapped_constraints) if r.success: # Assign the solution for i, value in enumerate(r.x): param_to_placeholder[i][0].set_value(value) else: print("Parameter fitting failed!") print(r) return self
def nll(
self, X, **kwargs)
Negative log-likelihood.
This function returns the value of the negative log-likelihood -log(p(x_i)), at all x_i in X.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
nll
[array, shape=(n_samples,)]: -log(p(x_i)) for all x_i in X.
def nll(self, X, **kwargs):
    return self.p.nll(np.dot(self.inv_A, X.T).T, **kwargs)
def pdf(
self, X, **kwargs)
Probability density function.
This function returns the value of the probability density function p(x_i), at all x_i in X.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
pdf
[array, shape=(n_samples,)]: p(x_i) for all x_i in X.
def pdf(self, X, **kwargs):
    return self.p.pdf(np.dot(self.inv_A, X.T).T, **kwargs)
def ppf(
self, X, **kwargs)
Not supported.
def ppf(self, X, **kwargs):
    """Not supported."""
    raise NotImplementedError
def rvs(
self, n_samples, random_state=None, **kwargs)
Draw samples.
Parameters
- n_samples [int]: The number of samples to draw.
- random_state [integer or RandomState object]: The random seed.
Returns
X
[array, shape=(n_samples, n_features)]: The generated samples.
def rvs(self, n_samples, random_state=None, **kwargs):
    rng = check_random_state(random_state)
    out = self.p.rvs(n_samples, random_state=rng, **kwargs)
    return np.dot(self.A, out.T).T
def score(
self, X, **kwargs)
Evaluate the log-likelihood -self.nll(X).sum() of X.
The higher, the better.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
score
[float]: The log-likelihood of X.
def score(self, X, **kwargs): """Evaluate the log-likelihood `-self.nll(X).sum()` of `X`. The higher, the better. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `score` [float]: The log-likelihood of `X`. """ return -self.nll(X, **kwargs).sum()
def set_params(
self, **params)
Set the parameters of this estimator.
The method works on simple estimators as well as on nested objects
(such as pipelines). The latter have parameters of the form
<component>__<parameter>
so that it's possible to update each
component of a nested object.
Returns
self
def set_params(self, **params):
    for name, value in params.items():
        var = getattr(self, name, None)

        if isinstance(var, SharedVariable):
            var.set_value(value)
        elif (isinstance(var, T.TensorVariable) or
              isinstance(var, T.TensorConstant)):
            raise ValueError("Only shared variables can be updated.")
        else:
            super(DistributionMixin, self).set_params(**{name: value})
Instance variables
var A
var inv_A
var ndim
var p
class Mixture
Mix components into a mixture distribution.
This class can be used to model a mixture distribution p(x) = \sum_i w_i p_i(x), where p_i are themselves distributions and where w_i are the component weights.
class Mixture(TheanoDistribution): """Mix components into a mixture distribution. This class can be used to model a mixture distribution `p(x) = \sum_i w_i p_i(x)`, where `p_i` are themselves distributions and where `w_i` are the component weights. """ def __init__(self, components, weights=None): """Constructor. Parameters ---------- * `components` [list of `DistributionMixin`]: The components to mix together. * `weights` [list of floats or list of theano expressions]: The component weights. """ super(Mixture, self).__init__() self.components = components self.weights = [] # Check component and weights ndim = self.components[0].ndim for component in self.components[1:]: if ndim != component.ndim: raise ValueError("Mixture components must have the same " "number of dimensions.") if weights is None: weights = [1. / len(components)] * (len(components) - 1) if len(weights) == len(components) - 1: weights.append(None) if len(weights) != len(components): raise ValueError("Mixture components and weights must be in " "equal number.") for i, (component, weight) in enumerate(zip(components, weights)): # Add component parameters, constants and observeds if isinstance(component, TheanoDistribution): for p_i in component.parameters_: self.parameters_.add(p_i) for c_i in component.constants_: self.constants_.add(c_i) for o_i in component.observeds_: self.observeds_.add(o_i) # Validate weights if weight is not None: v, p, c, o = check_parameter("param_w{}".format(i), weight) self.weights.append(v) for p_i in p: self.parameters_.add(p_i) for c_i in c: self.constants_.add(c_i) for o_i in o: self.observeds_.add(o_i) else: assert i == len(self.components) - 1 w_last = 1. for w_i in self.weights: w_last = w_last - w_i self.weights.append(w_last) # Derive and overide pdf, nll and cdf analytically if possible if all([hasattr(c, "pdf_") for c in self.components]): # pdf self.pdf_ = self.weights[0] * self.components[0].pdf_ for i in range(1, len(self.components)): self.pdf_ += self.weights[i] * self.components[i].pdf_ self._make(self.pdf_, "pdf") # -log pdf self.nll_ = self.weights[0] * self.components[0].pdf_ for i in range(1, len(self.components)): self.nll_ += self.weights[i] * self.components[i].pdf_ self.nll_ = -T.log(self.nll_) self._make(self.nll_, "nll") if all([hasattr(c, "cdf_") for c in self.components]): # cdf self.cdf_ = self.weights[0] * self.components[0].cdf_ for i in range(1, len(self.components)): self.cdf_ += self.weights[i] * self.components[i].cdf_ self._make(self.cdf_, "cdf") # Weight evaluation function self._make(T.stack(self.weights, axis=0), "compute_weights", args=[]) def pdf(self, X, **kwargs): weights = self.compute_weights(**kwargs) p = weights[0] * self.components[0].pdf(X, **kwargs) for i in range(1, len(self.components)): p += weights[i] * self.components[i].pdf(X, **kwargs) return p def nll(self, X, **kwargs): return -np.log(self.pdf(X, **kwargs)) def cdf(self, X, **kwargs): weights = self.compute_weights(**kwargs) c = weights[0] * self.components[0].cdf(X, **kwargs) for i in range(1, len(self.components)): c += weights[i] * self.components[i].cdf(X, **kwargs) return c def ppf(self, X, **kwargs): """Not supported.""" raise NotImplementedError def rvs(self, n_samples, random_state=None, **kwargs): rng = check_random_state(random_state) indices = rng.multinomial(1, pvals=self.compute_weights(**kwargs), size=n_samples) out = np.zeros((n_samples, self.ndim)) for j in range(len(self.components)): mask = np.where(indices[:, j])[0] if len(mask) > 0: out[mask, :] = 
self.components[j].rvs(n_samples=len(mask), random_state=rng, **kwargs) return out def fit(self, X, **kwargs): if hasattr(self, "nll_"): return super(Mixture, self).fit(X, **kwargs) else: raise NotImplementedError @property def ndim(self): return self.components[0].ndim
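A sketch of a mixture with one free weight, assuming a plain Theano shared scalar is accepted as a weight expression (the docstring allows "theano expressions"); when one fewer weight than components is given, the last weight is set to one minus the others. The name w0 is arbitrary.

import theano
from carl.distributions import Exponential, Mixture

w0 = theano.shared(0.5, name="w0")               # free mixing weight
p = Mixture(components=[Exponential(inverse_scale=1.0),
                        Exponential(inverse_scale=5.0)],
            weights=[w0])                        # last weight becomes 1 - w0

X = p.rvs(2000, random_state=0)
p.compute_weights()                              # current numerical weight values
p.fit(X)                                         # maximum-likelihood fit of the free parameters;
                                                 # in practice, bounds may be needed (see fit above)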
Ancestors (in MRO)
- Mixture
- TheanoDistribution
- DistributionMixin
- sklearn.base.BaseEstimator
- builtins.object
Class variables
var X
var p
Static methods
def __init__(
self, components, weights=None)
Constructor.
Parameters
- components [list of DistributionMixin]: The components to mix together.
- weights [list of floats or list of theano expressions]: The component weights.
def __init__(self, components, weights=None): """Constructor. Parameters ---------- * `components` [list of `DistributionMixin`]: The components to mix together. * `weights` [list of floats or list of theano expressions]: The component weights. """ super(Mixture, self).__init__() self.components = components self.weights = [] # Check component and weights ndim = self.components[0].ndim for component in self.components[1:]: if ndim != component.ndim: raise ValueError("Mixture components must have the same " "number of dimensions.") if weights is None: weights = [1. / len(components)] * (len(components) - 1) if len(weights) == len(components) - 1: weights.append(None) if len(weights) != len(components): raise ValueError("Mixture components and weights must be in " "equal number.") for i, (component, weight) in enumerate(zip(components, weights)): # Add component parameters, constants and observeds if isinstance(component, TheanoDistribution): for p_i in component.parameters_: self.parameters_.add(p_i) for c_i in component.constants_: self.constants_.add(c_i) for o_i in component.observeds_: self.observeds_.add(o_i) # Validate weights if weight is not None: v, p, c, o = check_parameter("param_w{}".format(i), weight) self.weights.append(v) for p_i in p: self.parameters_.add(p_i) for c_i in c: self.constants_.add(c_i) for o_i in o: self.observeds_.add(o_i) else: assert i == len(self.components) - 1 w_last = 1. for w_i in self.weights: w_last = w_last - w_i self.weights.append(w_last) # Derive and overide pdf, nll and cdf analytically if possible if all([hasattr(c, "pdf_") for c in self.components]): # pdf self.pdf_ = self.weights[0] * self.components[0].pdf_ for i in range(1, len(self.components)): self.pdf_ += self.weights[i] * self.components[i].pdf_ self._make(self.pdf_, "pdf") # -log pdf self.nll_ = self.weights[0] * self.components[0].pdf_ for i in range(1, len(self.components)): self.nll_ += self.weights[i] * self.components[i].pdf_ self.nll_ = -T.log(self.nll_) self._make(self.nll_, "nll") if all([hasattr(c, "cdf_") for c in self.components]): # cdf self.cdf_ = self.weights[0] * self.components[0].cdf_ for i in range(1, len(self.components)): self.cdf_ += self.weights[i] * self.components[i].cdf_ self._make(self.cdf_, "cdf") # Weight evaluation function self._make(T.stack(self.weights, axis=0), "compute_weights", args=[])
def cdf(
self, X, **kwargs)
Cumulative distribution function.
This function returns the value of the cumulative distribution function F(x_i) = p(x <= x_i), at all x_i in X.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
cdf
[array, shape=(n_samples,)]: cdf(x_i) for all x_i in X.
def cdf(self, X, **kwargs):
    weights = self.compute_weights(**kwargs)
    c = weights[0] * self.components[0].cdf(X, **kwargs)

    for i in range(1, len(self.components)):
        c += weights[i] * self.components[i].cdf(X, **kwargs)

    return c
def fit(
self, X, **kwargs)
Fit the distribution parameters to data by minimizing the negative log-likelihood of the data.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
- bounds [list of (parameter, (low, high))]: The parameter bounds.
- constraints: The constraints on the parameters.
- use_gradient [boolean, default=True]: Whether to use exact gradients (if True) or numerical gradients (if False).
- optimizer [string]: The optimization method.
Returns
self
[object]: self.
def fit(self, X, **kwargs):
    if hasattr(self, "nll_"):
        return super(Mixture, self).fit(X, **kwargs)
    else:
        raise NotImplementedError
def nll(
self, X, **kwargs)
Negative log-likelihood.
This function returns the value of the negative log-likelihood -log(p(x_i)), at all x_i in X.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
nll
[array, shape=(n_samples,)]: -log(p(x_i)) for all x_i in X.
def nll(self, X, **kwargs):
    return -np.log(self.pdf(X, **kwargs))
def pdf(
self, X, **kwargs)
Probability density function.
This function returns the value of the probability density function p(x_i), at all x_i in X.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
pdf
[array, shape=(n_samples,)]: p(x_i) for all x_i in X.
def pdf(self, X, **kwargs):
    weights = self.compute_weights(**kwargs)
    p = weights[0] * self.components[0].pdf(X, **kwargs)

    for i in range(1, len(self.components)):
        p += weights[i] * self.components[i].pdf(X, **kwargs)

    return p
def ppf(
self, X, **kwargs)
Not supported.
def ppf(self, X, **kwargs):
    """Not supported."""
    raise NotImplementedError
def rvs(
self, n_samples, random_state=None, **kwargs)
Draw samples.
Parameters
- n_samples [int]: The number of samples to draw.
- random_state [integer or RandomState object]: The random seed.
Returns
X
[array, shape=(n_samples, n_features)]: The generated samples.
def rvs(self, n_samples, random_state=None, **kwargs):
    rng = check_random_state(random_state)
    indices = rng.multinomial(1,
                              pvals=self.compute_weights(**kwargs),
                              size=n_samples)
    out = np.zeros((n_samples, self.ndim))

    for j in range(len(self.components)):
        mask = np.where(indices[:, j])[0]
        if len(mask) > 0:
            out[mask, :] = self.components[j].rvs(n_samples=len(mask),
                                                  random_state=rng,
                                                  **kwargs)

    return out
def score(
self, X, **kwargs)
Evaluate the log-likelihood -self.nll(X).sum() of X.
The higher, the better.
Parameters
X
[array-like, shape=(n_samples, n_features)]: The samples.
Returns
score
[float]: The log-likelihood of X.
def score(self, X, **kwargs): """Evaluate the log-likelihood `-self.nll(X).sum()` of `X`. The higher, the better. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `score` [float]: The log-likelihood of `X`. """ return -self.nll(X, **kwargs).sum()
def set_params(
self, **params)
Set the parameters of this estimator.
The method works on simple estimators as well as on nested objects
(such as pipelines). The latter have parameters of the form
<component>__<parameter>
so that it's possible to update each
component of a nested object.
Returns
self
def set_params(self, **params):
    for name, value in params.items():
        var = getattr(self, name, None)

        if isinstance(var, SharedVariable):
            var.set_value(value)
        elif (isinstance(var, T.TensorVariable) or
              isinstance(var, T.TensorConstant)):
            raise ValueError("Only shared variables can be updated.")
        else:
            super(DistributionMixin, self).set_params(**{name: value})
Instance variables
var components
var ndim
var weights
class MultivariateNormal
Multivariate normal distribution.
class MultivariateNormal(TheanoDistribution): """Multivariate normal distribution.""" def __init__(self, mu, sigma): """Constructor. Parameters ---------- * `mu` [1d array]: The means. * `sigma` [2d array]: The covariance matrix. """ super(MultivariateNormal, self).__init__(mu=mu, sigma=sigma) # XXX: The SDP-ness of sigma should be check upon changes # ndim self.ndim_ = self.mu.shape[0] self._make(self.ndim_, "ndim_func_", args=[]) # pdf L = linalg.cholesky(self.sigma) sigma_det = linalg.det(self.sigma) # XXX: compute from L instead sigma_inv = linalg.matrix_inverse(self.sigma) # XXX: idem self.pdf_ = ( (1. / T.sqrt((2. * np.pi) ** self.ndim_ * T.abs_(sigma_det))) * T.exp(-0.5 * T.sum(T.mul(T.dot(self.X - self.mu, sigma_inv), self.X - self.mu), axis=1))).ravel() self._make(self.pdf_, "pdf") # -log pdf self.nll_ = -T.log(self.pdf_) # XXX: for sure this can be better self._make(self.nll_, "nll") # self.rvs_ self._make(T.dot(L, self.X.T).T + self.mu, "rvs_func_") def rvs(self, n_samples, random_state=None, **kwargs): rng = check_random_state(random_state) X = rng.randn(n_samples, self.ndim) return self.rvs_func_(X, **kwargs) def cdf(self, X, **kwargs): """Not supported.""" raise NotImplementedError def ppf(self, X, **kwargs): """Not supported.""" raise NotImplementedError @property def ndim(self): return self.ndim_func_()[None][0]
Ancestors (in MRO)
- MultivariateNormal
- TheanoDistribution
- DistributionMixin
- sklearn.base.BaseEstimator
- builtins.object
Class variables
var X
var p
Static methods
def __init__(self, mu, sigma)
Constructor.
Parameters
- mu [1d array]: The means.
- sigma [2d array]: The covariance matrix.
def __init__(self, mu, sigma):
    """Constructor.

    Parameters
    ----------
    * `mu` [1d array]:
        The means.

    * `sigma` [2d array]:
        The covariance matrix.
    """
    super(MultivariateNormal, self).__init__(mu=mu, sigma=sigma)
    # XXX: The SDP-ness of sigma should be checked upon changes

    # ndim
    self.ndim_ = self.mu.shape[0]
    self._make(self.ndim_, "ndim_func_", args=[])

    # pdf
    L = linalg.cholesky(self.sigma)
    sigma_det = linalg.det(self.sigma)             # XXX: compute from L instead
    sigma_inv = linalg.matrix_inverse(self.sigma)  # XXX: idem

    self.pdf_ = (
        (1. / T.sqrt((2. * np.pi) ** self.ndim_ * T.abs_(sigma_det))) *
        T.exp(-0.5 * T.sum(T.mul(T.dot(self.X - self.mu, sigma_inv),
                                 self.X - self.mu),
                           axis=1))).ravel()
    self._make(self.pdf_, "pdf")

    # -log pdf
    self.nll_ = -T.log(self.pdf_)  # XXX: for sure this can be better
    self._make(self.nll_, "nll")

    # self.rvs_
    self._make(T.dot(L, self.X.T).T + self.mu, "rvs_func_")
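As a quick check of the construction above, the sketch below builds a 2-d multivariate normal and draws samples: rvs applies the Cholesky factor of sigma to standard-normal draws and shifts by mu, so the empirical mean and covariance should approach mu and sigma. This is a usage sketch only, assuming the carl.distributions import path shown in the module header.
import numpy as np
from carl.distributions import MultivariateNormal

mu = np.array([0.0, 1.0])
sigma = np.array([[1.0, 0.5],
                  [0.5, 2.0]])   # must be symmetric positive definite

mvn = MultivariateNormal(mu=mu, sigma=sigma)

X = mvn.rvs(10000, random_state=42)
print(mvn.ndim)                  # 2
print(X.mean(axis=0))            # ~ mu
print(np.cov(X, rowvar=False))   # ~ sigma
print(mvn.pdf(X[:5]))            # density at the first five samples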
def cdf(self, X, **kwargs)
Not supported.
def cdf(self, X, **kwargs):
    """Not supported."""
    raise NotImplementedError
def fit(self, X, bounds=None, constraints=None, use_gradient=True, optimizer=None, **kwargs)
Fit the distribution parameters to data by minimizing the negative log-likelihood of the data.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
- bounds [list of (parameter, (low, high))]: The parameter bounds.
- constraints: The constraints on the parameters.
- use_gradient [boolean, default=True]: Whether to use exact gradients (if True) or numerical gradients (if False).
- optimizer [string]: The optimization method.
Returns
- self [object]: self.
def fit(self, X, bounds=None, constraints=None, use_gradient=True, optimizer=None, **kwargs): """Fit the distribution parameters to data by minimizing the negative log-likelihood of the data. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. * `bounds` [list of (parameter, (low, high))]: The parameter bounds. * `constraints`: The constraints on the parameters. * `use_gradient` [boolean, default=True]: Whether to use exact gradients (if `True`) or numerical gradients (if `False`). * `optimizer` [string]: The optimization method. Returns ------- * `self` [object]: `self`. """ # Map parameters to placeholders param_to_placeholder = [] param_to_index = {} for i, v in enumerate(self.parameters_): w = T.TensorVariable(v.type) param_to_placeholder.append((v, w)) param_to_index[v] = i # Build bounds mapped_bounds = None if bounds is not None: mapped_bounds = [(None, None) for v in param_to_placeholder] for b in bounds: mapped_bounds[param_to_index[b["param"]]] = b["bounds"] # Build constraints mapped_constraints = None if constraints is not None: mapped_constraints = [] for c in constraints: args = c["param"] if isinstance(args, SharedVariable): args = (args, ) m_c = { "type": c["type"], "fun": lambda x: c["fun"](*[x[param_to_index[a]] for a in args]) } if "jac" in c: m_c["jac"] = lambda x: c["jac"](*[x[param_to_index[a]] for a in args]) mapped_constraints.append(m_c) # Derive objective and gradient objective_ = theano.function( [self.X] + [w for _, w in param_to_placeholder] + [theano.In(v, name=v.name) for v in self.observeds_], T.sum(self.nll_), givens=param_to_placeholder, allow_input_downcast=True) def objective(x): return objective_(X, *x, **kwargs) / len(X) if use_gradient: gradient_ = theano.function( [self.X] + [w for _, w in param_to_placeholder] + [theano.In(v, name=v.name) for v in self.observeds_], theano.grad(T.sum(self.nll_), [v for v, _ in param_to_placeholder]), givens=param_to_placeholder, allow_input_downcast=True) def gradient(x): return np.array(gradient_(X, *x, **kwargs)) / len(X) # Solve! x0 = np.array([v.get_value() for v, _ in param_to_placeholder]) r = minimize(objective, jac=gradient if use_gradient else None, x0=x0, method=optimizer, bounds=mapped_bounds, constraints=mapped_constraints) if r.success: # Assign the solution for i, value in enumerate(r.x): param_to_placeholder[i][0].set_value(value) else: print("Parameter fitting failed!") print(r) return self
def nll(self, X, **kwargs)
Negative log-likelihood.
This function returns the value of the negative log-likelihood -log(p(x_i)), at all x_i in X.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- nll [array, shape=(n_samples,)]: -log(p(x_i)) for all x_i in X.
def nll(self, X, **kwargs): """Negative log-likelihood. This function returns the value of the negative log-likelihood `- log(p(x_i))`, at all `x_i` in `X`. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `nll` [array, shape=(n_samples,)]: `-log(p(x_i))` for all `x_i` in `X`. """ raise NotImplementedError
def pdf(self, X, **kwargs)
Probability density function.
This function returns the value of the probability density function p(x_i), at all x_i in X.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- pdf [array, shape=(n_samples,)]: p(x_i) for all x_i in X.
def pdf(self, X, **kwargs): """Probability density function. This function returns the value of the probability density function `p(x_i)`, at all `x_i` in `X`. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `pdf` [array, shape=(n_samples,)]: `p(x_i)` for all `x_i` in `X`. """ raise NotImplementedError
def ppf(self, X, **kwargs)
Not supported.
def ppf(self, X, **kwargs):
    """Not supported."""
    raise NotImplementedError
def rvs(self, n_samples, random_state=None, **kwargs)
Draw samples.
Parameters
- n_samples [int]: The number of samples to draw.
- random_state [integer or RandomState object]: The random seed.
Returns
- X [array, shape=(n_samples, n_features)]: The generated samples.
def rvs(self, n_samples, random_state=None, **kwargs):
    rng = check_random_state(random_state)
    X = rng.randn(n_samples, self.ndim)
    return self.rvs_func_(X, **kwargs)
def score(self, X, **kwargs)
Evaluate the log-likelihood -self.nll(X).sum() of X.
The higher, the better.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- score [float]: The log-likelihood of X.
def score(self, X, **kwargs): """Evaluate the log-likelihood `-self.nll(X).sum()` of `X`. The higher, the better. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `score` [float]: The log-likelihood of `X`. """ return -self.nll(X, **kwargs).sum()
def set_params(self, **params)
Set the parameters of this estimator.
The method works on simple estimators as well as on nested objects (such as pipelines). The latter have parameters of the form <component>__<parameter> so that it's possible to update each component of a nested object.
Returns
- self
def set_params(self, **params):
    for name, value in params.items():
        var = getattr(self, name, None)

        if isinstance(var, SharedVariable):
            var.set_value(value)
        elif (isinstance(var, T.TensorVariable) or
              isinstance(var, T.TensorConstant)):
            raise ValueError("Only shared variables can be updated.")
        else:
            super(DistributionMixin, self).set_params(**{name: value})
Instance variables
var ndim
var ndim_
var nll_
var pdf_
class Normal
Normal distribution.
This distribution supports 1D data only.
class Normal(TheanoDistribution): """Normal distribution. This distribution supports 1D data only. """ def __init__(self, mu=0.0, sigma=1.0): """Constructor. Parameters ---------- * `mu` [float]: The distribution mean. * `sigma` [float]: The distribution standard deviation. """ super(Normal, self).__init__(mu=mu, sigma=sigma) # pdf self.pdf_ = ( (1. / np.sqrt(2. * np.pi)) / self.sigma * T.exp(-(self.X - self.mu) ** 2 / (2. * self.sigma ** 2))).ravel() self._make(self.pdf_, "pdf") # -log pdf self.nll_ = bound( T.log(self.sigma) + T.log(np.sqrt(2. * np.pi)) + (self.X - self.mu) ** 2 / (2. * self.sigma ** 2), np.inf, self.sigma > 0.).ravel() self._make(self.nll_, "nll") # cdf self.cdf_ = 0.5 * (1. + T.erf((self.X - self.mu) / (self.sigma * np.sqrt(2.)))).ravel() self._make(self.cdf_, "cdf") # ppf self.ppf_ = (self.mu + np.sqrt(2.) * self.sigma * T.erfinv(2. * self.p - 1.)) self._make(self.ppf_, "ppf", args=[self.p])
Ancestors (in MRO)
- Normal
- TheanoDistribution
- DistributionMixin
- sklearn.base.BaseEstimator
- builtins.object
Class variables
var X
var p
Static methods
def __init__(self, mu=0.0, sigma=1.0)
Constructor.
Parameters
- mu [float]: The distribution mean.
- sigma [float]: The distribution standard deviation.
def __init__(self, mu=0.0, sigma=1.0):
    """Constructor.

    Parameters
    ----------
    * `mu` [float]:
        The distribution mean.

    * `sigma` [float]:
        The distribution standard deviation.
    """
    super(Normal, self).__init__(mu=mu, sigma=sigma)

    # pdf
    self.pdf_ = (
        (1. / np.sqrt(2. * np.pi)) / self.sigma *
        T.exp(-(self.X - self.mu) ** 2 / (2. * self.sigma ** 2))).ravel()
    self._make(self.pdf_, "pdf")

    # -log pdf
    self.nll_ = bound(
        T.log(self.sigma) + T.log(np.sqrt(2. * np.pi)) +
        (self.X - self.mu) ** 2 / (2. * self.sigma ** 2),
        np.inf,
        self.sigma > 0.).ravel()
    self._make(self.nll_, "nll")

    # cdf
    self.cdf_ = 0.5 * (1. + T.erf((self.X - self.mu) /
                                  (self.sigma * np.sqrt(2.)))).ravel()
    self._make(self.cdf_, "cdf")

    # ppf
    self.ppf_ = (self.mu +
                 np.sqrt(2.) * self.sigma * T.erfinv(2. * self.p - 1.))
    self._make(self.ppf_, "ppf", args=[self.p])
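The compiled pdf, nll, cdf and ppf expressions above can be checked directly against their closed forms. A small sketch, using only the public API shown in this document:
import numpy as np
from carl.distributions import Normal

n = Normal(mu=0.0, sigma=1.0)
X = np.array([[-1.0], [0.0], [2.0]])   # 1D data, one column per feature

print(n.pdf(X))                                    # standard normal density at -1, 0, 2
print(np.allclose(n.nll(X), -np.log(n.pdf(X))))    # nll is -log pdf for sigma > 0
print(n.cdf(np.array([[0.0]])))                    # 0.5 at the mean
print(n.ppf(np.array([[0.975]])))                  # ~1.96, the inverse of the cdf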
def cdf(self, X, **kwargs)
Cumulative distribution function.
This function returns the value of the cumulative distribution function F(x_i) = p(x <= x_i), at all x_i in X.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- cdf [array, shape=(n_samples,)]: cdf(x_i) for all x_i in X.
def cdf(self, X, **kwargs): """Cumulative distribution function. This function returns the value of the cumulative distribution function `F(x_i) = p(x <= x_i)`, at all `x_i` in `X`. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `cdf` [array, shape=(n_samples,)]: `cdf(x_i)` for all `x_i` in `X`. """ raise NotImplementedError
def fit(self, X, bounds=None, constraints=None, use_gradient=True, optimizer=None, **kwargs)
Fit the distribution parameters to data by minimizing the negative log-likelihood of the data.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
- bounds [list of (parameter, (low, high))]: The parameter bounds.
- constraints: The constraints on the parameters.
- use_gradient [boolean, default=True]: Whether to use exact gradients (if True) or numerical gradients (if False).
- optimizer [string]: The optimization method.
Returns
- self [object]: self.
def fit(self, X, bounds=None, constraints=None, use_gradient=True, optimizer=None, **kwargs): """Fit the distribution parameters to data by minimizing the negative log-likelihood of the data. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. * `bounds` [list of (parameter, (low, high))]: The parameter bounds. * `constraints`: The constraints on the parameters. * `use_gradient` [boolean, default=True]: Whether to use exact gradients (if `True`) or numerical gradients (if `False`). * `optimizer` [string]: The optimization method. Returns ------- * `self` [object]: `self`. """ # Map parameters to placeholders param_to_placeholder = [] param_to_index = {} for i, v in enumerate(self.parameters_): w = T.TensorVariable(v.type) param_to_placeholder.append((v, w)) param_to_index[v] = i # Build bounds mapped_bounds = None if bounds is not None: mapped_bounds = [(None, None) for v in param_to_placeholder] for b in bounds: mapped_bounds[param_to_index[b["param"]]] = b["bounds"] # Build constraints mapped_constraints = None if constraints is not None: mapped_constraints = [] for c in constraints: args = c["param"] if isinstance(args, SharedVariable): args = (args, ) m_c = { "type": c["type"], "fun": lambda x: c["fun"](*[x[param_to_index[a]] for a in args]) } if "jac" in c: m_c["jac"] = lambda x: c["jac"](*[x[param_to_index[a]] for a in args]) mapped_constraints.append(m_c) # Derive objective and gradient objective_ = theano.function( [self.X] + [w for _, w in param_to_placeholder] + [theano.In(v, name=v.name) for v in self.observeds_], T.sum(self.nll_), givens=param_to_placeholder, allow_input_downcast=True) def objective(x): return objective_(X, *x, **kwargs) / len(X) if use_gradient: gradient_ = theano.function( [self.X] + [w for _, w in param_to_placeholder] + [theano.In(v, name=v.name) for v in self.observeds_], theano.grad(T.sum(self.nll_), [v for v, _ in param_to_placeholder]), givens=param_to_placeholder, allow_input_downcast=True) def gradient(x): return np.array(gradient_(X, *x, **kwargs)) / len(X) # Solve! x0 = np.array([v.get_value() for v, _ in param_to_placeholder]) r = minimize(objective, jac=gradient if use_gradient else None, x0=x0, method=optimizer, bounds=mapped_bounds, constraints=mapped_constraints) if r.success: # Assign the solution for i, value in enumerate(r.x): param_to_placeholder[i][0].set_value(value) else: print("Parameter fitting failed!") print(r) return self
def nll(self, X, **kwargs)
Negative log-likelihood.
This function returns the value of the negative log-likelihood -log(p(x_i)), at all x_i in X.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- nll [array, shape=(n_samples,)]: -log(p(x_i)) for all x_i in X.
def nll(self, X, **kwargs): """Negative log-likelihood. This function returns the value of the negative log-likelihood `- log(p(x_i))`, at all `x_i` in `X`. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `nll` [array, shape=(n_samples,)]: `-log(p(x_i))` for all `x_i` in `X`. """ raise NotImplementedError
def pdf(self, X, **kwargs)
Probability density function.
This function returns the value of the probability density function p(x_i), at all x_i in X.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- pdf [array, shape=(n_samples,)]: p(x_i) for all x_i in X.
def pdf(self, X, **kwargs): """Probability density function. This function returns the value of the probability density function `p(x_i)`, at all `x_i` in `X`. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `pdf` [array, shape=(n_samples,)]: `p(x_i)` for all `x_i` in `X`. """ raise NotImplementedError
def ppf(self, X, **kwargs)
Percent point function (inverse of cdf).
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- ppf [array, shape=(n_samples,)]: Percent point function for all x_i in X.
def ppf(self, X, **kwargs): """Percent point function (inverse of cdf). Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `ppf` [array, shape=(n_samples,)]: Percent point function for all `x_i` in `X`. """ raise NotImplementedError
def rvs(self, n_samples, random_state=None, **kwargs)
Draw samples.
Parameters
- n_samples [int]: The number of samples to draw.
- random_state [integer or RandomState object]: The random seed.
Returns
- X [array, shape=(n_samples, n_features)]: The generated samples.
def rvs(self, n_samples, random_state=None, **kwargs):
    rng = check_random_state(random_state)
    p = rng.rand(n_samples, 1)
    return self.ppf(p, **kwargs)
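rvs is plain inverse-transform sampling: uniform draws on [0, 1) pushed through the percent point function. The equivalence can be verified directly, as in this sketch that reuses the same seed for both paths:
import numpy as np
from sklearn.utils import check_random_state
from carl.distributions import Normal

n = Normal(mu=1.0, sigma=2.0)

X1 = n.rvs(4, random_state=0)

rng = check_random_state(0)
p = rng.rand(4, 1)           # the same uniform draws that rvs uses internally
X2 = n.ppf(p)

print(np.allclose(X1, X2))   # True: rvs == ppf applied to uniform draws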
def score(self, X, **kwargs)
Evaluate the log-likelihood -self.nll(X).sum() of X.
The higher, the better.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- score [float]: The log-likelihood of X.
def score(self, X, **kwargs): """Evaluate the log-likelihood `-self.nll(X).sum()` of `X`. The higher, the better. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `score` [float]: The log-likelihood of `X`. """ return -self.nll(X, **kwargs).sum()
def set_params(self, **params)
Set the parameters of this estimator.
The method works on simple estimators as well as on nested objects (such as pipelines). The latter have parameters of the form <component>__<parameter> so that it's possible to update each component of a nested object.
Returns
- self
def set_params(self, **params):
    for name, value in params.items():
        var = getattr(self, name, None)

        if isinstance(var, SharedVariable):
            var.set_value(value)
        elif (isinstance(var, T.TensorVariable) or
              isinstance(var, T.TensorConstant)):
            raise ValueError("Only shared variables can be updated.")
        else:
            super(DistributionMixin, self).set_params(**{name: value})
Instance variables
var cdf_
var ndim
The number of dimensions (or features) of the data.
var nll_
var pdf_
var ppf_
class Sampler
Sampler.
This class can be used in the likelihood-free setup to provide an implementation of the rvs method on top of known data.
class Sampler(DistributionMixin): """Sampler. This class can be used in the likelihood-free setup to provide an implementation of the `rvs` method on top of known data. """ def fit(self, X, sample_weight=None, **kwargs): """Fit. Note that calling `fit` is necessary to store a reference to the data `X`. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `self` [object]: `self`. """ self.X_ = X self.ndim_ = X.shape[1] self.sample_weight_ = sample_weight return self def rvs(self, n_samples, random_state=None, **kwargs): rng = check_random_state(random_state) if self.sample_weight_ is None: w = np.ones(len(self.X_)) else: w = self.sample_weight_ w = w / w.sum() indices = np.searchsorted(np.cumsum(w), rng.rand(n_samples)) return self.X_[indices] def pdf(self, X, **kwargs): """Not supported.""" raise NotImplementedError def nll(self, X, **kwargs): """Not supported.""" raise NotImplementedError def ppf(self, X, **kwargs): """Not supported.""" raise NotImplementedError def cdf(self, X, **kwargs): """Not supported.""" raise NotImplementedError def score(self, X, **kwargs): """Not supported.""" raise NotImplementedError @property def ndim(self): return self.ndim_
Ancestors (in MRO)
- Sampler
- DistributionMixin
- sklearn.base.BaseEstimator
- builtins.object
Static methods
def cdf(self, X, **kwargs)
Not supported.
def cdf(self, X, **kwargs):
    """Not supported."""
    raise NotImplementedError
def fit(self, X, sample_weight=None, **kwargs)
Fit.
Note that calling fit is necessary to store a reference to the data X.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- self [object]: self.
def fit(self, X, sample_weight=None, **kwargs):
    """Fit.

    Note that calling `fit` is necessary to store a reference to the
    data `X`.

    Parameters
    ----------
    * `X` [array-like, shape=(n_samples, n_features)]:
        The samples.

    Returns
    -------
    * `self` [object]:
        `self`.
    """
    self.X_ = X
    self.ndim_ = X.shape[1]
    self.sample_weight_ = sample_weight

    return self
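Since Sampler only memorises the data passed to fit, a small empirical example is enough to show the intent: weighted resampling of known samples, as used in the likelihood-free setting described above. A minimal sketch with made-up data:
import numpy as np
from carl.distributions import Sampler

# Known data, e.g. the output of a black-box simulator.
X = np.linspace(-3.0, 3.0, 7).reshape(-1, 1)
w = np.array([0.0, 0.0, 1.0, 4.0, 1.0, 0.0, 0.0])   # favour the central points

s = Sampler()
s.fit(X, sample_weight=w)          # stores X, its dimensionality and the weights

draws = s.rvs(1000, random_state=0)
print(s.ndim)                       # 1
print(np.unique(draws))             # only the points with non-zero weight appear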
def nll(self, X, **kwargs)
Not supported.
def nll(self, X, **kwargs):
    """Not supported."""
    raise NotImplementedError
def pdf(self, X, **kwargs)
Not supported.
def pdf(self, X, **kwargs):
    """Not supported."""
    raise NotImplementedError
def ppf(self, X, **kwargs)
Not supported.
def ppf(self, X, **kwargs):
    """Not supported."""
    raise NotImplementedError
def rvs(self, n_samples, random_state=None, **kwargs)
Draw samples.
Parameters
- n_samples [int]: The number of samples to draw.
- random_state [integer or RandomState object]: The random seed.
Returns
- X [array, shape=(n_samples, n_features)]: The generated samples.
def rvs(self, n_samples, random_state=None, **kwargs):
    rng = check_random_state(random_state)

    if self.sample_weight_ is None:
        w = np.ones(len(self.X_))
    else:
        w = self.sample_weight_

    w = w / w.sum()
    indices = np.searchsorted(np.cumsum(w), rng.rand(n_samples))

    return self.X_[indices]
def score(self, X, **kwargs)
Not supported.
def score(self, X, **kwargs):
    """Not supported."""
    raise NotImplementedError
Instance variables
class TheanoDistribution
Base class for Theano-based distributions.
Exposed attributes include:
- parameters_: the set of parameters on which the distribution depends.
- constants_: the set of constants on which the distribution depends.
- observeds_: the set of unset variables on which the distribution depends. Values for these variables are required to be set using the **kwargs argument.
class TheanoDistribution(DistributionMixin): """Base class for Theano-based distributions. Exposed attributes include: - `parameters_`: the set of parameters on which the distribution depends. - `constants_`: the set of constants on which the distribution depends. - `observeds_`: the set of unset variables on which the distribution depends. Values for these variables are required to be set using the `**kwargs` argument. """ # Mixin interface X = T.dmatrix(name="X") # Input expression is shared by all distributions p = T.dmatrix(name="p") def __init__(self, **parameters): """Constructor. Parameters ---------- * `**parameters` [pairs of name/value]: The list of parameter names with their values. """ # Validate parameters of the distribution self.parameters_ = set() # base parameters self.constants_ = set() # base constants self.observeds_ = set() # base observeds # XXX: check that no two variables in observeds_ have the same name for name, value in parameters.items(): v, p, c, o = check_parameter(name, value) setattr(self, name, v) for p_i in p: self.parameters_.add(p_i) for c_i in c: self.constants_.add(c_i) for o_i in o: self.observeds_.add(o_i) def _make(self, expression, name, args=None, kwargs=None): if args is None: args = [self.X] if kwargs is None: kwargs = self.observeds_ func = theano.function( [theano.In(v, name=v.name) for v in args] + [theano.In(v, name=v.name) for v in kwargs], expression, allow_input_downcast=True ) setattr(self, name, func) def rvs(self, n_samples, random_state=None, **kwargs): rng = check_random_state(random_state) p = rng.rand(n_samples, 1) return self.ppf(p, **kwargs) # Scikit-Learn estimator interface def set_params(self, **params): for name, value in params.items(): var = getattr(self, name, None) if isinstance(var, SharedVariable): var.set_value(value) elif (isinstance(var, T.TensorVariable) or isinstance(var, T.TensorConstant)): raise ValueError("Only shared variables can be updated.") else: super(DistributionMixin, self).set_params(**{name: value}) # XXX: shall we also allow replacement of variables and # recompile all expressions instead? def fit(self, X, bounds=None, constraints=None, use_gradient=True, optimizer=None, **kwargs): """Fit the distribution parameters to data by minimizing the negative log-likelihood of the data. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. * `bounds` [list of (parameter, (low, high))]: The parameter bounds. * `constraints`: The constraints on the parameters. * `use_gradient` [boolean, default=True]: Whether to use exact gradients (if `True`) or numerical gradients (if `False`). * `optimizer` [string]: The optimization method. Returns ------- * `self` [object]: `self`. 
""" # Map parameters to placeholders param_to_placeholder = [] param_to_index = {} for i, v in enumerate(self.parameters_): w = T.TensorVariable(v.type) param_to_placeholder.append((v, w)) param_to_index[v] = i # Build bounds mapped_bounds = None if bounds is not None: mapped_bounds = [(None, None) for v in param_to_placeholder] for b in bounds: mapped_bounds[param_to_index[b["param"]]] = b["bounds"] # Build constraints mapped_constraints = None if constraints is not None: mapped_constraints = [] for c in constraints: args = c["param"] if isinstance(args, SharedVariable): args = (args, ) m_c = { "type": c["type"], "fun": lambda x: c["fun"](*[x[param_to_index[a]] for a in args]) } if "jac" in c: m_c["jac"] = lambda x: c["jac"](*[x[param_to_index[a]] for a in args]) mapped_constraints.append(m_c) # Derive objective and gradient objective_ = theano.function( [self.X] + [w for _, w in param_to_placeholder] + [theano.In(v, name=v.name) for v in self.observeds_], T.sum(self.nll_), givens=param_to_placeholder, allow_input_downcast=True) def objective(x): return objective_(X, *x, **kwargs) / len(X) if use_gradient: gradient_ = theano.function( [self.X] + [w for _, w in param_to_placeholder] + [theano.In(v, name=v.name) for v in self.observeds_], theano.grad(T.sum(self.nll_), [v for v, _ in param_to_placeholder]), givens=param_to_placeholder, allow_input_downcast=True) def gradient(x): return np.array(gradient_(X, *x, **kwargs)) / len(X) # Solve! x0 = np.array([v.get_value() for v, _ in param_to_placeholder]) r = minimize(objective, jac=gradient if use_gradient else None, x0=x0, method=optimizer, bounds=mapped_bounds, constraints=mapped_constraints) if r.success: # Assign the solution for i, value in enumerate(r.x): param_to_placeholder[i][0].set_value(value) else: print("Parameter fitting failed!") print(r) return self def score(self, X, **kwargs): """Evaluate the log-likelihood `-self.nll(X).sum()` of `X`. The higher, the better. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `score` [float]: The log-likelihood of `X`. """ return -self.nll(X, **kwargs).sum()
Ancestors (in MRO)
- TheanoDistribution
- DistributionMixin
- sklearn.base.BaseEstimator
- builtins.object
Class variables
var X
var p
Static methods
def __init__(self, **parameters)
Constructor.
Parameters
- **parameters [pairs of name/value]: The list of parameter names with their values.
def __init__(self, **parameters):
    """Constructor.

    Parameters
    ----------
    * `**parameters` [pairs of name/value]:
        The list of parameter names with their values.
    """
    # Validate parameters of the distribution
    self.parameters_ = set()   # base parameters
    self.constants_ = set()    # base constants
    self.observeds_ = set()    # base observeds

    # XXX: check that no two variables in observeds_ have the same name

    for name, value in parameters.items():
        v, p, c, o = check_parameter(name, value)
        setattr(self, name, v)

        for p_i in p:
            self.parameters_.add(p_i)
        for c_i in c:
            self.constants_.add(c_i)
        for o_i in o:
            self.observeds_.add(o_i)
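How a constructor argument ends up in parameters_, constants_ or observeds_ depends on its type: shared variables (and the shared variables found inside composed expressions) become fittable parameters, plain constants become constants, and free tensor variables become observeds whose values must be supplied through **kwargs at call time. The following sketch illustrates that distinction; it assumes the check_parameter behaviour just described, so treat it as an assumption rather than a specification.
# Sketch: classifying constructor arguments (assumed check_parameter behaviour).
import numpy as np
import theano
import theano.tensor as T
from carl.distributions import Normal

mu = theano.shared(0.0, name="mu")    # shared variable -> fittable parameter
a = T.dscalar(name="a")               # free tensor variable -> observed
sigma = a + 1.0                       # sigma depends on the observed `a`

n = Normal(mu=mu, sigma=sigma)

print([v.name for v in n.parameters_])   # expected: ['mu']
print([v.name for v in n.observeds_])    # expected: ['a']

X = np.array([[0.0], [1.0]])
print(n.pdf(X, a=0.5))                   # `a` must be passed as a keyword argument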
def cdf(self, X, **kwargs)
Cumulative distribution function.
This function returns the value of the cumulative distribution function F(x_i) = p(x <= x_i), at all x_i in X.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- cdf [array, shape=(n_samples,)]: cdf(x_i) for all x_i in X.
def cdf(self, X, **kwargs): """Cumulative distribution function. This function returns the value of the cumulative distribution function `F(x_i) = p(x <= x_i)`, at all `x_i` in `X`. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `cdf` [array, shape=(n_samples,)]: `cdf(x_i)` for all `x_i` in `X`. """ raise NotImplementedError
def fit(self, X, bounds=None, constraints=None, use_gradient=True, optimizer=None, **kwargs)
Fit the distribution parameters to data by minimizing the negative log-likelihood of the data.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
- bounds [list of (parameter, (low, high))]: The parameter bounds.
- constraints: The constraints on the parameters.
- use_gradient [boolean, default=True]: Whether to use exact gradients (if True) or numerical gradients (if False).
- optimizer [string]: The optimization method.
Returns
- self [object]: self.
def fit(self, X, bounds=None, constraints=None, use_gradient=True, optimizer=None, **kwargs): """Fit the distribution parameters to data by minimizing the negative log-likelihood of the data. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. * `bounds` [list of (parameter, (low, high))]: The parameter bounds. * `constraints`: The constraints on the parameters. * `use_gradient` [boolean, default=True]: Whether to use exact gradients (if `True`) or numerical gradients (if `False`). * `optimizer` [string]: The optimization method. Returns ------- * `self` [object]: `self`. """ # Map parameters to placeholders param_to_placeholder = [] param_to_index = {} for i, v in enumerate(self.parameters_): w = T.TensorVariable(v.type) param_to_placeholder.append((v, w)) param_to_index[v] = i # Build bounds mapped_bounds = None if bounds is not None: mapped_bounds = [(None, None) for v in param_to_placeholder] for b in bounds: mapped_bounds[param_to_index[b["param"]]] = b["bounds"] # Build constraints mapped_constraints = None if constraints is not None: mapped_constraints = [] for c in constraints: args = c["param"] if isinstance(args, SharedVariable): args = (args, ) m_c = { "type": c["type"], "fun": lambda x: c["fun"](*[x[param_to_index[a]] for a in args]) } if "jac" in c: m_c["jac"] = lambda x: c["jac"](*[x[param_to_index[a]] for a in args]) mapped_constraints.append(m_c) # Derive objective and gradient objective_ = theano.function( [self.X] + [w for _, w in param_to_placeholder] + [theano.In(v, name=v.name) for v in self.observeds_], T.sum(self.nll_), givens=param_to_placeholder, allow_input_downcast=True) def objective(x): return objective_(X, *x, **kwargs) / len(X) if use_gradient: gradient_ = theano.function( [self.X] + [w for _, w in param_to_placeholder] + [theano.In(v, name=v.name) for v in self.observeds_], theano.grad(T.sum(self.nll_), [v for v, _ in param_to_placeholder]), givens=param_to_placeholder, allow_input_downcast=True) def gradient(x): return np.array(gradient_(X, *x, **kwargs)) / len(X) # Solve! x0 = np.array([v.get_value() for v, _ in param_to_placeholder]) r = minimize(objective, jac=gradient if use_gradient else None, x0=x0, method=optimizer, bounds=mapped_bounds, constraints=mapped_constraints) if r.success: # Assign the solution for i, value in enumerate(r.x): param_to_placeholder[i][0].set_value(value) else: print("Parameter fitting failed!") print(r) return self
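The bounds and constraints arguments are keyed by the shared-variable parameter objects themselves, as the mapping code above shows (b["param"], b["bounds"], c["param"], c["fun"]). A fitting sketch under those conventions; the toy data generation and the chosen bound are illustrative only:
import numpy as np
from carl.distributions import Normal

rng = np.random.RandomState(0)
X = rng.normal(loc=1.5, scale=0.8, size=(5000, 1))   # toy data

n = Normal(mu=0.0, sigma=1.0)
n.fit(X,
      bounds=[{"param": n.sigma, "bounds": (1e-3, None)}],   # keep sigma positive
      use_gradient=True)

print(n.mu.get_value(), n.sigma.get_value())   # ~1.5 and ~0.8 after fitting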
def nll(self, X, **kwargs)
Negative log-likelihood.
This function returns the value of the negative log-likelihood -log(p(x_i)), at all x_i in X.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- nll [array, shape=(n_samples,)]: -log(p(x_i)) for all x_i in X.
def nll(self, X, **kwargs): """Negative log-likelihood. This function returns the value of the negative log-likelihood `- log(p(x_i))`, at all `x_i` in `X`. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `nll` [array, shape=(n_samples,)]: `-log(p(x_i))` for all `x_i` in `X`. """ raise NotImplementedError
def pdf(self, X, **kwargs)
Probability density function.
This function returns the value of the probability density function p(x_i), at all x_i in X.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- pdf [array, shape=(n_samples,)]: p(x_i) for all x_i in X.
def pdf(self, X, **kwargs): """Probability density function. This function returns the value of the probability density function `p(x_i)`, at all `x_i` in `X`. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `pdf` [array, shape=(n_samples,)]: `p(x_i)` for all `x_i` in `X`. """ raise NotImplementedError
def ppf(self, X, **kwargs)
Percent point function (inverse of cdf).
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- ppf [array, shape=(n_samples,)]: Percent point function for all x_i in X.
def ppf(self, X, **kwargs): """Percent point function (inverse of cdf). Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `ppf` [array, shape=(n_samples,)]: Percent point function for all `x_i` in `X`. """ raise NotImplementedError
def rvs(self, n_samples, random_state=None, **kwargs)
Draw samples.
Parameters
- n_samples [int]: The number of samples to draw.
- random_state [integer or RandomState object]: The random seed.
Returns
- X [array, shape=(n_samples, n_features)]: The generated samples.
def rvs(self, n_samples, random_state=None, **kwargs):
    rng = check_random_state(random_state)
    p = rng.rand(n_samples, 1)
    return self.ppf(p, **kwargs)
def score(self, X, **kwargs)
Evaluate the log-likelihood -self.nll(X).sum() of X.
The higher, the better.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- score [float]: The log-likelihood of X.
def score(self, X, **kwargs): """Evaluate the log-likelihood `-self.nll(X).sum()` of `X`. The higher, the better. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `score` [float]: The log-likelihood of `X`. """ return -self.nll(X, **kwargs).sum()
def set_params(self, **params)
Set the parameters of this estimator.
The method works on simple estimators as well as on nested objects (such as pipelines). The latter have parameters of the form <component>__<parameter> so that it's possible to update each component of a nested object.
Returns
- self
def set_params(self, **params):
    for name, value in params.items():
        var = getattr(self, name, None)

        if isinstance(var, SharedVariable):
            var.set_value(value)
        elif (isinstance(var, T.TensorVariable) or
              isinstance(var, T.TensorConstant)):
            raise ValueError("Only shared variables can be updated.")
        else:
            super(DistributionMixin, self).set_params(**{name: value})
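In practice, set_params only succeeds for parameters backed by shared variables; trying to overwrite a symbolic tensor variable or constant raises a ValueError, as the code above shows. A short sketch of the intended use:
from carl.distributions import Normal

n = Normal(mu=0.0, sigma=1.0)

n.set_params(mu=2.5)       # mu is a shared variable, so its value is updated in place
print(n.mu.get_value())    # 2.5

# A parameter declared as a free tensor variable or constant cannot be set
# this way; set_params raises "Only shared variables can be updated."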
Instance variables
var constants_
var ndim
The number of dimensions (or features) of the data.
var observeds_
var parameters_
class Uniform
Uniform distribution.
This distribution supports 1D data only.
class Uniform(TheanoDistribution): """Uniform distribution. This distribution supports 1D data only. """ def __init__(self, low=0.0, high=1.0): """Constructor. Parameters ---------- * `low` [float]: The lower bound. * `high` [float]: The upper bound """ super(Uniform, self).__init__(low=low, high=high) # pdf self.pdf_ = T.switch( T.or_(T.lt(self.X, self.low), T.ge(self.X, self.high)), 0., 1. / (self.high - self.low)).ravel() self._make(self.pdf_, "pdf") # -log pdf self.nll_ = T.switch( T.or_(T.lt(self.X, self.low), T.ge(self.X, self.high)), np.inf, T.log(self.high - self.low)).ravel() self._make(self.nll_, "nll") # cdf self.cdf_ = T.switch( T.lt(self.X, self.low), 0., T.switch( T.lt(self.X, self.high), (self.X - self.low) / (self.high - self.low), 1.)).ravel() self._make(self.cdf_, "cdf") # ppf self.ppf_ = self.p * (self.high - self.low) + self.low self._make(self.ppf_, "ppf", args=[self.p])
Ancestors (in MRO)
- Uniform
- TheanoDistribution
- DistributionMixin
- sklearn.base.BaseEstimator
- builtins.object
Class variables
var X
var p
Static methods
def __init__(self, low=0.0, high=1.0)
Constructor.
Parameters
- low [float]: The lower bound.
- high [float]: The upper bound.
def __init__(self, low=0.0, high=1.0):
    """Constructor.

    Parameters
    ----------
    * `low` [float]:
        The lower bound.

    * `high` [float]:
        The upper bound.
    """
    super(Uniform, self).__init__(low=low, high=high)

    # pdf
    self.pdf_ = T.switch(
        T.or_(T.lt(self.X, self.low), T.ge(self.X, self.high)),
        0.,
        1. / (self.high - self.low)).ravel()
    self._make(self.pdf_, "pdf")

    # -log pdf
    self.nll_ = T.switch(
        T.or_(T.lt(self.X, self.low), T.ge(self.X, self.high)),
        np.inf,
        T.log(self.high - self.low)).ravel()
    self._make(self.nll_, "nll")

    # cdf
    self.cdf_ = T.switch(
        T.lt(self.X, self.low),
        0.,
        T.switch(
            T.lt(self.X, self.high),
            (self.X - self.low) / (self.high - self.low),
            1.)).ravel()
    self._make(self.cdf_, "cdf")

    # ppf
    self.ppf_ = self.p * (self.high - self.low) + self.low
    self._make(self.ppf_, "ppf", args=[self.p])
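The switch expressions above give a density of 1/(high - low) inside the support and 0 (infinite negative log-likelihood) outside, and a ppf that is an affine map of the probability. A small usage sketch:
import numpy as np
from carl.distributions import Uniform

u = Uniform(low=-1.0, high=3.0)

X = np.array([[-2.0], [0.0], [3.5]])
print(u.pdf(X))                    # [0.0, 0.25, 0.0]: 1/(high - low) inside the support
print(u.cdf(X))                    # [0.0, 0.25, 1.0]
print(u.ppf(np.array([[0.5]])))    # 1.0, the midpoint of the support

samples = u.rvs(1000, random_state=0)          # ppf applied to uniform draws
print(samples.min() >= -1.0, samples.max() < 3.0)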
def cdf(self, X, **kwargs)
Cumulative distribution function.
This function returns the value of the cumulative distribution function F(x_i) = p(x <= x_i), at all x_i in X.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- cdf [array, shape=(n_samples,)]: cdf(x_i) for all x_i in X.
def cdf(self, X, **kwargs): """Cumulative distribution function. This function returns the value of the cumulative distribution function `F(x_i) = p(x <= x_i)`, at all `x_i` in `X`. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `cdf` [array, shape=(n_samples,)]: `cdf(x_i)` for all `x_i` in `X`. """ raise NotImplementedError
def fit(self, X, bounds=None, constraints=None, use_gradient=True, optimizer=None, **kwargs)
Fit the distribution parameters to data by minimizing the negative log-likelihood of the data.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
- bounds [list of (parameter, (low, high))]: The parameter bounds.
- constraints: The constraints on the parameters.
- use_gradient [boolean, default=True]: Whether to use exact gradients (if True) or numerical gradients (if False).
- optimizer [string]: The optimization method.
Returns
- self [object]: self.
def fit(self, X, bounds=None, constraints=None, use_gradient=True, optimizer=None, **kwargs): """Fit the distribution parameters to data by minimizing the negative log-likelihood of the data. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. * `bounds` [list of (parameter, (low, high))]: The parameter bounds. * `constraints`: The constraints on the parameters. * `use_gradient` [boolean, default=True]: Whether to use exact gradients (if `True`) or numerical gradients (if `False`). * `optimizer` [string]: The optimization method. Returns ------- * `self` [object]: `self`. """ # Map parameters to placeholders param_to_placeholder = [] param_to_index = {} for i, v in enumerate(self.parameters_): w = T.TensorVariable(v.type) param_to_placeholder.append((v, w)) param_to_index[v] = i # Build bounds mapped_bounds = None if bounds is not None: mapped_bounds = [(None, None) for v in param_to_placeholder] for b in bounds: mapped_bounds[param_to_index[b["param"]]] = b["bounds"] # Build constraints mapped_constraints = None if constraints is not None: mapped_constraints = [] for c in constraints: args = c["param"] if isinstance(args, SharedVariable): args = (args, ) m_c = { "type": c["type"], "fun": lambda x: c["fun"](*[x[param_to_index[a]] for a in args]) } if "jac" in c: m_c["jac"] = lambda x: c["jac"](*[x[param_to_index[a]] for a in args]) mapped_constraints.append(m_c) # Derive objective and gradient objective_ = theano.function( [self.X] + [w for _, w in param_to_placeholder] + [theano.In(v, name=v.name) for v in self.observeds_], T.sum(self.nll_), givens=param_to_placeholder, allow_input_downcast=True) def objective(x): return objective_(X, *x, **kwargs) / len(X) if use_gradient: gradient_ = theano.function( [self.X] + [w for _, w in param_to_placeholder] + [theano.In(v, name=v.name) for v in self.observeds_], theano.grad(T.sum(self.nll_), [v for v, _ in param_to_placeholder]), givens=param_to_placeholder, allow_input_downcast=True) def gradient(x): return np.array(gradient_(X, *x, **kwargs)) / len(X) # Solve! x0 = np.array([v.get_value() for v, _ in param_to_placeholder]) r = minimize(objective, jac=gradient if use_gradient else None, x0=x0, method=optimizer, bounds=mapped_bounds, constraints=mapped_constraints) if r.success: # Assign the solution for i, value in enumerate(r.x): param_to_placeholder[i][0].set_value(value) else: print("Parameter fitting failed!") print(r) return self
def nll(self, X, **kwargs)
Negative log-likelihood.
This function returns the value of the negative log-likelihood -log(p(x_i)), at all x_i in X.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- nll [array, shape=(n_samples,)]: -log(p(x_i)) for all x_i in X.
def nll(self, X, **kwargs): """Negative log-likelihood. This function returns the value of the negative log-likelihood `- log(p(x_i))`, at all `x_i` in `X`. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `nll` [array, shape=(n_samples,)]: `-log(p(x_i))` for all `x_i` in `X`. """ raise NotImplementedError
def pdf(self, X, **kwargs)
Probability density function.
This function returns the value of the probability density function p(x_i), at all x_i in X.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- pdf [array, shape=(n_samples,)]: p(x_i) for all x_i in X.
def pdf(self, X, **kwargs): """Probability density function. This function returns the value of the probability density function `p(x_i)`, at all `x_i` in `X`. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `pdf` [array, shape=(n_samples,)]: `p(x_i)` for all `x_i` in `X`. """ raise NotImplementedError
def ppf(self, X, **kwargs)
Percent point function (inverse of cdf).
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- ppf [array, shape=(n_samples,)]: Percent point function for all x_i in X.
def ppf(self, X, **kwargs): """Percent point function (inverse of cdf). Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `ppf` [array, shape=(n_samples,)]: Percent point function for all `x_i` in `X`. """ raise NotImplementedError
def rvs(self, n_samples, random_state=None, **kwargs)
Draw samples.
Parameters
- n_samples [int]: The number of samples to draw.
- random_state [integer or RandomState object]: The random seed.
Returns
- X [array, shape=(n_samples, n_features)]: The generated samples.
def rvs(self, n_samples, random_state=None, **kwargs):
    rng = check_random_state(random_state)
    p = rng.rand(n_samples, 1)
    return self.ppf(p, **kwargs)
def score(self, X, **kwargs)
Evaluate the log-likelihood -self.nll(X).sum() of X.
The higher, the better.
Parameters
- X [array-like, shape=(n_samples, n_features)]: The samples.
Returns
- score [float]: The log-likelihood of X.
def score(self, X, **kwargs): """Evaluate the log-likelihood `-self.nll(X).sum()` of `X`. The higher, the better. Parameters ---------- * `X` [array-like, shape=(n_samples, n_features)]: The samples. Returns ------- * `score` [float]: The log-likelihood of `X`. """ return -self.nll(X, **kwargs).sum()
def set_params(self, **params)
Set the parameters of this estimator.
The method works on simple estimators as well as on nested objects (such as pipelines). The latter have parameters of the form <component>__<parameter> so that it's possible to update each component of a nested object.
Returns
- self
def set_params(self, **params):
    for name, value in params.items():
        var = getattr(self, name, None)

        if isinstance(var, SharedVariable):
            var.set_value(value)
        elif (isinstance(var, T.TensorVariable) or
              isinstance(var, T.TensorConstant)):
            raise ValueError("Only shared variables can be updated.")
        else:
            super(DistributionMixin, self).set_params(**{name: value})
Instance variables
var cdf_
var ndim
The number of dimensions (or features) of the data.
var nll_
var pdf_
var ppf_