PermutationCorrelator class to complement Iman-Conover #679

Open
wants to merge 4 commits into base: main
297 changes: 294 additions & 3 deletions src/semeio/fmudesign/iman_conover.py
@@ -239,7 +239,298 @@ def decorrelate(X, remove_variance=True):
return mean + X


import itertools


class PermutationCorrelator:
def __init__(
self,
correlation_matrix,
*,
weights=None,
iterations=1000,
max_iter_no_change=250,
correlation_type="pearson",
seed=None,
verbose=False,
):
"""Create a PermutationCorrelator instance, which induces correlations
between variables in X by randomly shuffling rows in a given column.

Parameters
----------
correlation_matrix : 2d numpy array
A target correlation matrix. We only check that this matrix is
square and symmetric, since even a non-valid correlation matrix
will work with this algorithm.
weights : 2d numpy array or None, optional
Elementwise weights for the target correlation matrix.
The default is None, which corresponds to uniform weights.
iterations : int, optional
            Maximal number of iterations to run. Each iteration consists of
one loop over all variables. Choosing 0 means infinite iterations.
The default is 1000.
max_iter_no_change : int, optional
Maximal number of iterations without any improvement in the
objective before terminating. The default is 250.
correlation_type : str, optional
Either "pearson" or "spearman". The default is "pearson".
seed : int or None, optional
A seed for the random number generator. The default is None.
verbose : bool, optional
Whether or not to print information. The default is False.

Notes
-----
The paper "Correlation control in small-sample Monte Carlo type
simulations I: A simulated annealing approach" by Vořechovský et al.
proposes using simulated annealing. We implement a simple randomized
hill climbing procedure instead, because it is good enough.
- https://www.sciencedirect.com/science/article/pii/S0266892009000113
- https://en.wikipedia.org/wiki/Hill_climbing

Examples
--------
>>> rng = np.random.default_rng(42)
>>> X = rng.normal(size=(100, 2))
>>> sp.stats.pearsonr(*X.T).statistic
0.1573...
>>> correlation_matrix = np.array([[1, 0.7], [0.7, 1]])
>>> perm_trans = PermutationCorrelator(correlation_matrix, seed=0)
>>> X_transformed = perm_trans.hill_climb(X)
>>> sp.stats.pearsonr(*X_transformed.T).statistic
0.6999...

For large matrices, it often makes sense to first use Iman-Conover
to get a good initial solution, then give it to PermutationCorrelator.
Start by creating a large correlation matrix:

>>> variables = 25
>>> correlation_matrix = np.ones((variables, variables)) * 0.7
>>> np.fill_diagonal(correlation_matrix, 1.0)
>>> perm_trans = PermutationCorrelator(correlation_matrix,
... iterations=1000,
... max_iter_no_change=250,
... seed=0, verbose=True)

Create data X, then transform using Iman-Conover:

>>> X = rng.normal(size=(10 * variables, variables))
>>> perm_trans._error(X) # Initial error
0.4846...
>>> ic_trans = ImanConover(correlation_matrix)
>>> X_ic = ic_trans(X)
>>> perm_trans._error(X_ic) # Error after Iman-Conover
0.0071...
>>> X_ic_pc = perm_trans.hill_climb(X_ic)
Running permutation correlator for 1000 iterations.
Iteration 100 Error: 0.0037
Iteration 200 Error: 0.0024
Iteration 300 Error: 0.0017
Iteration 400 Error: 0.0014
Iteration 500 Error: 0.0012
Iteration 600 Error: 0.0010
Iteration 700 Error: 0.0008
Iteration 800 Error: 0.0007
Iteration 900 Error: 0.0006
Terminating at iteration 940.
No improvement for 250 iterations. Error: 0.0006
>>> perm_trans._error(X_ic_pc) # Error after Iman-Conover + permutation
0.0006119...
"""
corr_types = {"pearson": self._pearson, "spearman": self._spearman}

# Validate all input arguments
if not isinstance(correlation_matrix, np.ndarray):
raise TypeError("Input argument `correlation_matrix` must be NumPy array.")
        if not correlation_matrix.ndim == 2:
            raise ValueError("Correlation matrix must be a 2D array.")
if not correlation_matrix.shape[0] == correlation_matrix.shape[1]:
raise ValueError("Correlation matrix must be square.")
if not np.allclose(correlation_matrix.T, correlation_matrix):
raise ValueError("Correlation matrix must be symmetric.")
if not (weights is None or (isinstance(weights, np.ndarray))):
raise TypeError("Input argument `weights` must be None or NumPy array.")
if not (weights is None or weights.shape == correlation_matrix.shape):
raise ValueError("`weights` and `correlation_matrix` must have same shape.")
if not (weights is None or np.all(weights > 0)):
raise ValueError("`weights` must have positive entries.")
        if not (isinstance(iterations, int) and iterations >= 0):
            raise ValueError("`iterations` must be a non-negative integer.")
        if not (isinstance(max_iter_no_change, int) and max_iter_no_change >= 0):
            raise ValueError("`max_iter_no_change` must be a non-negative integer.")
if iterations == 0 and max_iter_no_change == 0:
raise ValueError("`iterations` or `max_iter_no_change` must be positive.")
if not (isinstance(correlation_type, str) and correlation_type in corr_types):
raise ValueError(
f"`correlation_type` must be one of: {tuple(corr_types.keys())}"
)
if not (seed is None or isinstance(seed, int)):
raise TypeError("`seed` must be None or an integer")
if not isinstance(verbose, bool):
raise TypeError("`verbose` must be boolean")

self.C = correlation_matrix.copy()
weights = np.ones_like(self.C) if weights is None else weights
self.weights = weights / np.sum(weights)
self.iters = iterations
self.max_iter_no_change = max_iter_no_change
self.correlation_func = corr_types[correlation_type]
self.rng = np.random.default_rng(seed)
self.verbose = verbose
self.triu_indices = np.triu_indices(self.C.shape[0], k=1)

def _pearson(self, X):
"""Given a matrix X of shape (m, n), return a matrix of shape (n, n)
with Pearson correlation coefficients."""
        # The majority of the runtime is spent computing correlation
        # coefficients, so any attempt to speed up this code should focus on
        # that. It is possible to compute the change in the objective function
        # without recomputing the empirical correlation from scratch in every
        # iteration (see the sketch below). If X has shape (m, n), this can
        # take the runtime from O(m*n*n) to O(n), but it requires Python loops
        # and bookkeeping.
return np.corrcoef(X, rowvar=False)
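
    # A minimal sketch of the bookkeeping mentioned above. This helper (the
    # name `_crossprod_delta` is hypothetical) is illustrative only and is not
    # used by hill_climb: swapping rows i and j within column k leaves every
    # column's mean and variance unchanged, so only the cross-products between
    # column k and the other columns move, and the row corr[k, :] could be
    # updated in O(n) instead of being recomputed.
    @staticmethod
    def _crossprod_delta(X, i, j, k):
        """Change in sum(X[:, k] * X[:, l]) for every column l, if rows i and
        j were swapped in column k. The entry at position k itself is not
        meaningful and should be ignored (the diagonal correlation stays 1)."""
        return (X[j, k] - X[i, k]) * (X[i, :] - X[j, :])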

def _spearman(self, X):
"""Given a matrix X of shape (m, n), return a matrix of shape (n, n)
with Spearman correlation coefficients."""
if X.shape[1] == 2:
spearman_corr = sp.stats.spearmanr(X).statistic
return np.array([[1.0, spearman_corr], [spearman_corr, 1.0]])
else:
return sp.stats.spearmanr(X).statistic

@staticmethod
def _swap(X, i, j, k):
"""Swap rows i and j in column k inplace."""
X[i, k], X[j, k] = X[j, k], X[i, k]

def _error(self, X):
"""Compute RMSE over upper triangular part of corr(X) - C."""
corr = self.correlation_func(X) # Correlation matrix
idx = self.triu_indices # Get upper triangular indices (ignore diag)
weighted_residuals_sq = self.weights[idx] * (corr[idx] - self.C[idx]) ** 2.0
return np.sqrt(np.sum(weighted_residuals_sq))

def hill_climb(self, X):
"""Hill climbing cycles through columns (variables), and for each
column it swaps two random rows (observations). If the result
        leads to a smaller error (correlation closer to the target), the swap
        is kept. If not, the swap is reverted and we try again.

Parameters
----------
X : np.ndarray
A matrix with shape (observations, variables).

Returns
-------
A copy of X where rows within each column are shuffled.
"""
        if not (isinstance(X, np.ndarray) and X.ndim == 2):
            raise ValueError("`X` must be a 2D numpy array.")
        num_obs, num_vars = X.shape
        if not num_vars == self.C.shape[0]:
raise ValueError(
"Number of variables in `X` does not match `correlation_matrix`."
)

if self.verbose:
print(f"Running permutation correlator for {self.iters} iterations.")

# Set up loop generator
iters = range(1, self.iters + 1) if self.iters else itertools.count(1)
loop_gen = itertools.product(iters, range(num_vars)) # (iteration, k)

# Set up variables that are tracked in the main loop
current_X = X.copy()
current_error = self._error(current_X)
iter_no_change = 0

# Main loop. For each iteration, k cycles through all variables
for iteration, k in loop_gen:
            # Print progress roughly ten times per run (every 1000 iterations
            # when the number of iterations is unbounded).
            print_every = max(1, self.iters // 10) if self.iters else 1000
            if self.verbose and iteration % print_every == 0 and k == 0:
print(f" Iteration {iteration} Error: {current_error:.4f}")

            # Choose two random observations (rows) to swap in column k.
            # Using rng.integers() is faster than rng.choice(replace=False),
            # but it can return i == j, in which case we simply skip.
            i, j = self.rng.integers(0, high=num_obs, size=2)
            if i == j:
                continue

# Turn current_X into a new proposed X by swapping two observations
# i and j in column (variable) k.
self._swap(current_X, i, j, k)
proposed_error = self._error(current_X)

# The proposed X was better
if proposed_error < current_error:
current_error = proposed_error
iter_no_change = 0

# The proposed X was worse
else:
self._swap(current_X, i, j, k) # Swap indices back
iter_no_change += 1

# Termination condition triggered
if self.max_iter_no_change and (iter_no_change >= self.max_iter_no_change):
if self.verbose:
print(f""" Terminating at iteration {iteration}.
No improvement for {iter_no_change} iterations. Error: {current_error:.4f}""")
break

return current_X


if __name__ == "__main__":
import pytest

pytest.main([__file__, "--doctest-modules"])

if __name__ == "__main__" and True:
import matplotlib.pyplot as plt

# Create data
sampler = sp.stats.qmc.LatinHypercube(d=2, seed=42, scramble=True)
X = sampler.random(n=1000)
correlation_matrix = np.array([[1, 0.7], [0.7, 1]])

# IC only
transform = ImanConover(correlation_matrix)
X_ic = transform(X)
IC_corr = sp.stats.pearsonr(*X_ic.T).statistic
print("IC", IC_corr)

fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(6, 3))
ax1.set_title(f"IC with correlation: {IC_corr:.3f}")
ax1.scatter(*X_ic.T, s=1)

# PC only
iterations = 500 # int(X.shape[0]**1.5)
pc_transform = PermutationCorrelator(
correlation_matrix,
seed=0,
iterations=iterations,
max_iter_no_change=100,
verbose=True,
)
X_pc = pc_transform.hill_climb(X)
PC_corr = sp.stats.pearsonr(*X_pc.T).statistic
print("PC", PC_corr)

ax2.set_title(f"HC: {PC_corr:.3f}")
ax2.scatter(*X_pc.T, s=1)

# IC first, then PC on the results
X_ic_pc = pc_transform.hill_climb(X_ic)
IC_PC_corr = sp.stats.pearsonr(*X_ic_pc.T).statistic
print("IC + PC", IC_PC_corr)

ax3.set_title(f"IC + HC: {IC_PC_corr:.3f}")
ax3.scatter(*X_ic_pc.T, s=1)

fig.tight_layout()
plt.show()
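
    # Illustrative sketch, not part of the plots above: the same class can
    # target rank correlation instead via correlation_type="spearman". It
    # reuses `correlation_matrix`, `X`, and `iterations` from the demo above;
    # the names `spearman_transform` and `X_pc_spearman` are local to this
    # example.
    spearman_transform = PermutationCorrelator(
        correlation_matrix,
        correlation_type="spearman",
        seed=0,
        iterations=iterations,
        max_iter_no_change=100,
    )
    X_pc_spearman = spearman_transform.hill_climb(X)
    print("PC (Spearman)", sp.stats.spearmanr(X_pc_spearman).statistic)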