"""Collection of metrics for evaluating the performance of probabilistic models."""
from collections import namedtuple
import numpy as np
from arviz_base import convert_to_datatree, dataset_to_dataarray, extract, rcParams
from scipy.spatial import cKDTree
from scipy.stats import wasserstein_distance, wasserstein_distance_nd
from arviz_stats.base import array_stats
from arviz_stats.base.stats_utils import round_num
[docs]
def bayesian_r2(
data,
pred_mean=None,
scale=None,
scale_kind="sd",
summary=True,
group="posterior",
point_estimate=None,
ci_kind=None,
ci_prob=None,
circular=False,
round_to=None,
):
r"""Bayesian :math:`R^2` for regression models.
The :math:`R^2`, or coefficient of determination, is defined as the proportion of variance
in the data that is explained by the model.
The Bayesian :math:`R^2` (or modeled :math:`R^2`) differs from other definitions of
:math:`R^2` in that it is computed only using posterior quantities from the fitted model.
For details of the Bayesian :math:`R^2` see [1]_.
Briefly, it is defined as:
.. math::
R^2 = \frac{\mathrm{Var}_{\mu}}{\mathrm{Var}_{\mu} + \mathrm{Var}_{\mathrm{res}}}
where :math:`\mathrm{Var}_{\mu}` is the variance of the predicted means,
and :math:`\mathrm{Var}_{\mathrm{res}}` is the modelled residual variance.
For a Gaussian family, this is :math:`\\sigma^2`.
For a Bernoulli family, this is :math:`p(1-p)`, where :math:`p` is the predicted
probability of success (see [2]_ for details). This is computed internally if
`scale` is not provided.
For other models, you may need to compute the appropriate scale variable
representing the modeled variance (or pseudo-variance) and pass it using
the ``scale`` argument.
Parameters
----------
data : DataTree or InferenceData
Input data. It should contain the posterior and posterior_predictive groups.
pred_mean : str
Name of the variable representing the predicted mean.
scale : str, optional
Name of the variable representing the modeled variance (or pseudo-variance).
It can be omitted for binary classification problems, in which case the pseudo-variance
is computed internally.
scale_kind : str
Whether the variable referenced by `scale` is a standard deviation ("sd")
or variance ("var"). Defaults to "sd".
If "sd", it is squared internally to obtain the variance. Omitted if `scale` is None.
summary: bool
Whether to return a summary (default) or an array of :math:`R^2` samples.
The summary is a named tuple with a point estimate and a credible interval
group : str, optional
Group from which to obtain the predicted means (`pred_mean`) and scale (`scale`).
point_estimate: str
The point estimate to compute. If None, the default value is used.
Defaults values are defined in rcParams["stats.point_estimate"]. Ignored if
summary is False.
ci_kind: str
The kind of credible interval to compute. If None, the default value is used.
Defaults values are defined in rcParams["stats.ci_kind"]. Ignored if
summary is False.
ci_prob: float
The probability for the credible interval. If None, the default value is used.
Defaults values are defined in rcParams["stats.ci_prob"]. Ignored if
summary is False.
circular: bool
Whether to compute the residual :math:`R^2` for circular data. Defaults to False.
It's assumed that the circular data is in radians and ranges from -π to π.
:math:`R^2 = 1 - \mathrm{Var}_{\mathrm{res}}`. Thus the `scale` must represent the
modeled circular variance and `scale_kind` must be "var".
We avoid using the term math::`\mathrm{Var}_{\mu}`, because as the dispersion of the
circular data increases the dispersion of the mean also increase so even for a model
that does not explain any of the data :math:`R^2` can be much higher than 0.
round_to: int or str or None, optional
If integer, number of decimal places to round the result. Integers can be negative.
If string of the form '2g' number of significant digits to round the result.
Defaults to rcParams["stats.round_to"] if None. Use the string "None" or "none" to
return raw numbers.
Returns
-------
Namedtuple or array
See Also
--------
arviz_stats.residual_r2 : Residual :math:`R^2`.
arviz_stats.loo_r2 : LOO-adjusted :math:`R^2`.
Examples
--------
Calculate Bayesian :math:`R^2` for logistic regression:
.. ipython::
In [1]: from arviz_stats import bayesian_r2
...: from arviz_base import load_arviz_data
...: data = load_arviz_data('anes')
...: bayesian_r2(data, pred_mean="p")
Calculate Bayesian :math:`R^2` for circular regression. The posterior has
the concentration parameter ``kappa`` (from the VonMises distribution).
Thus we compute the circular variance as :math:`1 - I_1(\kappa) / I_0(\kappa)`,
.. ipython::
In [1]: from scipy.special import i0, i1
...: data = load_arviz_data('periwinkles')
...: kappa = data.posterior['kappa']
...: data.posterior["variance"] = 1 - i1(kappa) / i0(kappa)
...: bayesian_r2(data, pred_mean='mu', scale='variance',
...: scale_kind="var", circular=True)
References
----------
.. [1] Gelman et al. *R-squared for Bayesian regression models*.
The American Statistician. 73(3) (2019). https://doi.org/10.1080/00031305.2018.1549100
preprint http://www.stat.columbia.edu/~gelman/research/published/bayes_R2_v3.pdf.
.. [2] Tjur, T. *Coefficient of determination in logistic regression models-A new proposal:
The coefficient of discrimination* The American Statistician, 63(4) (2009).
https://doi.org/10.1198/tast.2009.08210
"""
if point_estimate is None:
point_estimate = rcParams["stats.point_estimate"]
if ci_kind is None:
ci_kind = rcParams["stats.ci_kind"]
if ci_prob is None:
ci_prob = rcParams["stats.ci_prob"]
if round_to is None:
round_to = rcParams["stats.round_to"]
mu_pred = extract(data, group=group, var_names=pred_mean).values.T
if scale is not None:
scale = extract(data, group=group, var_names=scale).values.T
r_squared = array_stats.bayesian_r2(mu_pred, scale, scale_kind, circular)
if summary:
return _summary_r2("bayesian", r_squared, point_estimate, ci_kind, ci_prob, round_to)
return r_squared
[docs]
def residual_r2(
data,
pred_mean=None,
obs_name=None,
summary=True,
group="posterior",
point_estimate=None,
ci_kind=None,
ci_prob=None,
circular=False,
round_to=None,
):
r"""Residual :math:`R^2` for Bayesian regression models.
The :math:`R^2`, or coefficient of determination, is defined as the proportion of variance
in the data that is explained by the model. For details of the residual :math:`R^2` see [1]_.
Briefly, it is defined as:
.. math::
R^2 = \frac{\mathrm{Var}_{\mu}}{\mathrm{Var}_{\mu} + \mathrm{Var}_{\mathrm{res}}}
where :math:`\mathrm{Var}_{\mu}` is the variance of the predicted means,
and :math:`\mathrm{Var}_{\mathrm{res}}` is the residual variance.
.. math::
\mathrm{Var}_{\mathrm{res}}^s = V_{n=1}^N \hat{e}_n^s,
where :math:`\hat{e}_n^s=y_n-\hat{y}_n^s` are the residuals for observation :math:`n` in
posterior sample :math:`s`.
The residual :math:`R^2` differs from the Bayesian :math:`R^2` in that it computes
residual variance from the observed data, while for the Bayesian :math:`R^2` all
variance terms come from the model, and not directly from the data.
Parameters
----------
data : DataTree or InferenceData
Input data. It should contain the posterior_predictive and observed_data groups.
pred_mean : str
Name of the variable representing the predicted mean.
obs_name : str, optional
Name of the variable representing the observed data.
summary: bool
Whether to return a summary (default) or an array of :math:`R^2` samples.
The summary is a named tuple with a point estimate and a credible interval
group : str, optional
Group from which to obtain the predicted means (`pred_name`). Defaults to "posterior".
point_estimate: str
The point estimate to compute. If None, the default value is used.
Defaults values are defined in rcParams["stats.point_estimate"]. Ignored if
summary is False.
ci_kind: str
The kind of credible interval to compute. If None, the default value is used.
Defaults values are defined in rcParams["stats.ci_kind"]. Ignored if
summary is False.
ci_prob: float
The probability for the credible interval. If None, the default value is used.
Defaults values are defined in rcParams["stats.ci_prob"]. Ignored if
summary is False.
circular: bool
Whether to compute the residual :math:`R^2` for circular data. Defaults to False.
It's assumed that the circular data is in radians and ranges from -π to π.
:math:`R^2 = 1 - Var_{\mathrm{res}}`. where :math:`Var_{\mathrm{res}}` is computed
using the circular variance which goes from 0 to 1.
We avoid using the term math::`\mathrm{Var}_{\mu}`, because as the dispersion of the
circular data increases the dispersion of the mean also increase so even for a model
that does not explain any of the data :math:`R^2` can be much higher than 0.
round_to: int or str or None, optional
If integer, number of decimal places to round the result. Integers can be negative.
If string of the form '2g' number of significant digits to round the result.
Defaults to rcParams["stats.round_to"] if None. Use the string "None" or "none" to
return raw numbers.
Returns
-------
Namedtuple or array
See Also
--------
arviz_stats.bayesian_r2 : Bayesian :math:`R^2`.
arviz_stats.loo_r2 : LOO-adjusted :math:`R^2`.
Examples
--------
Calculate residual :math:`R^2` for Bayesian logistic regression:
.. ipython::
In [1]: from arviz_stats import residual_r2
...: from arviz_base import load_arviz_data
...: data = load_arviz_data('anes')
...: residual_r2(data, pred_mean='p', obs_name='vote')
Calculate residual :math:`R^2` for Bayesian circular regression:
.. ipython::
In [1]: data = load_arviz_data('periwinkles')
...: residual_r2(data, pred_mean='mu', obs_name='direction', circular=True)
References
----------
.. [1] Gelman et al. *R-squared for Bayesian regression models*.
The American Statistician. 73(3) (2019). https://doi.org/10.1080/00031305.2018.1549100
preprint http://www.stat.columbia.edu/~gelman/research/published/bayes_R2_v3.pdf.
"""
if point_estimate is None:
point_estimate = rcParams["stats.point_estimate"]
if ci_kind is None:
ci_kind = rcParams["stats.ci_kind"]
if ci_prob is None:
ci_prob = rcParams["stats.ci_prob"]
if round_to is None:
round_to = rcParams["stats.round_to"]
y_true = extract(data, group="observed_data", var_names=obs_name, combined=False).values
mu_pred = extract(data, group=group, var_names=pred_mean).values.T
r_squared = array_stats.residual_r2(y_true, mu_pred, circular)
if summary:
return _summary_r2("residual", r_squared, point_estimate, ci_kind, ci_prob, round_to)
return r_squared
[docs]
def metrics(data, kind="rmse", var_name=None, sample_dims=None, round_to=None):
"""
Compute performace metrics.
Currently supported metrics are mean absolute error, mean squared error and
root mean squared error.
For classification problems, accuracy and balanced accuracy are also supported.
Parameters
----------
data: DataTree or InferenceData
It should contain groups `observed_data` and `posterior_predictive`.
kind: str
The kind of metric to compute. Available options are:
- 'mae': mean absolute error.
- 'mse': mean squared error.
- 'rmse': root mean squared error. Default.
- 'acc': classification accuracy.
- 'acc_balanced': balanced classification accuracy.
var_name: str, optional
The name of the observed and predicted variable.
sample_dims: iterable of hashable, optional
Dimensions to be considered sample dimensions and are to be reduced.
Default ``rcParams["data.sample_dims"]``.
round_to: int or str or None, optional
If integer, number of decimal places to round the result. Integers can be negative.
If string of the form '2g' number of significant digits to round the result.
Defaults to rcParams["stats.round_to"] if None. Use the string "None" or "none" to
return raw numbers.
Returns
-------
estimate: namedtuple
A namedtuple with the mean of the computed metric and its standard error.
Examples
--------
Calculate root mean squared error
.. ipython::
In [1]: from arviz_stats import metrics
...: from arviz_base import load_arviz_data
...: dt = load_arviz_data("radon")
...: metrics(dt, kind="rmse")
Calculate accuracy of a logistic regression model
.. ipython::
In [1]: dt = load_arviz_data("anes")
...: metrics(dt, kind="acc")
Notes
-----
The computation of the metrics is done by first reducing the posterior predictive
samples, this is done to mirror the computation of the metrics by the
:func:`arviz_stats.loo_metrics` function, and hence make comparison easier to perform.
"""
if sample_dims is None:
sample_dims = rcParams["data.sample_dims"]
if round_to is None:
round_to = rcParams["stats.round_to"]
if var_name is None:
var_name = list(data.observed_data.data_vars.keys())[0]
observed = data.observed_data[var_name]
predicted = data.posterior_predictive[var_name].mean(dim=sample_dims)
return _metrics(observed, predicted, kind, round_to)
[docs]
def kl_divergence(
data1,
data2,
group="posterior",
var_names=None,
sample_dims=None,
num_samples=500,
round_to=None,
random_seed=212480,
):
"""Compute the Kullback-Leibler (KL) divergence.
The KL-divergence is a measure of how different two probability distributions are.
It represents how much extra uncertainty are we introducing when we use one
distribution to approximate another. The KL-divergence is not symmetric, thus
changing the order of the `data1` and `data2` arguments will change the result.
For details of the approximation used to the compute the KL-divergence see [1]_.
Parameters
----------
data1, data2 : DataArray, Dataset, DataTree, or InferenceData
group : hashable, default "posterior"
Group on which to compute the kl-divergence.
var_names : str or list of str, optional
Names of the variables for which the KL-divergence should be computed.
sample_dims : iterable of hashable, optional
Dimensions to be considered sample dimensions and are to be reduced.
Default ``rcParams["data.sample_dims"]``.
num_samples : int
Number of samples to use for the distance calculation. Default is 500.
round_to: int or str or None, optional
If integer, number of decimal places to round the result. Integers can be negative.
If string of the form '2g' number of significant digits to round the result.
Defaults to rcParams["stats.round_to"] if None. Use the string "None" or "none" to
return raw numbers.
random_seed : int
Random seed for reproducibility. Use None for no seed.
Returns
-------
KL-divergence : float
Examples
--------
Calculate the KL-divergence between the posterior distributions
for the variable mu in the centered and non-centered eight schools models
.. ipython::
In [1]: from arviz_stats import kl_divergence
...: from arviz_base import load_arviz_data
...: data1 = load_arviz_data('centered_eight')
...: data2 = load_arviz_data('non_centered_eight')
...: kl_divergence(data1, data2, var_names="mu")
References
----------
.. [1] F. Perez-Cruz, *Kullback-Leibler divergence estimation of continuous distributions*
IEEE International Symposium on Information Theory. (2008)
https://doi.org/10.1109/ISIT.2008.4595271.
preprint https://www.tsc.uc3m.es/~fernando/bare_conf3.pdf
"""
if round_to is None:
round_to = rcParams["stats.round_to"]
dist1, dist2 = _prepare_distribution_pair(
data1,
data2,
group=group,
var_names=var_names,
sample_dims=sample_dims,
num_samples=num_samples,
random_seed=random_seed,
)
kl_d = _kld(dist1, dist2)
return round_num(kl_d, round_to)
[docs]
def wasserstein(
data1,
data2,
group="posterior",
var_names=None,
sample_dims=None,
joint=True,
num_samples=500,
round_to=None,
random_seed=212480,
):
"""Compute the Wasserstein-1 distance.
The Wasserstein distance, also called the Earth mover’s distance or the optimal transport
distance, is a similarity metric between two probability distributions [1]_.
Parameters
----------
data1, data2 : DataArray, Dataset, DataTree, or InferenceData
group : hashable, default "posterior"
Group on which to compute the Wasserstein distance.
var_names : str or list of str, optional
Names of the variables for which the Wasserstein distance should be computed.
sample_dims : iterable of hashable, optional
Dimensions to be considered sample dimensions and are to be reduced.
Default ``rcParams["data.sample_dims"]``.
joint : bool, default True
Whether to compute Wasserstein distance for the joint distribution (True)
or over the marginals (False)
num_samples : int
Number of samples to use for the distance calculation. Default is 500.
round_to: int or str or None, optional
If integer, number of decimal places to round the result. Integers can be negative.
If string of the form '2g' number of significant digits to round the result.
Defaults to rcParams["stats.round_to"] if None. Use the string "None" or "none" to
return raw numbers.
random_seed : int
Random seed for reproducibility. Use None for no seed.
Returns
-------
wasserstein_distance : float
Notes
-----
The computation is faster for the marginals (`joint=False`). This is equivalent to
assume the marginals are independent, which usually is not the case.
This function uses the :func:`scipy.stats.wasserstein_distance` for the computation of the
marginals and :func:`scipy.stats.wasserstein_distance_nd` for the joint distribution.
Examples
--------
Calculate the Wasserstein distance between the posterior distributions
for the variable mu in the centered and non-centered eight schools models
.. ipython::
In [1]: from arviz_stats import wasserstein
...: from arviz_base import load_arviz_data
...: data1 = load_arviz_data('centered_eight')
...: data2 = load_arviz_data('non_centered_eight')
...: wasserstein(data1, data2, var_names="mu")
References
----------
.. [1] "Wasserstein metric",
https://en.wikipedia.org/wiki/Wasserstein_metric
"""
if round_to is None:
round_to = rcParams["stats.round_to"]
dist1, dist2 = _prepare_distribution_pair(
data1,
data2,
group=group,
var_names=var_names,
sample_dims=sample_dims,
num_samples=num_samples,
random_seed=random_seed,
)
if joint:
distance = wasserstein_distance_nd(dist1, dist2)
else:
distance = 0
for var1, var2 in zip(dist1.T, dist2.T):
distance += wasserstein_distance(var1, var2)
distance = distance.item()
return round_num(distance, round_to)
def _prepare_distribution_pair(
data1, data2, group, var_names, sample_dims, num_samples, random_seed
):
"""Prepare the distribution pair for metric calculations."""
data1 = convert_to_datatree(data1)
data2 = convert_to_datatree(data2)
if sample_dims is None:
sample_dims = rcParams["data.sample_dims"]
dist1 = _extract_and_reindex(
data1,
group=group,
var_names=var_names,
sample_dims=sample_dims,
num_samples=num_samples,
random_seed=random_seed,
)
dist2 = _extract_and_reindex(
data2,
group=group,
var_names=var_names,
sample_dims=sample_dims,
num_samples=num_samples,
random_seed=random_seed,
)
shared_var_names = set(dist1.data_vars) & set(dist2.data_vars)
if not shared_var_names:
raise ValueError(
"No shared variable names found between the two datasets. "
"Ensure that both datasets contain variables with matching names."
)
if var_names is None:
var_names = list(shared_var_names)
dist1, dist2 = dist1[var_names], dist2[var_names]
dist1 = dataset_to_dataarray(dist1, sample_dims=["sample"])
dist2 = dataset_to_dataarray(dist2, sample_dims=["sample"])
return dist1, dist2
def _extract_and_reindex(data, group, var_names, sample_dims, num_samples, random_seed):
return (
extract(
data,
group=group,
sample_dims=sample_dims,
var_names=var_names,
num_samples=num_samples,
random_seed=random_seed,
keep_dataset=True,
)
.reset_index("sample")
.drop_vars(sample_dims)
.assign_coords(sample=range(num_samples))
)
def _kld(ary0, ary1):
"""Kullback-Leibler divergence approximation.
Compute KL-divergence using equation 14 from [1]_. Assumes both arrays
are of the same shape.
Parameters
----------
ary0, ary1 : (N, M) array-like
Samples of the input distributions. ``N`` represents the number of samples (e.g. posterior
samples) and ``M`` the number of outputs (e.g. number of variables in the posterior)
Returns
-------
float
The Kullback-Leibler divergence between the two
distributions.
References
----------
.. [1] F. Perez-Cruz, *Kullback-Leibler divergence estimation of continuous distributions*
IEEE International Symposium on Information Theory. (2008)
https://doi.org/10.1109/ISIT.2008.4595271.
preprint https://www.tsc.uc3m.es/~fernando/bare_conf3.pdf
"""
# for discrete data we need to smooth the samples to avoid numerical errors
# here we are adding a small noise to all samples, differences should be negligible
# but we may want to do something more sophisticated in the future
rng = np.random.default_rng(0)
ary0 = ary0 + rng.normal(0, ary0.std(axis=0) / 1e6, size=ary0.shape)
ary1 = ary1 + rng.normal(0, ary1.std(axis=0) / 1e6, size=ary1.shape)
samples, dim = ary0.shape
# Build KD-trees for X and Y
kd_tree_ary0 = cKDTree(ary0)
kd_tree_ary1 = cKDTree(ary1)
# first nearest neighbour distances of X to Y
r_k, _ = kd_tree_ary1.query(ary0)
# second nearest neighbour distances of X to X
# we skip the trivial first nearest neighbour distance
s_k = kd_tree_ary0.query(ary0, k=2)[0][:, 1]
kl_div = (dim / samples) * np.sum(np.log(r_k / s_k)) + np.log(samples / (samples - 1))
# Due to numerical errors and for very similar samples we can get negative values
kl_div = max(0.0, kl_div.item())
return kl_div
def _metrics(observed, predicted, kind, round_to):
"""Compute performance metrics.
Parameters
----------
observed: DataArray
Observed data.
predicted: DataArray
Predicted data.
kind: str
The kind of metric to compute. Available options are:
- 'mae': mean absolute error.
- 'mse': mean squared error.
- 'rmse': root mean squared error. Default.
- 'acc': classification accuracy.
- 'acc_balanced': balanced classification accuracy.
round_to: int or str or None, optional
If integer, number of decimal places to round the result. Integers can be negative.
If string of the form '2g' number of significant digits to round the result.
Defaults to rcParams["stats.round_to"] if None. Use the string "None" or "none" to
return raw numbers.
Returns
-------
estimate: namedtuple
A namedtuple with the mean of the computed metric and its standard error.
"""
if round_to is None:
round_to = rcParams["stats.round_to"]
valid_kind = ["mae", "rmse", "mse", "acc", "acc_balanced"]
if kind not in valid_kind:
raise ValueError(f"kind must be one of {valid_kind}")
estimate = namedtuple(kind, ["mean", "se"])
mean, std_error = array_stats.metrics(observed, predicted, kind=kind)
return estimate(round_num(mean, round_to), round_num(std_error, round_to))
def _summary_r2(name, r_squared, point_estimate, ci_kind, ci_prob, round_to):
estimate = getattr(np, point_estimate)(r_squared).item()
c_i = getattr(array_stats, ci_kind)(r_squared, ci_prob)
r2_summary = namedtuple(f"{name}_R2", [point_estimate, f"{ci_kind}_lb", f"{ci_kind}_ub"])
estimate = round_num(estimate, round_to)
c_i = (round_num(c_i[0].item(), round_to), round_num(c_i[1].item(), round_to))
return r2_summary(estimate, c_i[0], c_i[1])