Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Headbang re-work #11

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,8 @@
*.pyc
.ipynb_checkpoints/

# Egg metadata
*.egg-info/

# Distributables
dist/
124 changes: 121 additions & 3 deletions esda/smoothing.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
from scipy.stats import gamma, norm, chi2, poisson
from functools import reduce

from itertools import combinations
import warnings

__all__ = ['Excess_Risk', 'Empirical_Bayes', 'Spatial_Empirical_Bayes', 'Spatial_Rate', 'Kernel_Smoother', 'Age_Adjusted_Smoother', 'Disk_Smoother', 'Spatial_Median_Rate', 'Spatial_Filtering', 'Headbanging_Triples', 'Headbanging_Median_Rate', 'flatten', 'weighted_median', 'sum_by_n', 'crude_age_standardization', 'direct_age_standardization', 'indirect_age_standardization', 'standardized_mortality_ratio', 'choynowski', 'assuncao_rate']


Expand Down Expand Up @@ -1533,8 +1536,11 @@ def by_col(cls, df, e, b, x_grid, y_grid, geom_col='geometry', **kwargs):
outdf = pd.concat(res)
return outdf


"""
class Headbanging_Triples(object):
"""Generate a pseudo spatial weights instance that contains headbanging triples
"""
"""Generate a pseudo spatial weights instance that contains headbanging triples

Parameters
----------
Expand Down Expand Up @@ -1660,7 +1666,8 @@ class Headbanging_Triples(object):

>>> round(extrapolated[1],5), round(extrapolated[2],6)
(0.33753, 0.302707)
"""
"""
"""
def __init__(self, data, w, k=5, t=3, angle=135.0, edgecor=False):
raise DeprecationWarning('Deprecated')
if k < 3:
Expand Down Expand Up @@ -1721,6 +1728,117 @@ def __init__(self, data, w, k=5, t=3, angle=135.0, edgecor=False):
break
self.triples[ps[point]].append(extra[0])
self.extra[ps[point]] = extra
"""


class Headbanging_Triples(object):
@staticmethod
def is_valid_triple(p0, p1, p2, angle):
ray1 = Ray(p0, p1)
ray2 = Ray(p0, p2)
ang = abs(np.rad2deg(get_angle_between(ray1, ray2)))
return ang > angle

@staticmethod
def construct_triples(p0, neighbors, angle):
triple = []
for i1, i2 in combinations(neighbors.keys(), 2):
if i1 > i2: # Need to swap for consistency sake
i2, i1 = i1, i2
p1 = tuple(neighbors[i1])
p2 = tuple(neighbors[i2])
if Headbanging_Triples.is_valid_triple(p0, p1, p2, angle):
triple.append(((p1, p2), (i1, i2)))
return triple

@staticmethod
def construct_extra_triples(p0, neighbors, angle):
extras = []
for i1, i2 in combinations(neighbors.keys(), 2):
p1 = tuple(neighbors[i1])
p2 = tuple(neighbors[i2])
extra = None
if Headbanging_Triples.is_valid_triple(p1, p0, p2, 90 + angle / 2):
extra = Headbanging_Triples.construct_one_extra(p0, p1, p2)
ix1, ix2 = i1, i2
elif Headbanging_Triples.is_valid_triple(p2, p0, p1,
90 + angle / 2):
extra = Headbanging_Triples.construct_one_extra(p0, p2, p1)
ix2, ix1 = i1, i2
if extra:
extras.append(((ix1, ix2),
get_points_dist(p1, p2),
get_points_dist(p0, extra)))
extras = [(dist1, ix, dist1, dist2) for ix, dist1, dist2 in extras]
if len(extras) > 0:
extras = sorted(extras)[0]
i1, i2, i3, i4 = extras
return [i2, i3, i4]
else:
return []

@staticmethod
def construct_one_extra(p0, p1, p2):
ray1 = Ray(p1, p0)
ray2 = Ray(p1, p2)
ang = get_angle_between(ray1, ray2)
dist = get_points_dist(p0, p1)
ray0 = Ray(p0, p1)
return get_point_at_angle_and_dist(ray0, (2 * ang) - np.pi, dist)

