@@ -19,16 +19,15 @@ package org.apache.spark.sql.catalyst.analysis
19
19
20
20
import scala .util .control .NonFatal
21
21
22
+ import org .apache .spark .SparkException
22
23
import org .apache .spark .sql .catalyst .expressions .{Cast , DefaultStringProducingExpression , Expression , Literal , SubqueryExpression }
23
24
import org .apache .spark .sql .catalyst .plans .logical .{AddColumns , AlterColumns , AlterColumnSpec , AlterViewAs , ColumnDefinition , CreateTable , CreateTempView , CreateView , LogicalPlan , QualifiedColType , ReplaceColumns , ReplaceTable , TableSpec , V2CreateTablePlan }
24
25
import org .apache .spark .sql .catalyst .rules .Rule
25
26
import org .apache .spark .sql .catalyst .trees .CurrentOrigin
26
- import org .apache .spark .sql .catalyst .util .CharVarcharUtils . CHAR_VARCHAR_TYPE_STRING_METADATA_KEY
27
- import org .apache .spark .sql .connector .catalog .{CatalogV2Util , SupportsNamespaces , Table , TableCatalog }
27
+ import org .apache .spark .sql .catalyst .util .CharVarcharUtils
28
+ import org .apache .spark .sql .connector .catalog .{SupportsNamespaces , TableCatalog }
28
29
import org .apache .spark .sql .connector .catalog .SupportsNamespaces .PROP_COLLATION
29
- import org .apache .spark .sql .errors .DataTypeErrors .toSQLId
30
- import org .apache .spark .sql .errors .QueryCompilationErrors
31
- import org .apache .spark .sql .types .{CharType , DataType , StringType , StructField , VarcharType }
30
+ import org .apache .spark .sql .types .{CharType , DataType , StringType , VarcharType }
32
31
33
32
/**
34
33
* Resolves string types in logical plans by assigning them the appropriate collation. The
@@ -180,9 +179,9 @@ object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] {
180
179
case replaceCols@ ReplaceColumns (_ : ResolvedTable , _) =>
181
180
replaceCols.copy(columnsToAdd = replaceColumnTypes(replaceCols.columnsToAdd, newType))
182
181
183
- case a @ AlterColumns (ResolvedTable (_, _, table : Table , _), specs : Seq [AlterColumnSpec ]) =>
182
+ case a @ AlterColumns (ResolvedTable (_, _, _ , _), specs : Seq [AlterColumnSpec ]) =>
184
183
val newSpecs = specs.map {
185
- case spec if shouldApplyDefaultCollationToAlterColumn(spec, table ) =>
184
+ case spec if shouldApplyDefaultCollationToAlterColumn(spec) =>
186
185
spec.copy(newDataType = Some (replaceDefaultStringType(spec.newDataType.get, newType)))
187
186
case col => col
188
187
}
@@ -203,9 +202,9 @@ object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] {
203
202
private def pruneRedundantAlterColumnTypes (plan : LogicalPlan ): LogicalPlan = {
204
203
plan match {
205
204
case alterColumns@ AlterColumns (
206
- ResolvedTable (_, _, table : Table , _), specs : Seq [AlterColumnSpec ]) =>
205
+ ResolvedTable (_, _, _ , _), specs : Seq [AlterColumnSpec ]) =>
207
206
val resolvedSpecs = specs.map { spec =>
208
- if (spec.newDataType.isDefined && isStringTypeColumn(spec.column, table ) &&
207
+ if (spec.newDataType.isDefined && isStringTypeColumn(spec.column) &&
209
208
isDefaultStringType(spec.newDataType.get)) {
210
209
spec.copy(newDataType = None )
211
210
} else {
@@ -223,35 +222,28 @@ object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] {
223
222
}
224
223
225
224
/**
 * Decides whether the default collation should be applied to the new data type of an
 * ALTER COLUMN spec. The result is true only when all of the following hold:
 *  - the spec defines a new data type,
 *  - the column being altered is not currently a string-typed column (see isStringTypeColumn),
 *  - the new data type satisfies hasDefaultStringType.
 */
private def shouldApplyDefaultCollationToAlterColumn (
226
- alterColumnSpec : AlterColumnSpec , table : Table ): Boolean = {
225
+ alterColumnSpec : AlterColumnSpec ): Boolean = {
227
226
alterColumnSpec.newDataType.isDefined &&
228
227
// Applies the default collation only if the original column's type is not StringType.
229
- ! isStringTypeColumn(alterColumnSpec.column, table ) &&
228
+ ! isStringTypeColumn(alterColumnSpec.column) &&
230
229
hasDefaultStringType(alterColumnSpec.newDataType.get)
231
230
}
232
231
233
232
/**
234
- * Checks whether the column's [[DataType ]] is [[StringType ]] in the given table. Throws an error
235
- * if the column is not found.
233
+ * Checks whether the field's [[DataType ]] is [[StringType ]].
236
234
*/
237
- private def isStringTypeColumn (fieldName : FieldName , table : Table ): Boolean = {
238
- CatalogV2Util .v2ColumnsToStructType(table.columns())
239
- .findNestedField(fieldName.name, includeCollections = true , resolver = conf.resolver)
240
- .map {
241
- case (_, StructField (_, _ : CharType , _, _)) =>
242
- false
243
- case (_, StructField (_, _ : VarcharType , _, _)) =>
244
- false
245
- case (_, StructField (_, _ : StringType , _, metadata))
246
- if ! metadata.contains(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY ) =>
247
- true
248
- case (_, _) =>
249
- false
250
- }
251
- .getOrElse {
252
- throw QueryCompilationErrors .unresolvedColumnError(
253
- toSQLId(fieldName.name), table.columns().map(_.name))
254
- }
235
+ private def isStringTypeColumn (fieldName : FieldName ): Boolean = {
236
+ fieldName match {
// Resolved field: classify it by the raw type returned for its metadata when there is one,
// otherwise by its declared data type.
237
+ case ResolvedFieldName (_, field) =>
// NOTE(review): getRawType presumably recovers the original CHAR/VARCHAR type recorded in
// the field metadata -- confirm against CharVarcharUtils.
238
+ CharVarcharUtils .getRawType(field.metadata).getOrElse(field.dataType) match {
239
+ case _ : CharType => false
240
+ case _ : VarcharType => false
241
+ case _ : StringType => true
242
+ case _ => false
243
+ }
// An unresolved field name is not handled here; it is reported as an internal error.
244
+ case UnresolvedFieldName (name) =>
245
+ throw SparkException .internalError(s " Unexpected UnresolvedFieldName: $name" )
246
+ }
255
247
}
256
248
257
249
/**
0 commit comments