Skip to content

Commit 4da7c3d

Browse files
committed
[SPARK-46905][SQL] Add dedicated class to keep column definition instead of StructField in Create/ReplaceTable command
### What changes were proposed in this pull request?

This is a follow-up of apache#44876 to refactor the code and make it cleaner. The idea is to add a dedicated class for column definition instead of `StructField` in Create/ReplaceTable command. This is more flexible and cleaner than adding an additional default column expression in `CreateTable` command.

### Why are the changes needed?

Code refactor, and makes it easier to fully eliminate the need of a fake analyzer for default value handling in the future. We should make similar code changes to ALTER TABLE command and v1 CREATE TABLE command.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

### Was this patch authored or co-authored using generative AI tooling?

no

Closes apache#44935 from cloud-fan/refactor.

Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent abd9d27 commit 4da7c3d

File tree

16 files changed

+393
-230
lines changed

16 files changed

+393
-230
lines changed

common/utils/src/main/resources/error/error-classes.json

+5
Original file line numberDiff line numberDiff line change
@@ -1766,6 +1766,11 @@
17661766
"which requires <expectedType> type, but the statement provided a value of incompatible <actualType> type."
17671767
]
17681768
},
1769+
"NOT_CONSTANT" : {
1770+
"message" : [
1771+
"which is not a constant expression whose equivalent value is known at query planning time."
1772+
]
1773+
},
17691774
"SUBQUERY_EXPRESSION" : {
17701775
"message" : [
17711776
"which contains subquery expressions."

docs/sql-error-conditions-invalid-default-value-error-class.md

+4
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ This error class has the following derived error classes:
3434

3535
which requires `<expectedType>` type, but the statement provided a value of incompatible `<actualType>` type.
3636

37+
## NOT_CONSTANT
38+
39+
which is not a constant expression whose equivalent value is known at query planning time.
40+
3741
## SUBQUERY_EXPRESSION
3842

3943
which contains subquery expressions.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

+3
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,9 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
302302
// general unresolved check below to throw a more tailored error message.
303303
new ResolveReferencesInAggregate(catalogManager).checkUnresolvedGroupByAll(operator)
304304

305+
// Early checks for column definitions, to produce better error messages
306+
ColumnDefinition.checkColumnDefinitions(operator)
307+
305308
getAllExpressions(operator).foreach(_.foreachUp {
306309
case a: Attribute if !a.resolved =>
307310
failUnresolvedAttribute(operator, a, "UNRESOLVED_COLUMN")

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

+39-67
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.parser
2020
import java.util.Locale
2121
import java.util.concurrent.TimeUnit
2222

23-
import scala.collection.mutable
2423
import scala.collection.mutable.{ArrayBuffer, Set}
2524
import scala.jdk.CollectionConverters._
2625
import scala.util.{Left, Right}
@@ -44,7 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
4443
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
4544
import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER
4645
import org.apache.spark.sql.catalyst.types.DataTypeUtils
47-
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, GeneratedColumn, IntervalUtils, ResolveDefaultColumns}
46+
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, IntervalUtils}
4847
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate, stringToTimestamp, stringToTimestampWithoutTimeZone}
4948
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsNamespaces, TableCatalog}
5049
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -3123,24 +3122,26 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
31233122
* Create top level table schema.
31243123
*/
31253124
protected def createSchema(ctx: CreateOrReplaceTableColTypeListContext): StructType = {
3126-
StructType(Option(ctx).toArray.flatMap(visitCreateOrReplaceTableColTypeList))
3125+
val columns = Option(ctx).toArray.flatMap(visitCreateOrReplaceTableColTypeList)
3126+
StructType(columns.map(_.toV1Column))
31273127
}
31283128

31293129
/**
3130-
* Create a [[StructType]] from a number of CREATE TABLE column definitions.
3130+
* Get CREATE TABLE column definitions.
31313131
*/
31323132
override def visitCreateOrReplaceTableColTypeList(
3133-
ctx: CreateOrReplaceTableColTypeListContext): Seq[StructField] = withOrigin(ctx) {
3133+
ctx: CreateOrReplaceTableColTypeListContext): Seq[ColumnDefinition] = withOrigin(ctx) {
31343134
ctx.createOrReplaceTableColType().asScala.map(visitCreateOrReplaceTableColType).toSeq
31353135
}
31363136

