Skip to content

Commit ca9c256

Browse files
committed
[SPARK-54300][PYTHON] Optimize Py4J calls in df.toPandas
### What changes were proposed in this pull request? Optimize Py4J config calls in df.toPandas ### Why are the changes needed? In Spark Connect, we already fetch all configs in a single batch; in Spark Classic, we can apply a similar optimization so that all configs are fetched in one batch, minimizing the number of Py4J calls. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? ci ### Was this patch authored or co-authored using generative AI tooling? no Closes #52994 from zhengruifeng/py4j_conf_topandas. Authored-by: Ruifeng Zheng <[email protected]> Signed-off-by: Ruifeng Zheng <[email protected]>
1 parent c21d5a4 commit ca9c256

File tree

2 files changed

+31
-9
lines changed

2 files changed

+31
-9
lines changed

python/pyspark/sql/pandas/conversion.py

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,20 +71,36 @@ def toPandas(self) -> "PandasDataFrameLike":
7171

7272
import pandas as pd
7373

74-
jconf = self.sparkSession._jconf
74+
(
75+
sessionLocalTimeZone,
76+
arrowPySparkEnabled,
77+
arrowUseLargeVarTypes,
78+
arrowPySparkFallbackEnabled,
79+
arrowPySparkSelfDestructEnabled,
80+
pandasStructHandlingMode,
81+
) = self.sparkSession._jconf.getConfs(
82+
[
83+
"spark.sql.session.timeZone",
84+
"spark.sql.execution.arrow.pyspark.enabled",
85+
"spark.sql.execution.arrow.useLargeVarTypes",
86+
"spark.sql.execution.arrow.pyspark.fallback.enabled",
87+
"spark.sql.execution.arrow.pyspark.selfDestruct.enabled",
88+
"spark.sql.execution.pandas.structHandlingMode",
89+
]
90+
)
7591

76-
if jconf.arrowPySparkEnabled():
92+
if arrowPySparkEnabled == "true":
7793
use_arrow = True
7894
try:
7995
from pyspark.sql.pandas.types import to_arrow_schema
8096
from pyspark.sql.pandas.utils import require_minimum_pyarrow_version
8197

8298
require_minimum_pyarrow_version()
8399
arrow_schema = to_arrow_schema(
84-
self.schema, prefers_large_types=jconf.arrowUseLargeVarTypes()
100+
self.schema, prefers_large_types=arrowUseLargeVarTypes == "true"
85101
)
86102
except Exception as e:
87-
if jconf.arrowPySparkFallbackEnabled():
103+
if arrowPySparkFallbackEnabled == "true":
88104
msg = (
89105
"toPandas attempted Arrow optimization because "
90106
"'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, "
@@ -112,7 +128,7 @@ def toPandas(self) -> "PandasDataFrameLike":
112128
try:
113129
import pyarrow as pa
114130

115-
self_destruct = jconf.arrowPySparkSelfDestructEnabled()
131+
self_destruct = arrowPySparkSelfDestructEnabled == "true"
116132
batches = self._collect_as_arrow(split_batches=self_destruct)
117133

118134
# Rename columns to avoid duplicated column names.
@@ -148,8 +164,8 @@ def toPandas(self) -> "PandasDataFrameLike":
148164
)
149165

150166
if len(self.columns) > 0:
151-
timezone = jconf.sessionLocalTimeZone()
152-
struct_in_pandas = jconf.pandasStructHandlingMode()
167+
timezone = sessionLocalTimeZone
168+
struct_in_pandas = pandasStructHandlingMode
153169

154170
error_on_duplicated_field_names = False
155171
if struct_in_pandas == "legacy":
@@ -200,8 +216,8 @@ def toPandas(self) -> "PandasDataFrameLike":
200216
pdf = pd.DataFrame(columns=self.columns)
201217

202218
if len(pdf.columns) > 0:
203-
timezone = jconf.sessionLocalTimeZone()
204-
struct_in_pandas = jconf.pandasStructHandlingMode()
219+
timezone = sessionLocalTimeZone
220+
struct_in_pandas = pandasStructHandlingMode
205221

206222
return pd.concat(
207223
[

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7886,6 +7886,12 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
78867886
}
78877887
}
78887888

7889+
/** Return the values of the Spark SQL configuration properties for the given keys. */
7890+
@throws[NoSuchElementException]("if key is not set")
7891+
private[spark] def getConfs(keys: util.List[String]): Array[String] = {
7892+
Array.tabulate(keys.size())(i => this.getConfString(keys.get(i)))
7893+
}
7894+
78897895
/**
78907896
* Return all the configuration properties that have been set (i.e. not the default).
78917897
* This creates a new copy of the config properties in the form of a Map.

0 commit comments

Comments
 (0)