Commit 3d63723: Fix linting

mattigrthr committed Jul 29, 2022
1 parent 2e11a43 commit 3d63723
Showing 6 changed files with 120 additions and 70 deletions.
6 changes: 4 additions & 2 deletions kuwala/common/python_utils/src/file_converter.py
@@ -2,5 +2,7 @@


 def txt_to_csv(file_path):
-    read_file = pandas.read_csv(file_path, delimiter='\t', header=None, low_memory=False)
-    read_file.to_csv(file_path.replace('.txt', '.csv'), index=None, header=False)
+    read_file = pandas.read_csv(
+        file_path, delimiter="\t", header=None, low_memory=False
+    )
+    read_file.to_csv(file_path.replace(".txt", ".csv"), index=None, header=False)
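The reformatted txt_to_csv behaves exactly as before; a minimal usage sketch (the path below is a hypothetical example, not taken from the repo):

```python
# Hypothetical usage of txt_to_csv: converts a tab-delimited GeoNames
# dump (e.g. cities500.txt) into a CSV next to the original file,
# written with no header row and no index column.
from python_utils.src.file_converter import txt_to_csv

txt_to_csv(file_path="/tmp/kuwala/admin_boundary_files/cities500.txt")
# Writes /tmp/kuwala/admin_boundary_files/cities500.csv
```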
91 changes: 64 additions & 27 deletions kuwala/core/database/importer/src/admin_boundary_importer.py
@@ -6,10 +6,18 @@
 from pyspark.sql.functions import lit


-def import_osm_admin_boundaries(spark, database_host, database_port, database_name, database_url, database_properties,
-                                country, file_path):
+def import_osm_admin_boundaries(
+    spark,
+    database_host,
+    database_port,
+    database_name,
+    database_url,
+    database_properties,
+    country,
+    file_path,
+):
     if not os.path.exists(file_path):
-        logging.warning('No OSM admin boundaries file available. Skipping import.')
+        logging.warning("No OSM admin boundaries file available. Skipping import.")
         return

     data = (
@@ -38,37 +46,66 @@ def import_osm_admin_boundaries(spark, database_host, database_port, database_na

 def import_geonames_cities(spark, database_url, database_properties, file_path):
     if not os.path.exists(file_path):
-        logging.warning('No GeoNames city names available. Skipping import.')
+        logging.warning("No GeoNames city names available. Skipping import.")
         return

     data = spark.read.parquet(file_path)

-    data.write.option('truncate', True) \
-        .jdbc(url=database_url, table='admin_boundary_geonames_cities', mode='overwrite',
-              properties=database_properties)
+    data.write.option("truncate", True).jdbc(
+        url=database_url,
+        table="admin_boundary_geonames_cities",
+        mode="overwrite",
+        properties=database_properties,
+    )


-def import_admin_boundaries(spark, database_host, database_port, database_name, database_url, database_properties,
-                            continent, country, country_region):
+def import_admin_boundaries(
+    spark,
+    database_host,
+    database_port,
+    database_name,
+    database_url,
+    database_properties,
+    continent,
+    country,
+    country_region,
+):
     start_time = time.time()

-    logging.info(f'Starting import of admin boundaries for '
-                 f'{f"{country_region}, " if country_region else ""}{country}, {continent}')
+    logging.info(
+        f"Starting import of admin boundaries for "
+        f'{f"{country_region}, " if country_region else ""}{country}, {continent}'
+    )

     script_dir = os.path.dirname(__file__)
-    file_path_geonames_cities = os.path.join(script_dir,
-                                             f'../../../../tmp/kuwala/admin_boundary_files/cities_500.parquet')
-    file_path_osm_admin_boundaries = \
-        os.path.join(script_dir, f'../../../../tmp/kuwala/admin_boundary_files/{continent}/{country}'
-                                 f'{f"/{country_region}" if country_region else ""}/admin_boundaries.parquet')
-
-    import_geonames_cities(spark=spark, database_url=database_url, database_properties=database_properties,
-                           file_path=file_path_geonames_cities)
-    import_osm_admin_boundaries(spark=spark, database_host=database_host, database_port=database_port,
-                                database_name=database_name, database_url=database_url,
-                                database_properties=database_properties, country=country,
-                                file_path=file_path_osm_admin_boundaries)
-
-    logging.info(f'Successfully imported admin boundaries for '
-                 f'{f"{country_region}, " if country_region else ""}{country}, {continent} after '
-                 f'{round(time.time() - start_time)} s')
+    file_path_geonames_cities = os.path.join(
+        script_dir, "../../../../tmp/kuwala/admin_boundary_files/cities_500.parquet"
+    )
+    file_path_osm_admin_boundaries = os.path.join(
+        script_dir,
+        f"../../../../tmp/kuwala/admin_boundary_files/{continent}/{country}"
+        f'{f"/{country_region}" if country_region else ""}/admin_boundaries.parquet',
+    )
+
+    import_geonames_cities(
+        spark=spark,
+        database_url=database_url,
+        database_properties=database_properties,
+        file_path=file_path_geonames_cities,
+    )
+    import_osm_admin_boundaries(
+        spark=spark,
+        database_host=database_host,
+        database_port=database_port,
+        database_name=database_name,
+        database_url=database_url,
+        database_properties=database_properties,
+        country=country,
+        file_path=file_path_osm_admin_boundaries,
+    )
+
+    logging.info(
+        f"Successfully imported admin boundaries for "
+        f'{f"{country_region}, " if country_region else ""}{country}, {continent} after '
+        f"{round(time.time() - start_time)} s"
+    )
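As a sketch of how the reformatted importer is driven (all connection values below are placeholders for illustration, not the project's actual configuration):

```python
# Hypothetical call to import_admin_boundaries; host, port, credentials,
# and region values are made up, and the import assumes this runs from
# the importer's src directory.
from admin_boundary_importer import import_admin_boundaries
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("database-importer").getOrCreate()

database_host = "localhost"
database_port = 5432
database_name = "kuwala"
database_url = f"jdbc:postgresql://{database_host}:{database_port}/{database_name}"
database_properties = {
    "user": "kuwala",
    "password": "password",
    "driver": "org.postgresql.Driver",
}

import_admin_boundaries(
    spark=spark,
    database_host=database_host,
    database_port=database_port,
    database_name=database_name,
    database_url=database_url,
    database_properties=database_properties,
    continent="europe",
    country="malta",
    country_region=None,
)
```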
@@ -3,7 +3,7 @@
 import os

 from pyspark.sql.functions import col, concat_ws, lit, udf
-from pyspark.sql.types import IntegerType, StringType
+from pyspark.sql.types import IntegerType
 from shapely.geometry import shape


80 changes: 45 additions & 35 deletions kuwala/pipelines/admin-boundaries/src/geonames_controller.py
@@ -1,61 +1,71 @@
 import os
 import re
 import zipfile
-from python_utils.src.file_converter import txt_to_csv
-from python_utils.src.FileDownloader import download_file

 from pyspark.sql.functions import split
-from pyspark.sql.types import DateType, DoubleType, IntegerType, StructType, StringType
+from pyspark.sql.types import DateType, DoubleType, IntegerType, StringType, StructType
+from python_utils.src.FileDownloader import download_file
+from python_utils.src.file_converter import txt_to_csv


 def download_geonames_file(dump_name, file_path):
-    download_file(url=f'https://download.geonames.org/export/dump/{dump_name}.zip', path=file_path)
+    download_file(
+        url=f"https://download.geonames.org/export/dump/{dump_name}.zip", path=file_path
+    )

-    with zipfile.ZipFile(file_path, 'r') as zip_ref:
-        zip_ref.extractall(file_path.split(f'/{dump_name}.zip')[0])
+    with zipfile.ZipFile(file_path, "r") as zip_ref:
+        zip_ref.extractall(file_path.split(f"/{dump_name}.zip")[0])

     os.remove(file_path)


 def get_schema():
-    return StructType() \
-        .add('geoname_id', IntegerType()) \
-        .add('name', StringType()) \
-        .add('ascii_name', StringType()) \
-        .add('alternate_names', StringType()) \
-        .add('latitude', DoubleType()) \
-        .add('longitude', DoubleType()) \
-        .add('feature_class', StringType()) \
-        .add('feature_code', StringType()) \
-        .add('country_code', StringType()) \
-        .add('alternate_country_codes', StringType()) \
-        .add('admin_1_code', StringType()) \
-        .add('admin_2_code', StringType()) \
-        .add('admin_3_code', StringType()) \
-        .add('admin_4_code', StringType()) \
-        .add('population', IntegerType()) \
-        .add('elevation', IntegerType()) \
-        .add('digital_elevation_model', IntegerType()) \
-        .add('timezone', StringType()) \
-        .add('modification_date', DateType())
+    return (
+        StructType()
+        .add("geoname_id", IntegerType())
+        .add("name", StringType())
+        .add("ascii_name", StringType())
+        .add("alternate_names", StringType())
+        .add("latitude", DoubleType())
+        .add("longitude", DoubleType())
+        .add("feature_class", StringType())
+        .add("feature_code", StringType())
+        .add("country_code", StringType())
+        .add("alternate_country_codes", StringType())
+        .add("admin_1_code", StringType())
+        .add("admin_2_code", StringType())
+        .add("admin_3_code", StringType())
+        .add("admin_4_code", StringType())
+        .add("population", IntegerType())
+        .add("elevation", IntegerType())
+        .add("digital_elevation_model", IntegerType())
+        .add("timezone", StringType())
+        .add("modification_date", DateType())
+    )


 def get_geonames_cities(sp):
-    dump_name = 'cities500'
+    dump_name = "cities500"
     script_dir = os.path.dirname(__file__)
-    file_path_zip = os.path.join(script_dir, f'../../../tmp/kuwala/admin_boundary_files/{dump_name}.zip')
-    file_path_txt = file_path_zip.replace('.zip', '.txt')
-    file_path_csv = file_path_zip.replace('.zip', '.csv')
-    r = re.compile('([a-zA-Z]+)([0-9]+)')
+    file_path_zip = os.path.join(
+        script_dir, f"../../../tmp/kuwala/admin_boundary_files/{dump_name}.zip"
+    )
+    file_path_txt = file_path_zip.replace(".zip", ".txt")
+    file_path_csv = file_path_zip.replace(".zip", ".csv")
+    r = re.compile("([a-zA-Z]+)([0-9]+)")
     m = r.match(dump_name)
-    file_path_parquet = file_path_zip.replace(f'{dump_name}.zip', f'{m.group(1)}_{m.group(2)}.parquet')
+    file_path_parquet = file_path_zip.replace(
+        f"{dump_name}.zip", f"{m.group(1)}_{m.group(2)}.parquet"
+    )

     download_geonames_file(dump_name=dump_name, file_path=file_path_zip)
     txt_to_csv(file_path=file_path_txt)

     df = sp.read.csv(file_path_csv, schema=get_schema())
-    df = df.withColumn('alternate_names', split('alternate_names', ',')) \
-        .withColumn('alternate_country_codes', split('alternate_country_codes', ','))
+    df = df.withColumn("alternate_names", split("alternate_names", ",")).withColumn(
+        "alternate_country_codes", split("alternate_country_codes", ",")
+    )

-    df.write.mode('overwrite').parquet(file_path_parquet)
+    df.write.mode("overwrite").parquet(file_path_parquet)
     os.remove(file_path_txt)
     os.remove(file_path_csv)
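A side note on the regex in get_geonames_cities: it only splits the dump name into its alphabetic and numeric parts so that the output file name gains an underscore. A quick sketch of that behavior:

```python
import re

# "cities500" matches as groups ("cities", "500"), so the parquet
# output path ends in cities_500.parquet.
m = re.compile("([a-zA-Z]+)([0-9]+)").match("cities500")
print(f"{m.group(1)}_{m.group(2)}.parquet")  # -> cities_500.parquet
```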
9 changes: 5 additions & 4 deletions kuwala/pipelines/admin-boundaries/src/main.py
@@ -1,14 +1,13 @@
 import sys

-sys.path.insert(0, '../../../common/')
+sys.path.insert(0, "../../../common/")

 import argparse
 import os

-from pyspark.sql import SparkSession
 from admin_boundaries_controller import get_admin_boundaries
 from geonames_controller import get_geonames_cities

+from pyspark.sql import SparkSession
+
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
@@ -30,4 +29,6 @@
     )

     get_geonames_cities(sp=spark)
-    get_admin_boundaries(sp=spark, continent=continent, country=country, country_region=country_region)
+    get_admin_boundaries(
+        sp=spark, continent=continent, country=country, country_region=country_region
+    )
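Across all files the changes follow black's formatting style (double quotes, trailing commas in multi-line calls, wrapping at the default 88-column width), and the import reordering in geonames_controller.py and main.py looks like isort-style grouping. Assuming those two tools are what the project lints with, which this page does not confirm, a sketch of reproducing the pass locally:

```python
# Hypothetical reproduction of this lint pass; assumes isort and black
# are installed and that the repo actually uses their default settings.
import subprocess

for tool in (["isort", "kuwala"], ["black", "kuwala"]):
    subprocess.run(tool, check=True)
```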
2 changes: 1 addition & 1 deletion kuwala/pipelines/osm-poi/osm-parquetizer (a submodule pointer update: the gitlink occupies a single line, so bumping the pinned commit shows as one addition and one deletion)
