Commit 0c76a7d

FIX: Tableau writer schema mismatch with Bus schema (#486)
Primary fix: the bus_schema defined in the Tableau writer must be in the same order as the polars schema that the Bus_Performance_Manager outputs in bus_vehicle_events.

Changes:
- Re-align the order of the Tableau schema to match the bus schema
- Add a select statement to ensure the read-in Table is in the correct schema order (a sketch of this failure mode follows below)
- Refactor the Tableau analysis conversions out into a new method for easy testing and verification
- Add new analysis scripts that help check for issues in Tableau/parquet writing
- Update the debugging info in this area
1 parent d3e58f1 commit 0c76a7d
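
The core failure mode: pyarrow's ParquetWriter requires every table it writes to match the schema the writer was opened with, column order included. Below is a minimal sketch of the breakage and of the select() fix; the column names and file name are toy values, not taken from the commit:

import polars as pl
import pyarrow
import pyarrow.parquet as pq

schema = pyarrow.schema([("a", pyarrow.int64()), ("b", pyarrow.large_string())])

# toy batch whose columns arrive in the wrong order
batch = pyarrow.table(
    {"b": ["x"], "a": [1]},
    schema=pyarrow.schema([("b", pyarrow.large_string()), ("a", pyarrow.int64())]),
)

with pq.ParquetWriter("demo.parquet", schema=schema) as writer:
    # writer.write_table(batch) would raise here, because the batch schema
    # differs from the writer schema in column order alone
    reordered = pl.from_arrow(batch).select(schema.names)  # reorder to match
    writer.write_table(reordered.to_arrow())  # polars strings map back to large_string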

File tree

6 files changed: +171, -40 lines


analysis/check_bus_tableau.py

Lines changed: 92 additions & 0 deletions
@@ -0,0 +1,92 @@
#!/usr/bin/env python
from lamp_py.runtime_utils.remote_files import bus_events
import pyarrow
import pyarrow.parquet as pq
import pyarrow.dataset as pd
from pyarrow.fs import S3FileSystem
from lamp_py.aws.s3 import file_list_from_s3
import polars as pl

from lamp_py.tableau.conversions.convert_bus_performance_data import apply_bus_analysis_conversions

########################################################################
# NOTE: ensure .env PUBLIC_ARCHIVE_BUCKET is pointed to the right bucket
########################################################################

# this schema and the order of this schema SHOULD match what comes out
# of the polars version out of bus_performance_manager.
bus_schema = pyarrow.schema(
    [
        ("service_date", pyarrow.date32()),  # change to date type
        ("route_id", pyarrow.large_string()),
        ("trip_id", pyarrow.large_string()),
        ("start_time", pyarrow.int64()),
        ("start_dt", pyarrow.timestamp("us")),
        ("stop_count", pyarrow.uint32()),
        ("direction_id", pyarrow.int8()),
        ("stop_id", pyarrow.large_string()),
        ("stop_sequence", pyarrow.int64()),
        ("vehicle_id", pyarrow.large_string()),
        ("vehicle_label", pyarrow.large_string()),
        ("gtfs_travel_to_dt", pyarrow.timestamp("us")),
        ("tm_stop_sequence", pyarrow.int64()),
        ("tm_scheduled_time_dt", pyarrow.timestamp("us")),
        ("tm_actual_arrival_dt", pyarrow.timestamp("us")),
        ("tm_actual_departure_dt", pyarrow.timestamp("us")),
        ("tm_scheduled_time_sam", pyarrow.int64()),
        ("tm_actual_arrival_time_sam", pyarrow.int64()),
        ("tm_actual_departure_time_sam", pyarrow.int64()),
        ("plan_trip_id", pyarrow.large_string()),
        ("exact_plan_trip_match", pyarrow.bool_()),
        ("block_id", pyarrow.large_string()),
        ("service_id", pyarrow.large_string()),
        ("route_pattern_id", pyarrow.large_string()),
        ("route_pattern_typicality", pyarrow.int64()),
        ("direction", pyarrow.large_string()),
        ("direction_destination", pyarrow.large_string()),
        ("plan_stop_count", pyarrow.uint32()),
        ("plan_start_time", pyarrow.int64()),
        ("plan_start_dt", pyarrow.timestamp("us")),
        ("stop_name", pyarrow.large_string()),
        ("plan_travel_time_seconds", pyarrow.int64()),
        ("plan_route_direction_headway_seconds", pyarrow.int64()),
        ("plan_direction_destination_headway_seconds", pyarrow.int64()),
        ("stop_arrival_dt", pyarrow.timestamp("us")),
        ("stop_departure_dt", pyarrow.timestamp("us")),
        ("gtfs_travel_to_seconds", pyarrow.int64()),
        ("stop_arrival_seconds", pyarrow.int64()),
        ("stop_departure_seconds", pyarrow.int64()),
        ("travel_time_seconds", pyarrow.int64()),
        ("dwell_time_seconds", pyarrow.int64()),
        ("route_direction_headway_seconds", pyarrow.int64()),
        ("direction_destination_headway_seconds", pyarrow.int64()),
    ]
)
s3_uris = file_list_from_s3(bucket_name=bus_events.bucket, file_prefix=bus_events.prefix)
ds_paths = [s.replace("s3://", "") for s in s3_uris]

ds_paths = ds_paths[-5:]

ds = pd.dataset(
    ds_paths,
    format="parquet",
    filesystem=S3FileSystem(),
)

with pq.ParquetWriter("test.parquet", schema=bus_schema) as writer:
    for batch in ds.to_batches(batch_size=500_000):
        try:
            # this select() is here to make sure the order of the polars_df
            # schema is the same as the bus_schema above.
            # order of schema matters to the ParquetWriter

            # if the bus_schema above is in the same order as the batch
            # schema, then the select will do nothing - as expected
            polars_df = pl.from_arrow(batch).select(bus_schema.names)  # type: ignore[union-attr]

            if not isinstance(polars_df, pl.DataFrame):
                raise TypeError(f"Expected a Polars DataFrame or Series, but got {type(polars_df)}")

            writer.write_table(apply_bus_analysis_conversions(polars_df))
        except Exception as exception:
            print(exception)
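
One quick follow-up check, not part of the commit, is to read back the schema of the file the script just wrote and confirm it still matches the expected schema:

import pyarrow.parquet as pq

written_schema = pq.read_schema("test.parquet")
assert written_schema.equals(bus_schema), "output schema drifted from bus_schema"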
src/lamp_py/tableau/conversions/convert_bus_performance_data.py

Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
import polars as pl
from pyarrow import Table


def apply_bus_analysis_conversions(polars_df: pl.DataFrame) -> Table:
    """
    Function to apply final conversions to lamp data before outputting for tableau consumption
    """
    # Convert datetime to Eastern Time
    polars_df = polars_df.with_columns(
        pl.col("stop_arrival_dt").dt.convert_time_zone(time_zone="US/Eastern").dt.replace_time_zone(None),
        pl.col("stop_departure_dt").dt.convert_time_zone(time_zone="US/Eastern").dt.replace_time_zone(None),
        pl.col("gtfs_travel_to_dt").dt.convert_time_zone(time_zone="US/Eastern").dt.replace_time_zone(None),
    )

    # Convert seconds columns to be aligned with Eastern Time
    polars_df = polars_df.with_columns(
        (pl.col("gtfs_travel_to_dt") - pl.col("service_date").str.strptime(pl.Date, "%Y%m%d"))
        .dt.total_seconds()
        .alias("gtfs_travel_to_seconds"),
        (pl.col("stop_arrival_dt") - pl.col("service_date").str.strptime(pl.Date, "%Y%m%d"))
        .dt.total_seconds()
        .alias("stop_arrival_seconds"),
        (pl.col("stop_departure_dt") - pl.col("service_date").str.strptime(pl.Date, "%Y%m%d"))
        .dt.total_seconds()
        .alias("stop_departure_seconds"),
    )

    polars_df = polars_df.with_columns(pl.col("service_date").str.strptime(pl.Date, "%Y%m%d", strict=False))

    return polars_df.to_arrow()
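
To see what apply_bus_analysis_conversions produces, here is a hypothetical one-row frame pushed through it; the column values are invented, and the naive input datetimes are treated as UTC by polars when converting to US/Eastern:

from datetime import datetime

import polars as pl

from lamp_py.tableau.conversions.convert_bus_performance_data import apply_bus_analysis_conversions

df = pl.DataFrame(
    {
        "service_date": ["20240102"],
        "gtfs_travel_to_dt": [datetime(2024, 1, 2, 15, 0)],
        "stop_arrival_dt": [datetime(2024, 1, 2, 15, 5)],
        "stop_departure_dt": [datetime(2024, 1, 2, 15, 6)],
    }
)

table = apply_bus_analysis_conversions(df)
# datetimes come back as Eastern wall-clock times (10:00, 10:05, 10:06 EST),
# and the *_seconds columns hold seconds after midnight of the service date
print(table)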

src/lamp_py/tableau/hyper.py

Lines changed: 12 additions & 9 deletions
@@ -282,29 +282,32 @@ def run_parquet(self, db_manager: Optional[DatabaseManager]) -> None:
         )
         remote_schema_match = self.parquet_schema.equals(remote_schema)
         remote_version_match = self.remote_version_match()
+        process_log.add_metadata(
+            stage="check_schema",
+            remote_schema_match=remote_schema_match,
+            remote_version_match=remote_version_match,
+        )

         if remote_schema_match is False or remote_version_match is False:
             # create new parquet if no remote parquet found or
             # remote schema does not match expected local schema
-            run_action = "create"
             upload_parquet = True
+            process_log.add_metadata(stage="create_parquet")
             self.create_parquet(db_manager)
+
         else:
-            run_action = "update"
+            process_log.add_metadata(stage="update_parquet")
             upload_parquet = self.update_parquet(db_manager)

         parquet_file_size_mb = 0.0
         if os.path.exists(self.local_parquet_path):
             parquet_file_size_mb = os.path.getsize(self.local_parquet_path) / (1024 * 1024)

-        process_log.add_metadata(
-            remote_schema_match=remote_schema_match,
-            run_action=run_action,
-            upload_parquet=upload_parquet,
-            parquet_file_size_mb=f"{parquet_file_size_mb:.2f}",
-        )
-
         if upload_parquet:
+            process_log.add_metadata(
+                stage="upload_parquet",
+                parquet_file_size_mb=f"{parquet_file_size_mb:.2f}",
+            )
             upload_file(
                 file_name=self.local_parquet_path,
                 object_path=self.remote_parquet_path,

src/lamp_py/tableau/jobs/bus_performance.py

Lines changed: 20 additions & 31 deletions
@@ -1,7 +1,6 @@
 from typing import Optional
 from datetime import datetime
 from datetime import timezone
-
 import pyarrow
 import pyarrow.parquet as pq
 import pyarrow.dataset as pd
@@ -10,13 +9,18 @@
 import polars as pl

 from lamp_py.tableau.hyper import HyperJob
+from lamp_py.tableau.conversions.convert_bus_performance_data import apply_bus_analysis_conversions
+
 from lamp_py.runtime_utils.remote_files import bus_events
 from lamp_py.runtime_utils.remote_files import tableau_bus_all
 from lamp_py.runtime_utils.remote_files import tableau_bus_recent
 from lamp_py.aws.s3 import file_list_from_s3
 from lamp_py.aws.s3 import file_list_from_s3_with_details
 from lamp_py.aws.s3 import object_exists

+# this schema and the order of this schema SHOULD match what comes out
+# of the polars version from bus_performance_manager.
+# see select() comment below..
 bus_schema = pyarrow.schema(
     [
         ("service_date", pyarrow.date32()),  # change to date type
@@ -32,6 +36,12 @@
         ("vehicle_label", pyarrow.large_string()),
         ("gtfs_travel_to_dt", pyarrow.timestamp("us")),
         ("tm_stop_sequence", pyarrow.int64()),
+        ("tm_scheduled_time_dt", pyarrow.timestamp("us")),
+        ("tm_actual_arrival_dt", pyarrow.timestamp("us")),
+        ("tm_actual_departure_dt", pyarrow.timestamp("us")),
+        ("tm_scheduled_time_sam", pyarrow.int64()),
+        ("tm_actual_arrival_time_sam", pyarrow.int64()),
+        ("tm_actual_departure_time_sam", pyarrow.int64()),
         ("plan_trip_id", pyarrow.large_string()),
         ("exact_plan_trip_match", pyarrow.bool_()),
         ("block_id", pyarrow.large_string()),
@@ -56,12 +66,6 @@
         ("dwell_time_seconds", pyarrow.int64()),
         ("route_direction_headway_seconds", pyarrow.int64()),
         ("direction_destination_headway_seconds", pyarrow.int64()),
-        ("tm_scheduled_time_dt", pyarrow.timestamp("us")),
-        ("tm_actual_arrival_dt", pyarrow.timestamp("us")),
-        ("tm_actual_departure_dt", pyarrow.timestamp("us")),
-        ("tm_scheduled_time_sam", pyarrow.int64()),
-        ("tm_actual_arrival_time_sam", pyarrow.int64()),
-        ("tm_actual_departure_time_sam", pyarrow.int64()),
     ]
 )

@@ -84,34 +88,19 @@ def create_bus_parquet(job: HyperJob, num_files: Optional[int]) -> None:

     with pq.ParquetWriter(job.local_parquet_path, schema=job.parquet_schema) as writer:
         for batch in ds.to_batches(batch_size=500_000):
-            polars_df = pl.from_arrow(batch)
+            # this select() is here to make sure the order of the polars_df
+            # schema is the same as the bus_schema above.
+            # order of schema matters to the ParquetWriter
+
+            # if the bus_schema above is in the same order as the batch
+            # schema, then the select will do nothing - as expected
+
+            polars_df = pl.from_arrow(batch).select(bus_schema.names)  # type: ignore[union-attr]

             if not isinstance(polars_df, pl.DataFrame):
                 raise TypeError(f"Expected a Polars DataFrame or Series, but got {type(polars_df)}")

-            # Convert datetime to Eastern Time
-            polars_df = polars_df.with_columns(
-                pl.col("stop_arrival_dt").dt.convert_time_zone(time_zone="US/Eastern").dt.replace_time_zone(None),
-                pl.col("stop_departure_dt").dt.convert_time_zone(time_zone="US/Eastern").dt.replace_time_zone(None),
-                pl.col("gtfs_travel_to_dt").dt.convert_time_zone(time_zone="US/Eastern").dt.replace_time_zone(None),
-            )
-
-            # Convert seconds columns to be aligned with Eastern Time
-            polars_df = polars_df.with_columns(
-                (pl.col("gtfs_travel_to_dt") - pl.col("service_date").str.strptime(pl.Date, "%Y%m%d"))
-                .dt.total_seconds()
-                .alias("gtfs_travel_to_seconds"),
-                (pl.col("stop_arrival_dt") - pl.col("service_date").str.strptime(pl.Date, "%Y%m%d"))
-                .dt.total_seconds()
-                .alias("stop_arrival_seconds"),
-                (pl.col("stop_departure_dt") - pl.col("service_date").str.strptime(pl.Date, "%Y%m%d"))
-                .dt.total_seconds()
-                .alias("stop_departure_seconds"),
-            )
-
-            polars_df = polars_df.with_columns(pl.col("service_date").str.strptime(pl.Date, "%Y%m%d", strict=False))
-
-            writer.write_table(polars_df.to_arrow())
+            writer.write_table(apply_bus_analysis_conversions(polars_df))


 class HyperBusPerformanceAll(HyperJob):
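
The new comment is worth confirming: when the batch columns already match bus_schema's order, the select() leaves the frame untouched, so the reorder is effectively free until the order has actually drifted. A tiny demonstration with made-up columns, not from the commit:

import polars as pl

df = pl.DataFrame({"a": [1], "b": ["x"]})
assert df.select(["a", "b"]).equals(df)             # matching order: no-op
assert df.select(["b", "a"]).columns == ["b", "a"]  # drifted order: reordered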
tests/bus_performance_manager/test_bus_convert_for_tableau.py

Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
#!/usr/bin/env python

import polars as pl

from lamp_py.tableau.conversions.convert_bus_performance_data import apply_bus_analysis_conversions


# poetry run pytest -s tests/bus_performance_manager/test_bus_convert_for_tableau.py
def test_apply_bus_analysis_conversions() -> None:
    """
    Test extracted conversions for tableau user view
    """
    df = pl.read_parquet("tests/test_files/PUBLIC_ARCHIVE/lamp/bus_vehicle_events/test_events.parquet")
    table = apply_bus_analysis_conversions(polars_df=df)
    print(df)
    print(table)
tests/test_files/PUBLIC_ARCHIVE/lamp/bus_vehicle_events/test_events.parquet

Binary file not shown.