31373137
/**
3138-
* Create a top level [[StructField]] from a CREATE TABLE column definition.
3138+
* Get a CREATE TABLE column definition.
31393139
*/
31403140
override def visitCreateOrReplaceTableColType(
3141-
ctx: CreateOrReplaceTableColTypeContext): StructField = withOrigin(ctx) {
3141+
ctx: CreateOrReplaceTableColTypeContext): ColumnDefinition = withOrigin(ctx) {
31423142
import ctx._
31433143

3144+
val name: String = colName.getText
31443145
// Check that no duplicates exist among any CREATE TABLE column options specified.
31453146
var nullable = true
31463147
var defaultExpression: Option[DefaultExpressionContext] = None
@@ -3150,61 +3151,44 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
31503151
if (option.NULL != null) {
31513152
if (!nullable) {
31523153
throw QueryParsingErrors.duplicateTableColumnDescriptor(
3153-
option, colName.getText, "NOT NULL")
3154+
option, name, "NOT NULL")
31543155
}
31553156
nullable = false
31563157
}
31573158
Option(option.defaultExpression()).foreach { expr =>
3159+
if (!conf.getConf(SQLConf.ENABLE_DEFAULT_COLUMNS)) {
3160+
throw QueryParsingErrors.defaultColumnNotEnabledError(ctx)
3161+
}
31583162
if (defaultExpression.isDefined) {
31593163
throw QueryParsingErrors.duplicateTableColumnDescriptor(
3160-
option, colName.getText, "DEFAULT")
3164+
option, name, "DEFAULT")
31613165
}
31623166
defaultExpression = Some(expr)
31633167
}
31643168
Option(option.generationExpression()).foreach { expr =>
31653169
if (generationExpression.isDefined) {
31663170
throw QueryParsingErrors.duplicateTableColumnDescriptor(
3167-
option, colName.getText, "GENERATED ALWAYS AS")
3171+
option, name, "GENERATED ALWAYS AS")
31683172
}
31693173
generationExpression = Some(expr)
31703174
}
31713175
Option(option.commentSpec()).foreach { spec =>
31723176
if (commentSpec.isDefined) {
31733177
throw QueryParsingErrors.duplicateTableColumnDescriptor(
3174-
option, colName.getText, "COMMENT")
3178+
option, name, "COMMENT")
31753179
}
31763180
commentSpec = Some(spec)
31773181
}
31783182
}
31793183

3180-
val builder = new MetadataBuilder
3181-
// Add comment to metadata
3182-
commentSpec.map(visitCommentSpec).foreach {
3183-
builder.putString("comment", _)
3184-
}
3185-
// Add the 'DEFAULT expression' clause in the column definition, if any, to the column metadata.
3186-
defaultExpression.map(visitDefaultExpression).foreach { field =>
3187-
if (conf.getConf(SQLConf.ENABLE_DEFAULT_COLUMNS)) {
3188-
// Add default to metadata
3189-
builder.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, field)
3190-
builder.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, field)
3191-
} else {
3192-
throw QueryParsingErrors.defaultColumnNotEnabledError(ctx)
3193-
}
3194-
}
3195-
// Add the 'GENERATED ALWAYS AS expression' clause in the column definition, if any, to the
3196-
// column metadata.
3197-
generationExpression.map(visitGenerationExpression).foreach { field =>
3198-
builder.putString(GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY, field)
3199-
}
3200-
3201-
val name: String = colName.getText
3202-
3203-
StructField(
3184+
ColumnDefinition(
32043185
name = name,
32053186
dataType = typedVisit[DataType](ctx.dataType),
32063187
nullable = nullable,
3207-
metadata = builder.build())
3188+
comment = commentSpec.map(visitCommentSpec),
3189+
defaultValue = defaultExpression.map(visitDefaultExpression),
3190+
generationExpression = generationExpression.map(visitGenerationExpression)
3191+
)
32083192
}
32093193

32103194
/**
@@ -3240,11 +3224,11 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
32403224
}
32413225

32423226
/**
3243-
* Create a default string.
3227+
* Create `DefaultValueExpression` for a column.
32443228
*/
3245-
override def visitDefaultExpression(ctx: DefaultExpressionContext): String =
3229+
override def visitDefaultExpression(ctx: DefaultExpressionContext): DefaultValueExpression =
32463230
withOrigin(ctx) {
3247-
getDefaultExpression(ctx.expression(), "DEFAULT").originalSQL
3231+
getDefaultExpression(ctx.expression(), "DEFAULT")
32483232
}
32493233

32503234
/**
@@ -3410,7 +3394,7 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
34103394
* types like `i INT`, which should be appended to the existing table schema.
34113395
*/
34123396
type TableClauses = (
3413-
Seq[Transform], Seq[StructField], Option[BucketSpec], Map[String, String],
3397+
Seq[Transform], Seq[ColumnDefinition], Option[BucketSpec], Map[String, String],
34143398
OptionList, Option[String], Option[String], Option[SerdeInfo], Option[ClusterBySpec])
34153399

