# Exercise:
# "Clean" a CSV file using PySpark.
# * Grab the sample data from
#   https://packages.revolutionanalytics.com/datasets/AirlineSubsetCsv.tar.gz
# * Inspect the columns: what kind of data do they hold?
# * Create an ETL job with PySpark in which you read the CSV file and perform
#   the cleansing steps mentioned in the classroom:
#   - improve column names (subjective)
#   - fix data types
#   - flag missing or unknown data
#   - remove redundant data
# * Write the data to a Parquet file. How big is the Parquet file compared
#   to the compressed CSV file? And compared to the uncompressed CSV file?
#   How long does your processing take?
# * While your job is running, open the Spark UI and get a feel for what's
#   there (together with the instructor).
# For explanations of the columns, see
# https://www.transtats.bts.gov/Fields.asp?gnoyr_VQ=FGK
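#
# A possible way to fetch and unpack the sample data (a sketch, not part of
# the exercise; downloading and extracting by hand works just as well):
#
#     import tarfile
#     import urllib.request
#
#     url = "https://packages.revolutionanalytics.com/datasets/AirlineSubsetCsv.tar.gz"
#     archive, _ = urllib.request.urlretrieve(url)
#     with tarfile.open(archive, "r:gz") as tar:
#         tar.extractall("resources/flights")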
from pathlib import Path

# findspark locates a local Spark installation and puts pyspark on sys.path,
# which is why it runs before the pyspark import below.
import findspark

findspark.init()

from pyspark.sql import DataFrame, SparkSession


def read_data(path: Path) -> DataFrame:
    spark = SparkSession.builder.getOrCreate()
    return spark.read.csv(
        str(path),
        # For a CSV, `inferSchema=False` means every column is kept as a
        # string. No time is wasted on inferring the schema, which is
        # arguably not something you would depend on in production anyway.
        inferSchema=False,
        header=True,
        # The dataset mixes two representations of null: sometimes a value is
        # simply absent, which shows up in the CSV file as two neighboring
        # commas, but there are also literal "null" strings, as in this
        # sample: `420.0,null,,1.0,`
        # The following option treats such a literal null-string the same as
        # the empty value.
        nullValue="null",
    )
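

# To inspect what the columns hold (one of the exercise steps), a quick look
# in a REPL or notebook could be:
#
#     df = read_data(Path("resources/flights"))
#     df.printSchema()  # every field is a string, by design of inferSchema=False
#     df.show(5, truncate=False)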


# TODO: implement the clean function
def clean(input_df: DataFrame) -> DataFrame:
    return input_df
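

# An illustrative sketch, not the exercise solution: one shape the cleansing
# steps could take. Column names such as "FlightDate", "ArrDelay", and "Year"
# are assumptions about this dataset; adapt them after inspecting df.columns.
def example_clean(input_df: DataFrame) -> DataFrame:
    from pyspark.sql import functions as sf

    return (
        input_df
        # Improve column names: snake_case is a common (subjective) choice.
        .withColumnRenamed("ArrDelay", "arrival_delay")
        .withColumnRenamed("FlightDate", "flight_date")
        # Fix data types: after inferSchema=False, every column is a string.
        .withColumn("arrival_delay", sf.col("arrival_delay").cast("float"))
        .withColumn("flight_date", sf.to_date(sf.col("flight_date"), "yyyy-MM-dd"))
        # Flag missing or unknown data.
        .withColumn("arrival_delay_is_unknown", sf.col("arrival_delay").isNull())
        # Remove redundant data: hypothetically, "Year" duplicates what
        # flight_date already encodes.
        .drop("Year")
    )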


if __name__ == "__main__":
    # Use relative paths, so that the location of this project on your system
    # doesn't force you to edit any paths.
    path_to_exercises = Path(__file__).parents[1]
    resources_dir = path_to_exercises / "resources"
    target_dir = path_to_exercises / "target"

    # Create the folder where the results of this script's ETL pipeline will
    # be stored.
    target_dir.mkdir(exist_ok=True)
    # Extract
    df = read_data(resources_dir / "flights")

    # Transform
    cleaned_frame = clean(df)

    # Load
cleaned_frame.write.parquet(
path=str(target_dir / "cleaned_flights"),
mode="overwrite",
        # Exercise: how much bigger are the files when the compression codec
        # is set to "uncompressed"? And to "gzip"? A size-comparison sketch
        # follows at the end of this file.
compression="snappy",
)
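
    # An illustrative sketch for the size questions above, not part of the
    # starter: sum the bytes on disk under each directory.
    def dir_size(directory: Path) -> int:
        return sum(f.stat().st_size for f in directory.rglob("*") if f.is_file())

    print(f"csv input:      {dir_size(resources_dir / 'flights'):>14,} bytes")
    print(f"parquet output: {dir_size(target_dir / 'cleaned_flights'):>14,} bytes")

    # For the timing question, one option is to wrap the transform-and-write
    # steps above in time.perf_counter() calls; the Spark UI (typically at
    # http://localhost:4040 while the job runs) also shows per-stage timings.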