@@ -374,7 +374,7 @@ object RuleUtils {

        val filesToRead = {
          if (useBucketSpec || !index.hasParquetAsSourceFormat || filesDeleted.nonEmpty ||
-             relation.partitionSchema.nonEmpty) {
+             relation.partitionSchema.nonEmpty || index.usesNestedFields) {
            // Since the index data is in "parquet" format, we cannot read source files
            // in formats other than "parquet" using one FileScan node as the operator requires
            // files in one homogenous format. To address this, we need to read the appended
@@ -398,9 +398,10 @@ object RuleUtils {
        // In order to handle deleted files, read index data with the lineage column so that
        // we could inject Filter-Not-In conditions on the lineage column to exclude the indexed
        // rows from the deleted files.
+       val flatSchema = SchemaUtils.escapeFieldNames(SchemaUtils.flatten(relation.plan.schema))
        val newSchema = StructType(
          index.schema.filter(s =>
-           relation.plan.schema.contains(s) || (filesDeleted.nonEmpty && s.name.equals(
+           flatSchema.contains(s.name) || (filesDeleted.nonEmpty && s.name.equals(
              IndexConstants.DATA_FILE_NAME_ID))))

        def fileIndex: InMemoryFileIndex = {
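The new `flatSchema` value above relies on Hyperspace's `SchemaUtils` helpers, whose bodies are not part of this diff. Below is a minimal, self-contained sketch of what flattening and escaping could look like; `SchemaUtilsSketch`, its method bodies, and the `__` escape token are assumptions for illustration, not the project's actual implementation.

```scala
import org.apache.spark.sql.types._

// Hypothetical stand-in for Hyperspace's SchemaUtils (assumed behavior only):
// flatten turns a nested schema into dotted leaf paths, and escapeFieldNames
// rewrites those paths into flat column names comparable with index.schema names.
object SchemaUtilsSketch {
  // Recursively collect dotted leaf paths, e.g. "nested.leaf".
  def flatten(schema: StructType, prefix: String = ""): Seq[String] =
    schema.fields.toSeq.flatMap { f =>
      val path = if (prefix.isEmpty) f.name else s"$prefix.${f.name}"
      f.dataType match {
        case s: StructType => flatten(s, path)
        case _ => Seq(path)
      }
    }

  // Assumed escape scheme: replace the "." separator with "__".
  def escapeFieldNames(names: Seq[String]): Seq[String] =
    names.map(_.replace(".", "__"))
}

val source = StructType(Seq(
  StructField("id", LongType),
  StructField("nested", StructType(Seq(StructField("leaf", StringType))))))

SchemaUtilsSketch.flatten(source)                                     // Seq("id", "nested.leaf")
SchemaUtilsSketch.escapeFieldNames(SchemaUtilsSketch.flatten(source)) // Seq("id", "nested__leaf")
```

With names in this flat form, the `flatSchema.contains(s.name)` check in the hunk above can keep index columns that were created from nested source fields, which an exact match against `relation.plan.schema` could not.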
@@ -421,9 +422,21 @@ object RuleUtils {
          new ParquetFileFormat,
          Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark, index)

-       val updatedOutput = relation.plan.output
-         .filter(attr => indexFsRelation.schema.fieldNames.contains(attr.name))
-         .map(_.asInstanceOf[AttributeReference])
+       val updatedOutput =
+         if (SchemaUtils.hasNestedFields(SchemaUtils.unescapeFieldNames(flatSchema))) {
+           indexFsRelation.schema.flatMap { s =>
+             val exprId = getFieldPosition(index, s.name)
+             relation.plan.output.find(a => s.name.contains(a.name)).map { a =>
+               AttributeReference(s.name, s.dataType, a.nullable, a.metadata)(
+                 ExprId(exprId),
+                 a.qualifier)
+             }
+           }
+         } else {
+           relation.plan.output
+             .filter(attr => indexFsRelation.schema.fieldNames.contains(attr.name))
+             .map(_.asInstanceOf[AttributeReference])
+         }

        if (filesDeleted.isEmpty) {
          relation.createLogicalRelation(indexFsRelation, updatedOutput)
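The nested-field branch above rebuilds each output attribute from a flattened index column name, reusing metadata from whichever source attribute that name embeds. A small illustration of the matching step follows; the field names (`nested__leaf`, `id`, `nested`) are assumptions, not values taken from this change.

```scala
// Illustration only (made-up names): a flattened index column name embeds the name of
// its top-level source attribute, so a substring check is enough to find the attribute
// whose nullability, metadata, and qualifier get reused for the new AttributeReference.
val flatIndexColumn = "nested__leaf"           // index column derived from a nested field
val sourceAttributeNames = Seq("id", "nested") // names in relation.plan.output

val parent = sourceAttributeNames.find(flatIndexColumn.contains(_))
// parent == Some("nested"); the rebuilt attribute keeps the flat name and the index
// column's data type, and takes its expression id from getFieldPosition(index, name).
```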
@@ -435,6 +448,12 @@ object RuleUtils {
          val filterForDeleted = Filter(Not(In(lineageAttr, deletedFileIds)), rel)
          Project(updatedOutput, OptimizeIn(filterForDeleted))
        }
+     case p: Project if provider.isSupportedProject(p) =>
+       transformProject(p, index)
+
+     case f: Filter if provider.isSupportedFilter(f) =>
+       transformFilter(f, index)
+
    }

    if (unhandledAppendedFiles.nonEmpty) {
@@ -508,11 +527,14 @@ object RuleUtils {
      // Set the same output schema with the index plan to merge them using BucketUnion.
      // Include partition columns for data loading.
      val partitionColumns = relation.partitionSchema.map(_.name)
-     val updatedSchema = StructType(relation.plan.schema.filter(col =>
-       index.schema.contains(col) || relation.partitionSchema.contains(col)))
+     val updatedSchema = StructType(
+       relation.plan.schema.filter(col =>
+         index.schema.fieldNames.exists(n => n.contains(col.name)) ||
+           relation.partitionSchema.contains(col)))
      val updatedOutput = relation.plan.output
        .filter(attr =>
-         index.schema.fieldNames.contains(attr.name) || partitionColumns.contains(attr.name))
+         index.schema.fieldNames.exists(n => n.contains(attr.name)) ||
+           partitionColumns.contains(attr.name))
        .map(_.asInstanceOf[AttributeReference])
      val newRelation = relation.createHadoopFsRelation(
        newLocation,
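Both filters in this hunk move from exact name matching to containment because, with nested fields, the index schema stores flattened names while `relation.plan.schema` still exposes only the top-level parent column. A tiny sketch of the effect follows; the field names are assumptions for illustration.

```scala
// Containment-check sketch: the parent column "nested" is kept because the index
// stores its leaf as the flattened name "nested__leaf"; an exact match would drop it.
val indexFieldNames = Seq("id", "nested__leaf")
val sourceColumns = Seq("id", "nested", "other")

val kept = sourceColumns.filter(col => indexFieldNames.exists(_.contains(col)))
// kept == Seq("id", "nested"); "other" is not covered by the index.
```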