Commit 62483e8

Fixing spark-sql updates for pk-less table
1 parent b8796d0

5 files changed: +69 -49 lines changed


hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java

+3 -1

@@ -268,6 +268,8 @@ public static List<String> getRecordKeyFields(TypedProperties props) {
    * @return true if record keys need to be auto generated. false otherwise.
    */
   public static boolean isAutoGeneratedRecordKeysEnabled(TypedProperties props) {
-    return !props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
+    return !props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
+        || props.getProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).equals(StringUtils.EMPTY_STRING);
+    // spark-sql sets the record key config to an empty string for UPDATE and a couple of other statements.
   }
 }
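
For context, the predicate now treats a missing record key field and an empty one the same way. A minimal sketch of the intended behavior, assuming hudi-client-common is on the classpath (the import paths follow the usual Hudi package layout and are assumptions here, not part of the commit):

import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.keygen.KeyGenUtils
import org.apache.hudi.keygen.constant.KeyGeneratorOptions

object AutoKeyGenCheck extends App {
  val recordKeyConfig = KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()

  // No record key configured at all: auto-generated keys are enabled.
  val noKey = new TypedProperties()
  assert(KeyGenUtils.isAutoGeneratedRecordKeysEnabled(noKey))

  // spark-sql sets the config to "" for UPDATE and similar statements;
  // after this commit that is also treated as "auto-generate keys".
  val emptyKey = new TypedProperties()
  emptyKey.setProperty(recordKeyConfig, "")
  assert(KeyGenUtils.isAutoGeneratedRecordKeysEnabled(emptyKey))

  // An explicit record key field still disables auto generation.
  val withKey = new TypedProperties()
  withKey.setProperty(recordKeyConfig, "id")
  assert(!KeyGenUtils.isAutoGeneratedRecordKeysEnabled(withKey))
}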

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java

+3 -0

@@ -88,6 +88,9 @@ public static KeyGenerator createKeyGenerator(String keyGeneratorClass, TypedPro
       // Need to prevent overwriting the keygen for spark sql merge into because we need to extract
       // the recordkey from the meta cols if it exists. Sql keygen will use pkless keygen if needed.
       && !props.getBoolean(SPARK_SQL_MERGE_INTO_PREPPED_KEY, false);
+    if (autoRecordKeyGen) {
+      props.remove(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
+    }
     KeyGenerator keyGenerator = (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, props);
     if (autoRecordKeyGen) {
       return new AutoRecordGenWrapperKeyGenerator(props, (BuiltinKeyGenerator) keyGenerator);
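
The removal matters because the concrete key generator is instantiated reflectively from the same props: if the empty record-key entry were left in place, a BuiltinKeyGenerator could be constructed with an empty key field instead of deferring to the auto-key wrapper. A hedged sketch of the expected outcome; the single-argument createKeyGenerator overload and the partition path config name are assumed from the surrounding code, not shown in this diff:

import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory

object PkLessKeyGenCheck extends App {
  // Simulate what spark-sql passes down for an UPDATE on a pk-less table:
  // a record key config that is present but empty.
  val props = new TypedProperties()
  props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "")
  props.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "name")

  val keyGen = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
  // With the fix, the empty entry is removed before reflective instantiation
  // and the result is wrapped for auto record key generation.
  println(keyGen.getClass.getSimpleName) // expected: AutoRecordGenWrapperKeyGenerator
}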

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala

+2 -2

@@ -228,8 +228,8 @@ class HoodieSparkSqlWriterInternal {
       originKeyGeneratorClassName, paramsWithoutDefaults)

     // Validate datasource and tableconfig keygen are the same
-    validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig);
-    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite);
+    validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig)
+    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)

     asyncCompactionTriggerFnDefined = streamingWritesParamsOpt.map(_.asyncCompactionTriggerFn.isDefined).orElse(Some(false)).get
     asyncClusteringTriggerFnDefined = streamingWritesParamsOpt.map(_.asyncClusteringTriggerFn.isDefined).orElse(Some(false)).get

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDeleteTable.scala

+12 -4

@@ -80,28 +80,35 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
   test("Test Delete Table Without Primary Key") {
     withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
+        Seq(true, false).foreach { isPartitioned =>
         val tableName = generateTableName
+        val partitionedClause = if (isPartitioned) {
+          "PARTITIONED BY (name)"
+        } else {
+          ""
+        }
         // create table
         spark.sql(
           s"""
              |create table $tableName (
              |  id int,
-             |  name string,
              |  price double,
-             |  ts long
+             |  ts long,
+             |  name string
              |) using hudi
              | location '${tmp.getCanonicalPath}/$tableName'
              | tblproperties (
              |  type = '$tableType',
              |  preCombineField = 'ts'
              | )
+             | $partitionedClause
        """.stripMargin)

         // test with optimized sql writes enabled.
         spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")

         // insert data to table
-        spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+        spark.sql(s"insert into $tableName select 1, 10, 1000, 'a1'")
         checkAnswer(s"select id, name, price, ts from $tableName")(
           Seq(1, "a1", 10.0, 1000)
         )

@@ -112,7 +119,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
           Seq(0)
         )

-        spark.sql(s"insert into $tableName select 2, 'a2', 10, 1000")
+        spark.sql(s"insert into $tableName select 2, 10, 1000, 'a2'")
         spark.sql(s"delete from $tableName where id = 1")
         checkAnswer(s"select id, name, price, ts from $tableName")(
           Seq(2, "a2", 10.0, 1000)

@@ -124,6 +131,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
         )
       }
     }
+    }
   }

   test("Test Delete Table On Non-PK Condition") {

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala

+49 -42

@@ -77,54 +77,61 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
   test("Test Update Table Without Primary Key") {
     withRecordType()(withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
-        val tableName = generateTableName
-        // create table
-        spark.sql(
-          s"""
-             |create table $tableName (
-             |  id int,
-             |  name string,
-             |  price double,
-             |  ts long
-             |) using hudi
-             | location '${tmp.getCanonicalPath}/$tableName'
-             | tblproperties (
-             |  type = '$tableType',
-             |  preCombineField = 'ts'
-             | )
-       """.stripMargin)
-
-        // insert data to table
-        spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
-        checkAnswer(s"select id, name, price, ts from $tableName")(
-          Seq(1, "a1", 10.0, 1000)
-        )
+        Seq(true, false).foreach { isPartitioned =>
+          val tableName = generateTableName
+          val partitionedClause = if (isPartitioned) {
+            "PARTITIONED BY (name)"
+          } else {
+            ""
+          }
+          // create table
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  price double,
+               |  ts long,
+               |  name string
+               |) using hudi
+               | location '${tmp.getCanonicalPath}/$tableName'
+               | tblproperties (
+               |  type = '$tableType',
+               |  preCombineField = 'ts'
+               | )
+               | $partitionedClause
+         """.stripMargin)

-        // test with optimized sql writes enabled.
-        spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")
+          // insert data to table
+          spark.sql(s"insert into $tableName select 1, 10, 1000, 'a1'")
+          checkAnswer(s"select id, name, price, ts from $tableName")(
+            Seq(1, "a1", 10.0, 1000)
+          )

-        // update data
-        spark.sql(s"update $tableName set price = 20 where id = 1")
-        checkAnswer(s"select id, name, price, ts from $tableName")(
-          Seq(1, "a1", 20.0, 1000)
-        )
+          // test with optimized sql writes enabled.
+          spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")

-        // update data
-        spark.sql(s"update $tableName set price = price * 2 where id = 1")
-        checkAnswer(s"select id, name, price, ts from $tableName")(
-          Seq(1, "a1", 40.0, 1000)
-        )
+          // update data
+          spark.sql(s"update $tableName set price = 20 where id = 1")
+          checkAnswer(s"select id, name, price, ts from $tableName")(
+            Seq(1, "a1", 20.0, 1000)
+          )

-        // verify default compaction w/ MOR
-        if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
-          spark.sql(s"update $tableName set price = price * 2 where id = 1")
-          spark.sql(s"update $tableName set price = price * 2 where id = 1")
+          // update data
           spark.sql(s"update $tableName set price = price * 2 where id = 1")
-          // verify compaction is complete
-          val metaClient = createMetaClient(spark, tmp.getCanonicalPath + "/" + tableName)
-          assertEquals(metaClient.getActiveTimeline.getLastCommitMetadataWithValidData.get.getLeft.getAction, "commit")
-        }
+          checkAnswer(s"select id, name, price, ts from $tableName")(
+            Seq(1, "a1", 40.0, 1000)
+          )

+          // verify default compaction w/ MOR
+          if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
+            spark.sql(s"update $tableName set price = price * 2 where id = 1")
+            spark.sql(s"update $tableName set price = price * 2 where id = 1")
+            spark.sql(s"update $tableName set price = price * 2 where id = 1")
+            // verify compaction is complete
+            val metaClient = createMetaClient(spark, tmp.getCanonicalPath + "/" + tableName)
+            assertEquals(metaClient.getActiveTimeline.getLastCommitMetadataWithValidData.get.getLeft.getAction, "commit")
+          }
+        }
       }
     })
   }
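
Taken together with the KeyGenUtils and key generator factory changes above, this test pins down the user-visible behavior: a spark-sql UPDATE against a table created without a primary key now routes through auto-generated record keys instead of tripping over the empty record-key config. A hedged end-to-end repro sketch (table name illustrative, config key assumed as above):

spark.sql("set hoodie.spark.sql.optimized.writes.enable=true") // key assumed for SPARK_SQL_OPTIMIZED_WRITES
spark.sql(
  """create table demo_update_pkless (id int, price double, ts long, name string)
    |using hudi
    |partitioned by (name)
    |tblproperties (type = 'cow', preCombineField = 'ts')""".stripMargin)
spark.sql("insert into demo_update_pkless select 1, 10, 1000, 'a1'")
spark.sql("update demo_update_pkless set price = price * 2 where id = 1")
spark.sql("select id, price from demo_update_pkless").show() // expect (1, 20.0)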
