-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgeodata_loader.py
69 lines (55 loc) · 2.18 KB
/
geodata_loader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# -*- coding: utf-8 -*-
import random
import logging
from ipaddress import ip_network
from sqlalchemy.orm import sessionmaker
from maxmind_client import get_locations_for_ip
from ip_selector import filter_ip
from init_db import BlockedIpData, GeoPrefix, BlockGeoData, engine, get_bin_prefix
NUM_INDIVIDUAL_IPS = 0
logger = logging.getLogger(__name__)
Session = sessionmaker(bind=engine)
def load_some_geodata(session, addresses, is_subnet=False):
geo_map = dict()
for block_id, addr in addresses.items():
if addr.startswith('127'):
loc = {}
loc['latitude'] = random.uniform(52.297, 63.996)
loc['longitude'] = random.uniform(29.307, 135.654)
loc['covered_percentage'] = 100
loc['prefixes'] = [addr]
locations = [loc]
else:
try:
locations = get_locations_for_ip(addr, is_subnet)
if not locations:
continue
except:
continue
geo_map[block_id] = locations
for block_id, locations in geo_map.items():
for loc in locations:
block_geo = BlockGeoData(block_id, loc['longitude'], loc['latitude'])
session.add(block_geo)
session.flush()
prefix_bins = []
for prefix in loc['prefixes']:
prefix_bins.append(get_bin_prefix(ip_network(prefix)))
if is_subnet:
prefix_bin = get_bin_prefix(ip_network(addresses[block_id]))
if not prefix_bin in prefix_bins:
prefix_bins.append(prefix_bin)
for prefix in prefix_bins:
session.add(GeoPrefix(block_geo.id, prefix))
def load_geodata(session):
data = session.query(BlockedIpData.id, BlockedIpData.ip, BlockedIpData.ip_subnet).all()
session.rollback()
subnets_data = {row[0]: row[2] for row in data if row[2]}
load_some_geodata(session, subnets_data, True)
ips_data = {row[0]: row[1] for row in data if row[1]}
top_level_ips = filter_ip(ips_data, {})
load_some_geodata(session, top_level_ips)
session.commit()
if __name__ == '__main__':
session = Session()
load_geodata(session)