def __init__(self, data, w, t=3, angle=135.0, edgecor=False):
if w.k < 3:
raise ValueError("w should be NeareastNeighbors instance & the "
"number of neighbors should be more than 3.")
if not w.id_order_set:
raise ValueError("w id_order must be set to align with the order "
"of data")
self.triples = {}
for key in w.neighbors.keys():
p0 = tuple(data[key])
neighbors_ix = w.neighbors[key]
neighbor_points = data[neighbors_ix]
neighbors = {
ix: tuple(neighbor_points[i])
for i, ix in enumerate(neighbors_ix)
}
triples = Headbanging_Triples.construct_triples(p0, neighbors,
angle)
if len(triples) > 3:
triple_dis = []
for points, triple in triples:
dist = get_segment_point_dist(
LineSegment(points[0], points[1]), p0)
triple_dis.append((dist, triple))
triples = triple_dis[:t]
if not edgecor and len(triples) == 0:
warnings.warn('Index %s has no eligible triple and edge-'
'correction is off. Consider adding more '
'neighbors or using a smaller angle.' % key)
self.triples[key] = [triple for _, triple in triples]
if edgecor:
self.extra = {}
for key in self.triples.keys():
if len(self.triples[key]) == 0:
p0 = tuple(data[key])
neighbors_ix = w.neighbors[key]
neighbor_points = data[neighbors_ix]
neighbors = {
ix: tuple(neighbor_points[i])
for i, ix in enumerate(neighbors_ix)
}
extra = Headbanging_Triples.construct_extra_triples(
p0,
neighbors,
angle
)
if extra == []:
warnings.warn('edge-correction failed for index %s. '
'Consider adding more neighbors or '
'using a smaller angle.' % key)
else:
self.extra[key] = extra



