
Commit b712700

Yicong-Huang authored and zhengruifeng committed
[SPARK-54317][PYTHON][CONNECT] Unify Arrow conversion logic for Classic and Connect toPandas
## What changes were proposed in this pull request?

This PR merges the Arrow conversion code paths between Spark Connect and Classic Spark by extracting shared logic into a reusable helper function `_convert_arrow_table_to_pandas`.

## Why are the changes needed?

This unifies optimizations from two separate PRs:
- **[SPARK-53967]** (Classic): Avoid intermediate pandas DataFrame creation by converting Arrow columns directly to Series
- **[SPARK-54183]** (Connect): Same optimization implemented for Spark Connect

## Does this PR introduce any user-facing change?

No. This is a pure refactoring with no API or behavior changes.

## How was this patch tested?

Ran existing Arrow test suite: `python/pyspark/sql/tests/arrow/test_arrow.py`

## Was this patch authored or co-authored using generative AI tooling?

Co-Generated-by Cursor with Claude 4.5 Sonnet

Closes #53045 from Yicong-Huang/SPARK-54317/feat/merge-arrow-code-path.

Lead-authored-by: Yicong-Huang <[email protected]>
Co-authored-by: Yicong Huang <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
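For illustration, the column-by-column conversion pattern that the shared helper applies can be sketched with plain PyArrow and pandas. This is a simplified, standalone sketch, not the Spark code itself; it assumes a recent PyArrow that supports `coerce_temporal_nanoseconds`:

```python
import pandas as pd
import pyarrow as pa

table = pa.table({"id": [1, 2, 3], "name": ["a", "b", "c"]})

# Options passed through to PyArrow's to_pandas(); Spark additionally applies
# its own per-field type converters at this step.
pandas_options = {"coerce_temporal_nanoseconds": True}

# Convert each Arrow column to a pandas Series and concatenate, avoiding an
# intermediate full-table DataFrame.
series = [col.to_pandas(**pandas_options) for col in table.columns]
pdf = pd.concat(series, axis="columns")
pdf.columns = table.column_names
print(pdf)
```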
1 parent 78d1d52 commit b712700

File tree

2 files changed: +143, -122 lines


python/pyspark/sql/connect/client/core.py

Lines changed: 14 additions & 70 deletions
@@ -99,8 +99,9 @@
 )
 from pyspark.sql.connect.observation import Observation
 from pyspark.sql.connect.utils import get_python_ver
-from pyspark.sql.pandas.types import _create_converter_to_pandas, from_arrow_schema
-from pyspark.sql.types import DataType, StructType, _has_type
+from pyspark.sql.pandas.types import from_arrow_schema
+from pyspark.sql.pandas.conversion import _convert_arrow_table_to_pandas
+from pyspark.sql.types import DataType, StructType
 from pyspark.util import PythonEvalType
 from pyspark.storagelevel import StorageLevel
 from pyspark.errors import PySparkValueError, PySparkAssertionError, PySparkNotImplementedError
@@ -987,88 +988,31 @@ def to_pandas(
         # Get all related configs in a batch
         (
             timezone,
-            struct_in_pandas,
-            self_destruct,
+            structHandlingMode,
+            selfDestruct,
         ) = self.get_configs(
             "spark.sql.session.timeZone",
             "spark.sql.execution.pandas.structHandlingMode",
             "spark.sql.execution.arrow.pyspark.selfDestruct.enabled",
         )

         table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(
-            req, observations, self_destruct == "true"
+            req, observations, selfDestruct == "true"
         )
         assert table is not None
         ei = ExecutionInfo(metrics, observed_metrics)

         schema = schema or from_arrow_schema(table.schema, prefer_timestamp_ntz=True)
         assert schema is not None and isinstance(schema, StructType)

-        # Rename columns to avoid duplicated column names during processing
-        temp_col_names = [f"col_{i}" for i in range(len(schema.names))]
-        table = table.rename_columns(temp_col_names)
-
-        # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
-        # values, but we should use datetime.date to match the behavior with when
-        # Arrow optimization is disabled.
-        pandas_options = {"coerce_temporal_nanoseconds": True}
-        if self_destruct == "true" and table.num_rows > 0:
-            # Configure PyArrow to use as little memory as possible:
-            # self_destruct - free columns as they are converted
-            # split_blocks - create a separate Pandas block for each column
-            # use_threads - convert one column at a time
-            pandas_options.update(
-                {
-                    "self_destruct": True,
-                    "split_blocks": True,
-                    "use_threads": False,
-                }
-            )
-
-        if len(schema.names) > 0:
-            error_on_duplicated_field_names: bool = False
-            if struct_in_pandas == "legacy" and any(
-                _has_type(f.dataType, StructType) for f in schema.fields
-            ):
-                error_on_duplicated_field_names = True
-                struct_in_pandas = "dict"
-
-            # SPARK-51112: If the table is empty, we avoid using pyarrow to_pandas to create the
-            # DataFrame, as it may fail with a segmentation fault.
-            if table.num_rows == 0:
-                # For empty tables, create empty Series with converters to preserve dtypes
-                pdf = pd.concat(
-                    [
-                        _create_converter_to_pandas(
-                            field.dataType,
-                            field.nullable,
-                            timezone=timezone,
-                            struct_in_pandas=struct_in_pandas,
-                            error_on_duplicated_field_names=error_on_duplicated_field_names,
-                        )(pd.Series([], name=temp_col_names[i], dtype="object"))
-                        for i, field in enumerate(schema.fields)
-                    ],
-                    axis="columns",
-                )
-            else:
-                pdf = pd.concat(
-                    [
-                        _create_converter_to_pandas(
-                            field.dataType,
-                            field.nullable,
-                            timezone=timezone,
-                            struct_in_pandas=struct_in_pandas,
-                            error_on_duplicated_field_names=error_on_duplicated_field_names,
-                        )(arrow_col.to_pandas(**pandas_options))
-                        for arrow_col, field in zip(table.columns, schema.fields)
-                    ],
-                    axis="columns",
-                )
-            # Restore original column names (including duplicates)
-            pdf.columns = schema.names
-        else:
-            # empty columns
-            pdf = table.to_pandas(**pandas_options)
+        pdf = _convert_arrow_table_to_pandas(
+            arrow_table=table,
+            schema=schema,
+            timezone=timezone,
+            struct_handling_mode=structHandlingMode,
+            date_as_object=False,
+            self_destruct=selfDestruct == "true",
+        )

         if len(metrics) > 0:
             pdf.attrs["metrics"] = metrics
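The self-destruct comments above refer to standard PyArrow `to_pandas()` options; a minimal PyArrow-only sketch of that memory-saving configuration (independent of Spark):

```python
import pyarrow as pa

table = pa.table({"x": list(range(1_000))})

# self_destruct - free Arrow columns as they are converted
# split_blocks  - create a separate pandas block per column (no consolidation copy)
# use_threads   - convert one column at a time
pdf = table.to_pandas(self_destruct=True, split_blocks=True, use_threads=False)
print(pdf.head())
```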

python/pyspark/sql/pandas/conversion.py

Lines changed: 129 additions & 52 deletions
@@ -21,7 +21,9 @@
     Iterator,
     List,
     Optional,
+    Sequence,
     Union,
+    cast,
     no_type_check,
     overload,
     TYPE_CHECKING,
@@ -37,6 +39,7 @@
     MapType,
     TimestampType,
     StructType,
+    _has_type,
     DataType,
     _create_row,
     StringType,
@@ -53,6 +56,119 @@
     from pyspark.sql import DataFrame


+def _convert_arrow_table_to_pandas(
+    arrow_table: "pa.Table",
+    schema: "StructType",
+    *,
+    timezone: Optional[str] = None,
+    struct_handling_mode: Optional[str] = None,
+    date_as_object: bool = False,
+    self_destruct: bool = False,
+) -> "PandasDataFrameLike":
+    """
+    Helper function to convert Arrow table columns to a pandas DataFrame.
+
+    This function applies Spark-specific type converters to Arrow columns and concatenates
+    them into a pandas DataFrame.
+
+    Parameters
+    ----------
+    arrow_table : pyarrow.Table
+        The Arrow table to convert
+    schema : StructType
+        The schema of the DataFrame
+    timezone : str or None
+        The timezone to use for timestamp conversions (can be None if not configured)
+    struct_handling_mode : str or None
+        How to handle struct types in pandas ("dict", "row", or "legacy", can be None
+        if not configured). If "legacy", it will be converted to "dict" and error checking
+        for duplicated field names will be enabled when StructType fields are present.
+    date_as_object : bool
+        Whether to convert date values to Python datetime.date objects (default: False)
+    self_destruct : bool
+        Whether to enable memory-efficient self-destruct mode for large tables (default: False)
+
+    Returns
+    -------
+    pandas.DataFrame
+        The converted pandas DataFrame
+    """
+    import pandas as pd
+    from pyspark.sql.pandas.types import _create_converter_to_pandas
+
+    # Build pandas options
+    # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
+    # values, but we should use datetime.date to match the behavior with when
+    # Arrow optimization is disabled.
+    pandas_options = {"coerce_temporal_nanoseconds": True}
+    if date_as_object:
+        pandas_options["date_as_object"] = True
+
+    # Handle empty columns case
+    if len(schema.fields) == 0:
+        return arrow_table.to_pandas(**pandas_options)
+
+    # Rename columns to avoid duplicated column names during processing
+    temp_col_names = [f"col_{i}" for i in range(len(schema.names))]
+    arrow_table = arrow_table.rename_columns(temp_col_names)
+
+    # Configure self-destruct mode for memory efficiency
+    if self_destruct and arrow_table.num_rows > 0:
+        # Configure PyArrow to use as little memory as possible:
+        # self_destruct - free columns as they are converted
+        # split_blocks - create a separate Pandas block for each column
+        # use_threads - convert one column at a time
+        pandas_options.update(
+            {
+                "self_destruct": True,
+                "split_blocks": True,
+                "use_threads": False,
+            }
+        )
+
+    # Handle legacy struct handling mode
+    error_on_duplicated_field_names = False
+    if struct_handling_mode == "legacy" and any(
+        _has_type(f.dataType, StructType) for f in schema.fields
+    ):
+        error_on_duplicated_field_names = True
+        struct_handling_mode = "dict"
+
+    # SPARK-51112: If the table is empty, we avoid using pyarrow to_pandas to create the
+    # DataFrame, as it may fail with a segmentation fault.
+    if arrow_table.num_rows == 0:
+        # For empty tables, create empty Series to preserve dtypes
+        column_data = (
+            pd.Series([], name=temp_col_names[i], dtype="object") for i in range(len(schema.fields))
+        )
+    else:
+        # For non-empty tables, convert arrow columns directly
+        column_data = (arrow_col.to_pandas(**pandas_options) for arrow_col in arrow_table.columns)
+
+    # Apply Spark-specific type converters to each column
+    pdf = pd.concat(
+        objs=cast(
+            Sequence[pd.Series],
+            (
+                _create_converter_to_pandas(
+                    field.dataType,
+                    field.nullable,
+                    timezone=timezone,
+                    struct_in_pandas=struct_handling_mode,
+                    error_on_duplicated_field_names=error_on_duplicated_field_names,
+                )(series)
+                for series, field in zip(column_data, schema.fields)
+            ),
+        ),
+        axis="columns",
+    )
+
+    # Restore original column names (including duplicates)
+    pdf.columns = schema.names
+
+    return pdf
+
+
 class PandasConversionMixin:
     """
     Mix-in for the conversion from Spark to pandas and PyArrow. Currently, only
@@ -128,68 +244,29 @@ def toPandas(self) -> "PandasDataFrameLike":
                 try:
                     import pyarrow as pa

-                    self_destruct = arrowPySparkSelfDestructEnabled == "true"
-                    batches = self._collect_as_arrow(split_batches=self_destruct)
+                    batches = self._collect_as_arrow(
+                        split_batches=arrowPySparkSelfDestructEnabled == "true"
+                    )

-                    # Rename columns to avoid duplicated column names.
-                    temp_col_names = [f"col_{i}" for i in range(len(self.columns))]
                     if len(batches) > 0:
-                        table = pa.Table.from_batches(batches).rename_columns(temp_col_names)
+                        table = pa.Table.from_batches(batches)
                     else:
                         # empty dataset
-                        table = arrow_schema.empty_table().rename_columns(temp_col_names)
+                        table = arrow_schema.empty_table()

                     # Ensure only the table has a reference to the batches, so that
                     # self_destruct (if enabled) is effective
                     del batches

-                    # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
-                    # values, but we should use datetime.date to match the behavior with when
-                    # Arrow optimization is disabled.
-                    pandas_options = {
-                        "date_as_object": True,
-                        "coerce_temporal_nanoseconds": True,
-                    }
-                    if self_destruct:
-                        # Configure PyArrow to use as little memory as possible:
-                        # self_destruct - free columns as they are converted
-                        # split_blocks - create a separate Pandas block for each column
-                        # use_threads - convert one column at a time
-                        pandas_options.update(
-                            {
-                                "self_destruct": True,
-                                "split_blocks": True,
-                                "use_threads": False,
-                            }
-                        )
-
-                    if len(self.columns) > 0:
-                        timezone = sessionLocalTimeZone
-                        struct_in_pandas = pandasStructHandlingMode
-
-                        error_on_duplicated_field_names = False
-                        if struct_in_pandas == "legacy":
-                            error_on_duplicated_field_names = True
-                            struct_in_pandas = "dict"
-
-                        pdf = pd.concat(
-                            [
-                                _create_converter_to_pandas(
-                                    field.dataType,
-                                    field.nullable,
-                                    timezone=timezone,
-                                    struct_in_pandas=struct_in_pandas,
-                                    error_on_duplicated_field_names=error_on_duplicated_field_names,
-                                )(arrow_col.to_pandas(**pandas_options))
-                                for arrow_col, field in zip(table.columns, self.schema.fields)
-                            ],
-                            axis="columns",
-                        )
-                    else:
-                        # empty columns
-                        pdf = table.to_pandas(**pandas_options)
+                    pdf = _convert_arrow_table_to_pandas(
+                        arrow_table=table,
+                        schema=self.schema,
+                        timezone=sessionLocalTimeZone,
+                        struct_handling_mode=pandasStructHandlingMode,
+                        date_as_object=True,
+                        self_destruct=arrowPySparkSelfDestructEnabled == "true",
+                    )

-                    pdf.columns = self.columns
                     return pdf

                 except Exception as e:
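A hedged sketch of driving the new helper directly, assuming the `_convert_arrow_table_to_pandas` signature added in this PR (it is a private PySpark helper; user code normally reaches it only through `DataFrame.toPandas()`):

```python
import pyarrow as pa
from pyspark.sql.pandas.conversion import _convert_arrow_table_to_pandas
from pyspark.sql.types import LongType, StringType, StructField, StructType

arrow_table = pa.table({"id": [1, 2], "name": ["a", "b"]})
schema = StructType(
    [
        StructField("id", LongType(), True),
        StructField("name", StringType(), True),
    ]
)

# Mirrors the Classic toPandas() call site: dates as objects, no self-destruct.
pdf = _convert_arrow_table_to_pandas(
    arrow_table=arrow_table,
    schema=schema,
    timezone="UTC",
    struct_handling_mode="dict",
    date_as_object=True,
    self_destruct=False,
)
print(pdf.dtypes)
```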
