MCMC Estimation of Copula CAN'T WORK
The samples is the initial values.
import numpy as np import os import re os.environ["CUDA_VISIBLE_DEVICES"] ="-1" from scipy.special import gamma import scipy.stats
import tensorflow as tf from tensorflow.keras.models import Model from tensorflow.keras.layers import Dense from tensorflow.keras.optimizers import Adam from math import pi
import tensorflow_probability as tfp tfd = tfp.distributions tfb = tfp.bijectors import matplotlib.pyplot as plt import matplotlib.animation as animation import seaborn as sns from scipy.integrate import quad
print("TFP Version", tfp.version) print("TF Version", tf.version)
np.random.seed(1704)
import scipy import seaborn import matplotlib.pyplot as plt import numpy as np from scipy.integrate import nquad
data = np.array(value_lists)[:, :, 0] NUM_POSTERIOR_SAMPLES = 10000 NUM_BURNIN_ITERATIONS = int(0.25 * NUM_BURNIN_ITERATIONS) NUM_ADAPTATION = int(0.5 * NUM_BURNIN_ITERATIONS)
class GaussianCopulaTriL(tfd.TransformedDistribution):
"""Takes a location, and lower triangular matrix for the Cholesky factor."""
def __init__(self, loc, scale_tril):
super(GaussianCopulaTriL, self).__init__(
distribution=tfd.MultivariateNormalTriL(
loc=loc,
scale_tril=scale_tril),
bijector=tfb.NormalCDF(),
validate_args=False,
name="GaussianCopulaTriLUniform")
def _parameter_properties(self, dtype, num_classes=None):
return dict(
# Annotations may optionally specify properties, such as `event_ndims`,
# `default_constraining_bijector_fn`, `specifies_shape`, etc.; see
# the `ParameterProperties` documentation for details.
loc=tfp.util.ParameterProperties(),
scale_tril=tfp.util.ParameterProperties())
class WarpedGaussianCopula(tfd.TransformedDistribution):
"""Application of a Gaussian Copula on a list of target marginals.
This implements an application of a Gaussian Copula. Given [x_0, ... x_n]
which are distributed marginally (with CDF) [F_0, ... F_n],
`GaussianCopula` represents an application of the Copula, such that the
resulting multivariate distribution has the above specified marginals.
The marginals are specified by `marginal_bijectors`: These are
bijectors whose `inverse` encodes the CDF and `forward` the inverse CDF.
block_sizes is a 1-D Tensor to determine splits for `marginal_bijectors`
length should be same as length of `marginal_bijectors`.
See tfb.Blockwise for details
"""
def __init__(self, loc, scale_tril, marginal_bijectors, block_sizes=None):
super(WarpedGaussianCopula, self).__init__(
distribution=GaussianCopulaTriL(loc=loc, scale_tril=scale_tril),
bijector=tfb.Blockwise(bijectors=marginal_bijectors,
block_sizes=block_sizes),
validate_args=False,
name="GaussianCopula")
def _parameter_properties(self, dtype, num_classes=None):
return dict(
# Annotations may optionally specify properties, such as `event_ndims`,
# `default_constraining_bijector_fn`, `specifies_shape`, etc.; see
# the `ParameterProperties` documentation for details.
loc=tfp.util.ParameterProperties(),
scale_tril=tfp.util.ParameterProperties(),
marginal_bijectors = tfp.util.ParameterProperties(),
block_sizes = tfp.util.ParameterProperties())
def joint_log_prob(data,
correlation,
ijv_loc,
ijv_scale,
cca_loc,
cca_scale):
"""
Estimate the mean and variance of a Normal distribution.
Parameters
----------
@data: the observed data, binary (0, 1)
@mu: the mean parameter. This is "learnable"
@sigma: the variance parameter. This is "learnable"
"""
correlation_prior = tfd.Normal(loc=0., scale=1.)
ijv_loc_prior = tfd.Normal(loc=400., scale=200.)
ijv_scale_prior = tfd.Normal(loc=100., scale=50.)
cca_loc_prior = tfd.Normal(loc=50., scale=50.)
cca_scale_prior = tfd.Normal(loc=20., scale=10.)
zero_prior = tfd.Normal(loc=0., scale=1.)
if True:
rv_data = WarpedGaussianCopula(
loc=[0., 0.],
scale_tril=[[1., 0.], [correlation, tf.sqrt(1. - correlation ** 2)]],
#These encode the marginals we want. In this case we want X_0 has
#Kumaraswamy marginal, and X_1 has Gumbel marginal.
marginal_bijectors=[
tfb.Invert(tfb.KumaraswamyCDF(ijv_loc, ijv_scale)),
tfb.Invert(tfb.GumbelCDF(loc=cca_loc, scale=cca_scale))])
return (
correlation_prior.log_prob(correlation) +
ijv_loc_prior.log_prob(ijv_loc) +
ijv_scale_prior.log_prob(ijv_scale) +
cca_loc_prior.log_prob(cca_loc) +
cca_scale_prior.log_prob(cca_scale) +
tf.reduce_sum(rv_data.log_prob(data))
)
def prediction(correlation_,
ijv_loc_,
ijv_scal_,
cca_loc_,
cca_scale_):
sampler = lambda z: np.random.choice(z, 1, replace=True)
correlation = np.array(sampler(correlation_))[0]
ijv_loc = np.array(sampler(ijv_loc_))[0]
ijv_scale = np.array(sampler(ijv_scal_))[0]
cca_loc = np.array(sampler(cca_loc_))[0]
cca_scale = np.array(sampler(cca_scale_))[0]
print("correlation, ijv_loc, ijv_scale, cca_loc, cca_scale",
correlation, ijv_loc, ijv_scale, cca_loc, cca_scale)
y_hat = WarpedGaussianCopula(
loc=[0., 0.],
scale_tril=[[1., 0.], [correlation, tf.sqrt(1. - correlation ** 2)]],
marginal_bijectors=[
tfb.Invert(tfb.KumaraswamyCDF(ijv_loc, ijv_scale)),
tfb.Invert(tfb.GumbelCDF(loc=cca_loc, scale=cca_scale))])
return y_hat
data = np.array(value_lists)[:, 1:, 0]
unnormalized_posterior = lambda correlation, ijv_loc, ijv_scale, cca_loc, cca_scale: \
joint_log_prob(data, correlation, ijv_loc, ijv_scale, cca_loc, cca_scale)
[correlation_, ijv_loc_, ijv_scale_, cca_loc_, cca_scale_] = run_chain(
construct_hmc(unnormalized_posterior, adaptation_steps=NUM_ADAPTATION),
inits=[0., 300., 200., 50., 20.],
iters=[NUM_POSTERIOR_SAMPLES, NUM_BURNIN_ITERATIONS]
)
can you share run_chain and construct_hmc? it's probably a too-large learning rate leading to rejections.
can you share
run_chainandconstruct_hmc? it's probably a too-large learning rate leading to rejections.
ok, thanks.
def run_chain(hmc, inits=[], iters=[NUM_POSTERIOR_SAMPLES, NUM_BURNIN_ITERATIONS]): """ Extract samples from HMC chain
Parameters
----------
@hmc: a MonteCarlo chain (returned from construct_hmc())
@initial_state: starting point for the chain
@iters: list [length of chain, burnin iterations]
def construct_hmc(log_posterior, adaptation_steps=NUM_ADAPTATION): """ Define a reaonsably "vanilla" HMC chain.
Parameters
----------
@log_posterior: function of the joint posterior distribution
"""
hmc = tfp.mcmc.SimpleStepSizeAdaptation(
# The actual HMC is very simple to define
tfp.mcmc.HamiltonianMonteCarlo(
# Log Posterior goes here
target_log_prob_fn=log_posterior,
num_leapfrog_steps=3,
# constant step size
step_size=1
),
num_adaptation_steps=adaptation_steps
)
return hmc
Returns
-------
samples: list of posterior samples for each parameter estimated
"""
samples, is_accepted = tfp.mcmc.sample_chain(
num_results=iters[0],
num_burnin_steps=iters[1],
current_state=inits,
kernel=hmc
)
return samples
I misspoke. Meant step size not learning rate. Yours is set to 1, which is almost certainly too big. Think of this like the step size parameter in an Euler integration. Start small, like 1e-3. If you get some accepted proposals, great. Step size adaptation should take it from there. If no accepts, 1e-4, and so on. As you decrease step size, you may want to increase leapfrog steps to explore sufficiently, otherwise it's not much different than random walk metropolis.