package com.microsoft.hyperspace.index.rules

import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.analysis.CleanupAliases
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, Resolver, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule

import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.index.rankers.FilterIndexRanker
+import com.microsoft.hyperspace.index.rules.PlanUtils._
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
-import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils}
+import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils, SchemaUtils}

/**
 * FilterIndex rule looks for opportunities in a logical plan to replace
@@ -53,7 +54,7 @@ object FilterIndexRule
      case ExtractFilterNode(originalPlan, filter, outputColumns, filterColumns) =>
        try {
          val candidateIndexes =
-            findCoveringIndexes(filter, outputColumns, filterColumns)
+            findCoveringIndexes(filter, outputColumns, filterColumns, plan)
          FilterIndexRanker.rank(spark, filter, candidateIndexes) match {
            case Some(index) =>
              // As FilterIndexRule is not intended to support bucketed scan, we set
@@ -99,7 +100,8 @@ object FilterIndexRule
  private def findCoveringIndexes(
      filter: Filter,
      outputColumns: Seq[String],
-      filterColumns: Seq[String]): Seq[IndexLogEntry] = {
+      filterColumns: Seq[String],
+      plan: LogicalPlan): Seq[IndexLogEntry] = {
    RuleUtils.getRelation(spark, filter) match {
      case Some(r) =>
        val indexManager = Hyperspace
@@ -111,20 +113,35 @@ object FilterIndexRule
        // See https://github.com/microsoft/hyperspace/issues/65
        val allIndexes = indexManager.getIndexes(Seq(Constants.States.ACTIVE))

-        val candidateIndexes = allIndexes.filter { index =>
-          indexCoversPlan(
-            outputColumns,
-            filterColumns,
-            index.indexedColumns,
-            index.includedColumns)
+        def resolveWithChildren(fieldName: String, plan: LogicalPlan, resolver: Resolver) = {
+          plan.resolveChildren(UnresolvedAttribute.parseAttributeName(fieldName), resolver)
        }

-        // Get candidate via file-level metadata validation. This is performed after pruning
-        // by column schema, as this might be expensive when there are numerous files in the
-        // relation or many indexes to be checked.
-        RuleUtils.getCandidateIndexes(spark, candidateIndexes, r)
-
-      case None => Nil // There is zero or more than one supported relations in Filter's sub-plan.
+        // Resolve output columns with the default resolver method.
+        val resolvedOutputColumnsOpt =
+          ResolverUtils.resolve(spark, outputColumns, plan, resolveWithChildren, force = false)
+        // Resolve filter columns with the default resolver method.
+        val resolvedFilterColumnsOpt =
+          ResolverUtils.resolve(spark, filterColumns, plan, resolveWithChildren, force = false)
+
+        (resolvedOutputColumnsOpt, resolvedFilterColumnsOpt) match {
+          case (Some(resolvedOutputColumns), Some(resolvedFilterColumns)) =>
+            val candidateIndexes = allIndexes.filter { index =>
+              indexCoversPlan(
+                SchemaUtils.prefixNestedFieldNames(resolvedOutputColumns),
+                SchemaUtils.prefixNestedFieldNames(resolvedFilterColumns),
+                index.indexedColumns,
+                index.includedColumns)
+            }
+
+            // Get candidate via file-level metadata validation. This is performed after pruning
+            // by column schema, as this might be expensive when there are numerous files in the
+            // relation or many indexes to be checked.
+            RuleUtils.getCandidateIndexes(spark, candidateIndexes, r)
+
+          case _ => Nil
+        }
+      case _ => Nil // There is zero or more than one supported relation in Filter's sub-plan.
    }
  }
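
The new `resolveWithChildren` helper splits a dotted field name into its name parts and resolves them against the plan's children; the resolved columns are then prefixed by `SchemaUtils.prefixNestedFieldNames` before being compared with an index's indexed and included columns. Below is a minimal, self-contained sketch of that matching idea; the prefix string, object name, and helper are illustrative assumptions, not Hyperspace's actual API.

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute

// Sketch only: shows how a dotted filter column such as "nested.leaf" can be split into
// name parts and matched against an index's column list once nested fields are prefixed.
object NestedColumnMatchSketch {
  // Assumed marker for nested fields; the real prefix used by Hyperspace may differ.
  val assumedNestedPrefix = "__hs_nested."

  def prefixIfNested(fieldName: String): String = {
    // A field is treated as nested when its parsed name has more than one part, e.g. "a.b".
    val parts = UnresolvedAttribute.parseAttributeName(fieldName)
    if (parts.length > 1) assumedNestedPrefix + fieldName else fieldName
  }

  def main(args: Array[String]): Unit = {
    val filterColumns = Seq("id", "nested.leaf")
    val indexedColumns = Seq("id", assumedNestedPrefix + "nested.leaf")
    val covered = filterColumns.map(prefixIfNested).forall(indexedColumns.contains)
    println(s"index covers filter columns: $covered") // true under these assumptions
  }
}
```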
@@ -136,7 +153,6 @@ object FilterIndexRule
   * @param filterColumns List of columns in filter predicate.
   * @param indexedColumns List of indexed columns (e.g. from an index being checked)
   * @param includedColumns List of included columns (e.g. from an index being checked)
-   * @param fileFormat FileFormat for input relation in original logical plan.
   * @return 'true' if
   *         1. Index fully covers output and filter columns, and
   *         2. Filter predicate contains first column in index's 'indexed' columns.
@@ -168,9 +184,17 @@ object ExtractFilterNode {
      val projectColumnNames = CleanupAliases(project)
        .asInstanceOf[Project]
        .projectList
-        .map(_.references.map(_.asInstanceOf[AttributeReference].name))
+        .map(extractNamesFromExpression)
        .flatMap(_.toSeq)
-      val filterColumnNames = condition.references.map(_.name).toSeq
+      val filterColumnNames = extractNamesFromExpression(condition).toSeq
+        .sortBy(-_.length)
+        .foldLeft(Seq.empty[String]) { (acc, e) =>
+          if (!acc.exists(i => i.startsWith(e))) {
+            acc :+ e
+          } else {
+            acc
+          }
+        }

      Some(project, filter, projectColumnNames, filterColumnNames)
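
The `sortBy`/`foldLeft` added to `ExtractFilterNode` deduplicates the extracted names by prefix: names are visited longest first, and a shorter name is dropped when an already-kept name starts with it, so a parent field is subsumed by a nested child that is also referenced. A standalone sketch of just that logic, with made-up field names:

```scala
// Prefix-deduplication sketch: a parent field ("nested") is dropped when one of its
// nested children ("nested.leaf") is already kept.
object PrefixDedupSketch extends App {
  val names = Seq("nested", "nested.leaf", "id")
  val deduped = names
    .sortBy(-_.length) // longest first: nested.leaf, nested, id
    .foldLeft(Seq.empty[String]) { (acc, e) =>
      if (!acc.exists(_.startsWith(e))) acc :+ e else acc
    }
  println(deduped) // List(nested.leaf, id)
}
```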