34163400
/**
@@ -3437,12 +3421,15 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
34373421
* Parse a list of transforms or columns.
34383422
*/
34393423
override def visitPartitionFieldList(
3440-
ctx: PartitionFieldListContext): (Seq[Transform], Seq[StructField]) = withOrigin(ctx) {
3424+
ctx: PartitionFieldListContext): (Seq[Transform], Seq[ColumnDefinition]) = withOrigin(ctx) {
34413425
val (transforms, columns) = ctx.fields.asScala.map {
34423426
case transform: PartitionTransformContext =>
34433427
(Some(visitPartitionTransform(transform)), None)
34443428
case field: PartitionColumnContext =>
3445-
(None, Some(visitColType(field.colType)))
3429+
val f = visitColType(field.colType)
3430+
// The parser rule of `visitColType` only supports basic column info with comment.
3431+
val col = ColumnDefinition(f.name, f.dataType, f.nullable, f.getComment())
3432+
(None, Some(col))
34463433
}.unzip
34473434

34483435
(transforms.flatten.toSeq, columns.flatten.toSeq)
@@ -3918,13 +3905,13 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
39183905

39193906
private def partitionExpressions(
39203907
partTransforms: Seq[Transform],
3921-
partCols: Seq[StructField],
3908+
partCols: Seq[ColumnDefinition],
39223909
ctx: ParserRuleContext): Seq[Transform] = {
39233910
if (partTransforms.nonEmpty) {
39243911
if (partCols.nonEmpty) {
39253912
val references = partTransforms.map(_.describe()).mkString(", ")
39263913
val columns = partCols
3927-
.map(field => s"${field.name} ${field.dataType.simpleString}")
3914+
.map(column => s"${column.name} ${column.dataType.simpleString}")
39283915
.mkString(", ")
39293916
operationNotAllowed(
39303917
s"""PARTITION BY: Cannot mix partition expressions and partition columns:
@@ -3998,22 +3985,6 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
39983985
val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
39993986
serdeInfo, external)
40003987

4001-
// Parse column defaults from the table into separate expressions in the CREATE TABLE operator.
4002-
val specifiedDefaults: mutable.Map[Int, Expression] = mutable.Map.empty
4003-
Option(ctx.createOrReplaceTableColTypeList()).foreach {
4004-
_.createOrReplaceTableColType().asScala.zipWithIndex.foreach { case (typeContext, index) =>
4005-
typeContext.colDefinitionOption().asScala.foreach { option =>
4006-
Option(option.defaultExpression()).foreach { defaultExprContext =>
4007-
specifiedDefaults.update(index, expression(defaultExprContext.expression()))
4008-
}
4009-
}
4010-
}
4011-
}
4012-
val defaultValueExpressions: Seq[Option[Expression]] =
4013-
(0 until columns.size).map { index: Int =>
4014-
specifiedDefaults.get(index)
4015-
}
4016-
40173988
Option(ctx.query).map(plan) match {
40183989
case Some(_) if columns.nonEmpty =>
40193990
operationNotAllowed(
@@ -4033,9 +4004,10 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
40334004
case _ =>
40344005
// Note: table schema includes both the table columns list and the partition columns
40354006
// with data type.
4036-
val schema = StructType(columns ++ partCols)
4037-
CreateTable(withIdentClause(identifierContext, UnresolvedIdentifier(_)),
4038-
schema, partitioning, tableSpec, ignoreIfExists = ifNotExists, defaultValueExpressions)
4007+
val allColumns = columns ++ partCols
4008+
CreateTable(
4009+
withIdentClause(identifierContext, UnresolvedIdentifier(_)),
4010+
allColumns, partitioning, tableSpec, ignoreIfExists = ifNotExists)
40394011
}
40404012
}
40414013

@@ -4107,10 +4079,10 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
41074079
case _ =>
41084080
// Note: table schema includes both the table columns list and the partition columns
41094081
// with data type.
4110-
val schema = StructType(columns ++ partCols)
4082+
val allColumns = columns ++ partCols
41114083
ReplaceTable(
41124084
withIdentClause(ctx.replaceTableHeader.identifierReference(), UnresolvedIdentifier(_)),
4113-
schema, partitioning, tableSpec, orCreate = orCreate)
4085+
allColumns, partitioning, tableSpec, orCreate = orCreate)
41144086
}
41154087
}
41164088

@@ -4246,7 +4218,7 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
42464218
// Add the 'DEFAULT expression' clause in the column definition, if any, to the column metadata.
42474219
val defaultExpr = defaultExpression.map(visitDefaultExpression).map { field =>
42484220
if (conf.getConf(SQLConf.ENABLE_DEFAULT_COLUMNS)) {
4249-
field
4221+
field.originalSQL
42504222
} else {
42514223
throw QueryParsingErrors.defaultColumnNotEnabledError(ctx)
42524224
}
@@ -4343,7 +4315,7 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
43434315
}
43444316
val setDefaultExpression: Option[String] =
43454317
if (action.defaultExpression != null) {
4346-
Option(action.defaultExpression()).map(visitDefaultExpression)
4318+
Option(action.defaultExpression()).map(visitDefaultExpression).map(_.originalSQL)
43474319
} else if (action.dropDefault != null) {
43484320
Some("")
43494321
} else {

0 commit comments

Comments
 (0)