diff --git a/esda/moran_local_mv.py b/esda/moran_local_mv.py index fda9a744..9daa1d5b 100644 --- a/esda/moran_local_mv.py +++ b/esda/moran_local_mv.py @@ -23,7 +23,7 @@ def __init__( y : (N,1) array array of data that is the targeted "outcome" covariate to compute the multivariable Moran's I - X : (N,3) array + X : (N,p) array array of data that is used as "confounding factors" to account for their covariance with Y. W : (N,N) weights object @@ -83,7 +83,7 @@ def __init__( if isinstance(W, Graph): W = W.transform("R") else: - W.transform = "r" # TODO: as a function for graph + W.transform = "r" y = y - y.mean() if unit_scale: y /= y.std() @@ -154,7 +154,10 @@ def __init__( self.quads_ = self._uvquads_[:, 1] def _make_data(self, z, X, W): - Wz = lag_spatial(W, z) + if isinstance(W, Graph): + Wz = W.lag(z) + else: + Wz = lag_spatial(W, z) if X is not None: D = np.hstack((np.ones(z.shape), z, X)) P = X.shape[1] + 1 @@ -368,7 +371,10 @@ def _crand(self): lisas = np.zeros((self.connectivity.n, self.permutations)) n_1 = self.connectivity.n - 1 prange = list(range(self.permutations)) - k = self.connectivity.cardinalities.max() + 1 + if isinstance(self.connectivity, Graph): + k = self.connectivity.cardinalities.max() + 1 + else: + k = self.connectivity.max_neighbors + 1 nn = self.connectivity.n - 1 rids = np.array([np.random.permutation(nn)[0:k] for i in prange]) ids = np.arange(self.connectivity.n) diff --git a/esda/tests/test_moran_local_mv.py b/esda/tests/test_moran_local_mv.py index 375052e1..4d11ca4b 100644 --- a/esda/tests/test_moran_local_mv.py +++ b/esda/tests/test_moran_local_mv.py @@ -3,6 +3,7 @@ import geopandas import pytest from libpysal.weights import Queen +from libpysal.graph import Graph from sklearn.linear_model import TheilSenRegressor from esda.moran_local_mv import Partial_Moran_Local, Auxiliary_Moran_Local from esda.moran import Moran_Local_BV @@ -12,21 +13,24 @@ y = df.HH_INC.values.reshape(-1,1) X = df.HSG_VAL.values.reshape(-1,1) -w = Queen.from_dataframe(df) -w.transform = 'r' +w_classic = Queen.from_dataframe(df) +w_classic.transform = 'r' +g = Graph.build_contiguity(df).transform("r") -def test_partial_runs(): +pytestmark = pytest.mark.parametrize("w", [g, w_classic]) + +def test_partial_runs(w): """Check if the class computes successfully in a default configuration""" m = Partial_Moran_Local(y,X,w, permutations=1) # done, just check if it runs -def test_partial_accuracy(): +def test_partial_accuracy(w): """Check if the class outputs expected results at a given seed""" numpy.random.seed(111221) m = Partial_Moran_Local(y,X,w, permutations=10) # compute result by hand zy = (y - y.mean())/y.std() - wz = (w.sparse * zy) + wz = (w.sparse @ zy) zx = (X - X.mean(axis=0))/X.std(axis=0) rho = numpy.corrcoef(zy.squeeze(), zx.squeeze())[0,1] left = zy - rho * zx @@ -45,7 +49,7 @@ def test_partial_accuracy(): is_odd_label = m.labels_ % 2 numpy.testing.assert_equal(is_cluster, is_odd_label) -def test_partial_unscaled(): +def test_partial_unscaled(w): """Check if the variance scaling behaves as expected""" m = Partial_Moran_Local(y,X,w, permutations=0, unit_scale=True) m2 = Partial_Moran_Local(y,X,w, permutations=0, unit_scale=False) @@ -55,33 +59,35 @@ def test_partial_unscaled(): assert s1y > s2y, "variance is incorrectly scaled for y" assert s1x < s2x, "variance is incorrectly scaled for x" -def test_partial_uvquads(): +def test_partial_uvquads(w): """Check that the quadrant decisions vary correctly with the inputs""" m = Partial_Moran_Local(y,X,w, permutations=0, mvquads=False) bv = Moran_Local_BV(y,X,w,permutations=0) # TODO: this currently fails, and it should pass. I am probably mis-calculating the bivariate quadrants for this option, and need to correct the code. numpy.testing.assert_array_equal(m.quads_, bv.q) -def test_aux_runs(): +def test_aux_runs(w): + print(type(w), w.transform) """Check that the class completes successfully in a default configuration""" - a = Auxiliary_Moran_Local(y,X,w, permutations=1) + a = Auxiliary_Moran_Local(y,X, w, permutations=1) #done, just check if it runs -def test_aux_accuracy(): +def test_aux_accuracy(w): + print(type(w), w.transform) """Check that the class outputs expected values for a given seed""" numpy.random.seed(111221) a = Auxiliary_Moran_Local(y,X,w, permutations=10) # compute result by hand zy = (y - y.mean())/y.std() - wz = (w.sparse * zy) + wz = (w.sparse @ zy) zx = (X - X.mean(axis=0))/X.std(axis=0) - wzx = w.sparse * zx + wzx = w.sparse @ zx rho = numpy.corrcoef(zy.squeeze(), zx.squeeze())[0,1] mean = zy * wz - rho * zx * wz - rho * zy * wzx + rho**2 * zx * wzx scale = (w.n-1) / (w.n * (1 - rho**2)) - manual = (mean * scale).squeeze() + manual = numpy.asarray(mean * scale).squeeze() # check values, may not be identical because of the # matrix inversion least squares estimator used in scikit numpy.testing.assert_allclose(manual, a.associations_) @@ -94,7 +100,8 @@ def test_aux_accuracy(): is_odd_label = (a.labels_ % 2).astype(bool) numpy.testing.assert_equal(is_cluster, is_odd_label) -def test_aux_unscaled(): +def test_aux_unscaled(w): + print(type(w), w.transform) """Check that the variance scaling behaves as expected""" a = Auxiliary_Moran_Local(y,X/10000,w, permutations=0, unit_scale=True) a2 = Auxiliary_Moran_Local(y,X,w, permutations=0, unit_scale=False) @@ -102,7 +109,8 @@ def test_aux_unscaled(): "variance is not scaled correctly in partial regression." ) -def test_aux_transformer(): +def test_aux_transformer(w): + print(type(w), w.transform) """Check that an alternative regressor can be used to calculate y|X""" a = Auxiliary_Moran_Local(y,X,w, permutations=0, transformer=TheilSenRegressor) # done, should just complete \ No newline at end of file