Skip to content

Commit ab2d6c0

Browse files
committed
skip empty chunks
1 parent: bdf52de · commit: ab2d6c0

File tree

2 files changed

+13
-14
lines changed

2 files changed

+13
-14
lines changed

python/pyspark/sql/pandas/serializers.py

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2010,23 +2010,20 @@ def row_stream():
20102010
for i, c in enumerate(flatten_state_table.itercolumns())
20112011
]
20122012

2013-
flatten_init_table = flatten_columns(batch, "initState")
2014-
init_data_pandas = [
2015-
self.arrow_to_pandas(c, i)
2016-
for i, c in enumerate(flatten_init_table.itercolumns())
2017-
]
2018-
2019-
# Determine which column has data
2020-
has_input_data = bool(data_pandas)
2021-
2022-
if has_input_data:
2013+
if bool(data_pandas):
20232014
for row in pd.concat(data_pandas, axis=1).itertuples(index=False):
20242015
batch_key = tuple(row[s] for s in self.key_offsets)
20252016
yield (batch_key, row, None)
20262017
else:
2027-
for row in pd.concat(init_data_pandas, axis=1).itertuples(index=False):
2028-
batch_key = tuple(row[s] for s in self.init_key_offsets)
2029-
yield (batch_key, None, row)
2018+
flatten_init_table = flatten_columns(batch, "initState")
2019+
init_data_pandas = [
2020+
self.arrow_to_pandas(c, i)
2021+
for i, c in enumerate(flatten_init_table.itercolumns())
2022+
]
2023+
if bool(init_data_pandas):
2024+
for row in pd.concat(init_data_pandas, axis=1).itertuples(index=False):
2025+
batch_key = tuple(row[s] for s in self.init_key_offsets)
2026+
yield (batch_key, None, row)
20302027

20312028
EMPTY_DATAFRAME = pd.DataFrame()
20322029
for batch_key, group_rows in groupby(row_stream(), key=lambda x: x[0]):

sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,9 @@ class TransformWithStateInPySparkPythonInitialStateRunner(
197197

198198
true
199199
} else {
200-
pandasWriter.finalizeCurrentArrowBatch()
200+
if (pandasWriter.getTotalNumRowsForBatch > 0) {
201+
pandasWriter.finalizeCurrentArrowBatch()
202+
}
201203
super[PythonArrowInput].close()
202204
false
203205
}

0 commit comments

Comments (0)