Skip to content

Commit 2543455

Browse files
committed
initial commit
1 parent 94f3f38 commit 2543455

File tree

2 files changed

+11
-5
lines changed

2 files changed

+11
-5
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1534,6 +1534,12 @@ object SQLConf {
15341534
.timeConf(TimeUnit.SECONDS)
15351535
.createWithDefaultString(s"${5 * 60}")
15361536

1537+
val MAX_BROADCAST_TABLE_SIZE = buildConf("spark.sql.maxBroadcastTableSize")
1538+
.doc("The maximum table size that can be broadcast in broadcast joins.")
1539+
.version("4.0.0")
1540+
.bytesConf(ByteUnit.BYTE)
1541+
.createWithDefault(8L << 30)
1542+
15371543
val INTERRUPT_ON_CANCEL = buildConf("spark.sql.execution.interruptOnCancel")
15381544
.doc("When true, all running tasks will be interrupted if one cancels a query.")
15391545
.version("4.0.0")
@@ -6155,6 +6161,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
61556161
if (timeoutValue < 0) Long.MaxValue else timeoutValue
61566162
}
61576163

6164+
def maxBroadcastTableSizeInBytes: Long = getConf(MAX_BROADCAST_TABLE_SIZE)
6165+
61586166
def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)
61596167

61606168
def convertCTAS: Boolean = getConf(CONVERT_CTAS)

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,6 @@ trait BroadcastExchangeLike extends Exchange {
125125
case class BroadcastExchangeExec(
126126
mode: BroadcastMode,
127127
child: SparkPlan) extends BroadcastExchangeLike {
128-
import BroadcastExchangeExec._
129128

130129
override lazy val metrics = Map(
131130
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
@@ -203,9 +202,10 @@ case class BroadcastExchangeExec(
203202
}
204203

205204
longMetric("dataSize") += dataSize
206-
if (dataSize >= MAX_BROADCAST_TABLE_BYTES) {
205+
val maxBroadcastTableSizeInBytes = conf.maxBroadcastTableSizeInBytes
206+
if (dataSize >= maxBroadcastTableSizeInBytes) {
207207
throw QueryExecutionErrors.cannotBroadcastTableOverMaxTableBytesError(
208-
MAX_BROADCAST_TABLE_BYTES, dataSize)
208+
maxBroadcastTableSizeInBytes, dataSize)
209209
}
210210

211211
val beforeBroadcast = System.nanoTime()
@@ -268,8 +268,6 @@ case class BroadcastExchangeExec(
268268
}
269269

270270
object BroadcastExchangeExec {
271-
val MAX_BROADCAST_TABLE_BYTES = 8L << 30
272-
273271
private[execution] val executionContext = ExecutionContext.fromExecutorService(
274272
ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange",
275273
SQLConf.get.getConf(StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD)))

0 commit comments

Comments
 (0)