class Headbanging_Median_Rate(object):
Expand Down Expand Up @@ -1808,7 +1926,7 @@ class Headbanging_Median_Rate(object):
array([ 0.00091659, 0. , 0.00156838, 0.0018315 , 0.00498891])
"""
def __init__(self, e, b, t, aw=None, iteration=1):
raise DeprecationWarning('Deprecated')
# raise DeprecationWarning('Deprecated')
self.r = e * 1.0 / b
self.tr, self.aw = t.triples, aw
if hasattr(t, 'extra'):
Expand Down
128 changes: 128 additions & 0 deletions esda/tests/test_smoothing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from .. import smoothing as sm
import numpy as np
from libpysal.common import RTOL, ATOL, pandas
import warnings


PANDAS_EXTINCT = pandas is None

Expand Down Expand Up @@ -257,6 +259,8 @@ def test_Smoother_multicol(self):
np.testing.assert_allclose(out_df[col].values[:5], answer,
rtol=RTOL, atol=ATOL)


"""
class TestHB(unittest.TestCase):
def setUp(self):
sids = pysal.open(pysal.examples.get_path('sids2.shp'), 'r')
Expand Down Expand Up @@ -337,6 +341,130 @@ def test_Headbanging_Median_Rate_tabular(self):
self.assertIsInstance(this_col, np.ndarray)
np.testing.assert_allclose(sids_hr_df[col][:5], answer,
rtol=RTOL, atol=ATOL*10)
"""

class TestHB(unittest.TestCase):
def setUp(self):
sids = pysal.open(pysal.examples.get_path('sids2.shp'), 'r')
self.sids = sids
self.d = np.array([i.centroid for i in sids])
self.w = KNN.from_array(self.d, k=5)
if not self.w.id_order_set:
self.w.id_order = self.w.id_order
sids_db = pysal.open(pysal.examples.get_path('sids2.dbf'), 'r')
self.b, self.e = np.array(sids_db[:, 8]), np.array(sids_db[:, 9])
self.sids_hb_rr5 = np.array([0.00075586, 0.,
0.0008285, 0.0018315, 0.00498891])
self.sids_hb_r2r5 = np.array([0.0008285, 0.00084331,
0.00086896, 0.0018315, 0.00498891])
self.sids_hb_r3r5 = np.array([0.00091659, 0.,
0.00156838, 0.0018315, 0.00498891])
if not PANDAS_EXTINCT:
self.df = sids_db.to_df()
self.ename = 'SID74'
self.bname = 'BIR74'
self.enames = [self.ename, 'SID79']
self.bnames = [self.bname, 'BIR79']
self.sids79r = np.array([.000563, .001659, .001879,
.002410, .002720])

def test_Headbanging_Triples(self):
with warnings.catch_warnings(record=True) as w:
ht = sm.Headbanging_Triples(self.d, self.w)
self.assertTrue(len(w) >= 5) # Should have at least 5 warnings
self.assertEqual(len(ht.triples), len(self.d))

with warnings.catch_warnings(record=True) as w:
ht2 = sm.Headbanging_Triples(self.d, self.w, edgecor=True)
self.assertTrue(len(w) > 0) # Should have at least 1 warning
self.assertTrue(hasattr(ht2, 'extra'))

with warnings.catch_warnings(record=True) as w:
ht = sm.Headbanging_Triples(self.d, self.w, edgecor=True,
angle=120)
self.assertTrue(len(w) == 0) # Should have no warnings
self.assertEqual(len(ht2.triples), len(self.d))
htr = sm.Headbanging_Median_Rate(self.e, self.b, ht2, iteration=5)
self.assertEqual(len(htr.r), len(self.e))
for i in htr.r:
self.assertTrue(i is not None)

def test_headbang_valid_triple(self):
p0 = (0, 0) # Center
p1 = (0, -1) # x_1 vertex
p2_45_yes = (1, 1.01) # Should be just beyond 135 degrees
p2_45_no = (1.01, 1) # Should be just before 135 degrees
p2_n45_yes = (-1, 1.01) # Should be just beyond 135 degrees
p2_n45_no = (-1.01, 1) # Should be just before 135 degrees

result = sm.Headbanging_Triples.is_valid_triple(p0, p1, p2_45_yes, 135)
self.assertTrue(result)

result = sm.Headbanging_Triples.is_valid_triple(p0, p1, p2_n45_yes,
135)
self.assertTrue(result)

result = sm.Headbanging_Triples.is_valid_triple(p0, p1, p2_45_no, 135)
self.assertTrue(~result)

result = sm.Headbanging_Triples.is_valid_triple(p0, p1, p2_n45_no, 135)
self.assertTrue(~result)

def test_headbang_make_triple(self):
p0 = (0, 0)
neighbors = {2: [-1, 0],
5: [0, -1],
42: [1, 0],
99: [0, 1]}
result = sm.Headbanging_Triples.construct_triples(p0, neighbors, 135)
expected = [(((-1, 0), (1, 0)), (2, 42)), (((0, -1), (0, 1)), (5, 99))]
self.assertTrue(sorted(result) == sorted(expected))
p0 = (0, 0)
neighbors = {2: [-1, 0.5],
5: [0, -1],
42: [1, 0],
99: [0.5, 1]}
result = sm.Headbanging_Triples.construct_triples(p0, neighbors, 135)
expected = [(((-1, .5), (1, 0)), (2, 42)),
(((0, -1), (.5, 1)), (5, 99))]
self.assertTrue(sorted(result) == sorted(expected))

def test_construct_one_extra(self):
p0 = (0., 0.)
p1 = (1., 100.)
p2 = (1., 105.)
result = sm.Headbanging_Triples.construct_one_extra(p0, p1, p2)
expected = (1., -100.)
np.testing.assert_allclose(result, expected)

p0 = (0., 0.)
p1 = (-1., 100.)
p2 = (-1., 105.)
result = sm.Headbanging_Triples.construct_one_extra(p0, p1, p2)
expected = (-1., -100.)
np.testing.assert_allclose(result, expected)

def test_construct_extras(self):
p0 = (0., 0.)
neighbors = {2: (1, 100),
5: (1, 105)}
result = sm.Headbanging_Triples.construct_extra_triples(p0, neighbors,
135)
expected = [(2, 5), 5., np.sqrt(100 ** 2 + 1 ** 2)]
np.testing.assert_equal(result[0], expected[0])
np.testing.assert_allclose(result[1], expected[1])
np.testing.assert_allclose(result[2], expected[2])

p0 = (0., 0.)
neighbors = {2: (1, 105),
5: (1, 100)}
result = sm.Headbanging_Triples.construct_extra_triples(p0, neighbors,
135)
expected = [(5, 2), 5., np.sqrt(100 ** 2 + 1 ** 2)]
np.testing.assert_equal(result[0], expected[0])
np.testing.assert_allclose(result[1], expected[1])
np.testing.assert_allclose(result[2], expected[2])


class TestKernel_AgeAdj_SM(unittest.TestCase):
def setUp(self):
Expand Down