This repository was archived by the owner on Jun 14, 2024. It is now read-only.

Commit 30cc3ef

Support filter with indexes on nested fields
1 parent cbfd4f6 commit 30cc3ef

4 files changed: +298 -14 lines changed

src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala

+15 -7

@@ -18,17 +18,18 @@ package com.microsoft.hyperspace.index.rules
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.analysis.CleanupAliases
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 
 import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
 import com.microsoft.hyperspace.actions.Constants
 import com.microsoft.hyperspace.index.IndexLogEntry
 import com.microsoft.hyperspace.index.rankers.FilterIndexRanker
+import com.microsoft.hyperspace.index.rules.PlanUtils._
 import com.microsoft.hyperspace.index.sources.FileBasedRelation
 import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
-import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils}
+import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils, SchemaUtils}
 
 /**
  * FilterIndex rule looks for opportunities in a logical plan to replace
@@ -113,8 +114,8 @@ object FilterIndexRule
 
     val candidateIndexes = allIndexes.filter { index =>
       indexCoversPlan(
-        outputColumns,
-        filterColumns,
+        SchemaUtils.prefixNestedFieldNames(outputColumns),
+        SchemaUtils.prefixNestedFieldNames(filterColumns),
         index.indexedColumns,
         index.includedColumns)
     }
@@ -136,7 +137,6 @@ object FilterIndexRule
   * @param filterColumns List of columns in filter predicate.
   * @param indexedColumns List of indexed columns (e.g. from an index being checked)
   * @param includedColumns List of included columns (e.g. from an index being checked)
-  * @param fileFormat FileFormat for input relation in original logical plan.
   * @return 'true' if
   *         1. Index fully covers output and filter columns, and
   *         2. Filter predicate contains first column in index's 'indexed' columns.
@@ -168,9 +168,17 @@ object ExtractFilterNode {
     val projectColumnNames = CleanupAliases(project)
       .asInstanceOf[Project]
       .projectList
-      .map(_.references.map(_.asInstanceOf[AttributeReference].name))
+      .map(extractNamesFromExpression)
      .flatMap(_.toSeq)
-    val filterColumnNames = condition.references.map(_.name).toSeq
+    val filterColumnNames = extractNamesFromExpression(condition).toSeq
+      .sortBy(-_.length)
+      .foldLeft(Seq.empty[String]) { (acc, e) =>
+        if (!acc.exists(i => i.startsWith(e))) {
+          acc :+ e
+        } else {
+          acc
+        }
+      }
 
     Some(project, filter, projectColumnNames, filterColumnNames)
src/main/scala/com/microsoft/hyperspace/index/rules/PlanUtils.scala

+165 -0 (new file)
@@ -0,0 +1,165 @@
+/*
+ * Copyright (2020) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.index.rules
+
+import scala.util.Try
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GetStructField}
+import org.apache.spark.sql.types.{DataType, StructType}
+
+import com.microsoft.hyperspace.util.SchemaUtils
+
+object PlanUtils {
+
+  /**
+   * Extracts field names from a Spark Catalyst [[Expression]].
+   *
+   * @param exp The Spark Catalyst expression from which to extract names.
+   * @return A set of distinct field names.
+   */
+  def extractNamesFromExpression(exp: Expression): Set[String] = {
+    exp match {
+      case AttributeReference(name, _, _, _) =>
+        Set(s"$name")
+      case otherExp =>
+        otherExp.containsChild.map {
+          case g: GetStructField =>
+            s"${getChildNameFromStruct(g)}"
+          case e: Expression =>
+            extractNamesFromExpression(e).filter(_.nonEmpty).mkString(".")
+          case _ => ""
+        }
+    }
+  }
+
+  /**
+   * Given a [[GetStructField]] expression for a nested field (i.e. a struct
+   * field), extracts the full `.` (dot) separated field name.
+   *
+   * @param field The [[GetStructField]] field from which we want to extract
+   *              the name.
+   * @return A field name, `.` (dot) separated if nested.
+   */
+  def getChildNameFromStruct(field: GetStructField): String = {
+    field.child match {
+      case f: GetStructField =>
+        s"${getChildNameFromStruct(f)}.${field.name.get}"
+      case a: AttributeReference =>
+        s"${a.name}.${field.name.get}"
+      case _ =>
+        s"${field.name.get}"
+    }
+  }
+
+  /**
+   * Given a Spark Catalyst [[Expression]] and a field name, extracts the parent
+   * search expression and the expression that contains the field name.
+   *
+   * @param exp The Spark Catalyst [[Expression]] to extract from.
+   * @param name The field name to search for.
+   * @return A tuple of the parent expression and the leaf expression that
+   *         contains the given name.
+   */
+  def extractSearchQuery(exp: Expression, name: String): (Expression, Expression) = {
+    // String.split takes a regex, so the dot separator must be escaped.
+    val splits = name.split("\\.")
+    val expFound = exp.find {
+      case a: AttributeReference if splits.forall(s => a.name.contains(s)) => true
+      case f: GetStructField if splits.forall(s => f.toString().contains(s)) => true
+      case _ => false
+    }.get
+    val parent = exp.find {
+      case e: Expression if e.containsChild.contains(expFound) => true
+      case _ => false
+    }.get
+    (parent, expFound)
+  }
+
+  /**
+   * Given a parent Spark Catalyst [[Expression]], a needle [[Expression]] and a
+   * replacement [[Expression]], replaces the needle with the replacement inside
+   * the parent expression.
+   *
+   * @param parent The parent Spark Catalyst [[Expression]] in which to replace.
+   * @param needle The Spark Catalyst [[Expression]] needle to search for.
+   * @param repl The replacement Spark Catalyst [[Expression]].
+   * @return A new Spark Catalyst [[Expression]].
+   */
+  def replaceInSearchQuery(
+      parent: Expression,
+      needle: Expression,
+      repl: Expression): Expression = {
+    parent.mapChildren { c =>
+      if (c == needle) {
+        repl
+      } else {
+        c
+      }
+    }
+  }
+
+  /**
+   * Given a Spark Catalyst [[Expression]] and a field name, extracts the
+   * [[AttributeReference]] for that field name.
+   *
+   * @param exp The Spark Catalyst [[Expression]] to extract from.
+   * @param name The field name for which to extract the attribute reference.
+   * @return A Spark Catalyst [[AttributeReference]] pointing to the field name.
+   */
+  def extractAttributeRef(exp: Expression, name: String): AttributeReference = {
+    val splits = name.split("\\.")
+    val elem = exp.find {
+      case a: AttributeReference if splits.contains(a.name) => true
+      case _ => false
+    }
+    elem.get.asInstanceOf[AttributeReference]
+  }
+
+  /**
+   * Given a Spark Catalyst [[Expression]] and a field name, extracts the type
+   * of the field as a Spark SQL [[DataType]].
+   *
+   * @param exp The Spark Catalyst [[Expression]] from which to extract the type.
+   * @param name The field name for which we need to get the type.
+   * @return The Spark SQL [[DataType]] of the given field name.
+   */
+  def extractTypeFromExpression(exp: Expression, name: String): DataType = {
+    val splits = name.split("\\.")
+    val elem = exp.flatMap {
+      case a: AttributeReference =>
+        if (splits.forall(s => a.name == s)) {
+          Some((name, a.dataType))
+        } else {
+          Try({
+            val h :: t = splits.toList
+            if (a.name == h && a.dataType.isInstanceOf[StructType]) {
+              // Resolve the remaining path components against the struct's
+              // fields and record each component's type; the last one wins.
+              val currentDataType = a.dataType.asInstanceOf[StructType]
+              val foldedFields = t.foldLeft(Seq.empty[(String, DataType)]) { (acc, i) =>
+                val idx = currentDataType.indexWhere(_.name.equalsIgnoreCase(i))
+                acc :+ (i, currentDataType(idx).dataType)
+              }
+              Some(foldedFields.last)
+            } else {
+              None
+            }
+          }).getOrElse(None)
+        }
+      case f: GetStructField if splits.forall(s => f.toString().contains(s)) =>
+        Some((name, f.dataType))
+      case _ => None
+    }
+    elem.find(e => e._1 == name || e._1 == splits.last).get._2
+  }
+}
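To see how these helpers fit together, here is a small sketch that hand-builds the Catalyst expression for a nested field and runs the extractors over it. The schema and names are invented for the example; it assumes only Spark's Catalyst API and this new PlanUtils on the classpath:

object PlanUtilsSketch extends App {
  import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GetStructField}
  import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
  import com.microsoft.hyperspace.index.rules.PlanUtils._

  // Hypothetical schema: nested STRUCT<leaf: STRUCT<cnt: INT>>
  val leafType = StructType(Seq(StructField("cnt", IntegerType)))
  val nested = AttributeReference("nested", StructType(Seq(StructField("leaf", leafType))))()

  // The expression tree Catalyst builds for nested.leaf.cnt:
  // GetStructField nodes chained on the root attribute.
  val leaf = GetStructField(nested, 0, Some("leaf"))
  val cnt = GetStructField(leaf, 0, Some("cnt"))

  println(getChildNameFromStruct(cnt))                       // nested.leaf.cnt
  println(extractAttributeRef(cnt, "nested.leaf.cnt"))       // the root attribute, nested#<id>
  println(extractTypeFromExpression(cnt, "nested.leaf.cnt")) // IntegerType
}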

src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala

+79 -5

@@ -21,7 +21,7 @@ import scala.collection.mutable
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, In, Literal, Not}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, ExprId, GetStructField, In, Literal, Not}
 import org.apache.spark.sql.catalyst.optimizer.OptimizeIn
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.datasources._
@@ -32,8 +32,9 @@ import com.microsoft.hyperspace.Hyperspace
 import com.microsoft.hyperspace.index._
 import com.microsoft.hyperspace.index.IndexLogEntryTags.{HYBRIDSCAN_RELATED_CONFIGS, IS_HYBRIDSCAN_CANDIDATE}
 import com.microsoft.hyperspace.index.plans.logical.{BucketUnion, IndexHadoopFsRelation}
+import com.microsoft.hyperspace.index.rules.PlanUtils._
 import com.microsoft.hyperspace.index.sources.FileBasedRelation
-import com.microsoft.hyperspace.util.HyperspaceConf
+import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils, SchemaUtils}
 
 object RuleUtils {
 
@@ -286,10 +287,31 @@ object RuleUtils
         new ParquetFileFormat,
         Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark, index)
 
-      val updatedOutput = relation.plan.output
-        .filter(attr => indexFsRelation.schema.fieldNames.contains(attr.name))
-        .map(_.asInstanceOf[AttributeReference])
+      val flatSchema =
+        ResolverUtils.resolve(spark, index.indexedColumns ++ index.includedColumns, relation.plan)
+      // SchemaUtils.escapeFieldNames(SchemaUtils.flatten(relation.plan.schema))
+      val updatedOutput =
+        if (flatSchema.isDefined && SchemaUtils.containsNestedFieldNames(flatSchema.get)) {
+          indexFsRelation.schema.flatMap { s =>
+            val exprId = getFieldPosition(index, s.name)
+            relation.plan.output.find(a => s.name.contains(a.name)).map { a =>
+              AttributeReference(s.name, s.dataType, a.nullable, a.metadata)(
+                ExprId(exprId),
+                a.qualifier)
+            }
+          }
+        } else {
+          relation.plan.output
+            .filter(attr => indexFsRelation.schema.fieldNames.contains(attr.name))
+            .map(_.asInstanceOf[AttributeReference])
+        }
       relation.createLogicalRelation(indexFsRelation, updatedOutput)
+
+    case p: Project if provider.isSupportedProject(p) =>
+      transformProject(p, index)
+
+    case f: Filter if provider.isSupportedFilter(f) =>
+      transformFilter(f, index)
     }
   }
 
@@ -576,4 +598,56 @@ object RuleUtils
     assert(shuffleInjected)
     shuffled
   }
+
+  private def transformProject(project: Project, index: IndexLogEntry): Project = {
+    val projectedFields = project.projectList.map { exp =>
+      val fieldName = extractNamesFromExpression(exp).head
+      val escapedFieldName = SchemaUtils.prefixNestedFieldName(fieldName)
+      val attr = extractAttributeRef(exp, fieldName)
+      val fieldType = extractTypeFromExpression(exp, fieldName)
+      val exprId = getFieldPosition(index, escapedFieldName)
+      attr.copy(escapedFieldName, fieldType, attr.nullable, attr.metadata)(
+        ExprId(exprId),
+        attr.qualifier)
+    }
+    project.copy(projectList = projectedFields)
+  }
+
+  private def transformFilter(filter: Filter, index: IndexLogEntry): Filter = {
+    val fieldNames = extractNamesFromExpression(filter.condition)
+    var mutableFilter = filter
+    fieldNames.foreach { fieldName =>
+      val escapedFieldName = SchemaUtils.prefixNestedFieldName(fieldName)
+      val nestedFields = getNestedFields(index)
+      if (nestedFields.nonEmpty &&
+          nestedFields.exists(i => i.equalsIgnoreCase(escapedFieldName))) {
+        val (parentExpression, exp) =
+          extractSearchQuery(filter.condition, fieldName)
+        val fieldType = extractTypeFromExpression(exp, fieldName)
+        val attr = extractAttributeRef(exp, fieldName)
+        val exprId = getFieldPosition(index, escapedFieldName)
+        val newAttr = attr.copy(escapedFieldName, fieldType, attr.nullable, attr.metadata)(
+          ExprId(exprId),
+          attr.qualifier)
+        val newExp = exp match {
+          case _: GetStructField => newAttr
+          case other: Expression => other
+        }
+        val newParentExpression =
+          replaceInSearchQuery(parentExpression, exp, newExp)
+        mutableFilter = filter.copy(condition = newParentExpression)
+      }
+    }
+    mutableFilter
+  }
+
+  private def getNestedFields(index: IndexLogEntry): Seq[String] = {
+    index.schema.fieldNames.filter(_.startsWith(SchemaUtils.NESTED_FIELD_PREFIX))
+  }
+
+  private def getFieldPosition(index: IndexLogEntry, fieldName: String): Int = {
+    index.schema.fieldNames.indexWhere(_.equalsIgnoreCase(fieldName))
+  }
 }
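The net effect of transformProject and transformFilter is to retarget a plan from the nested source column to the flattened index column. A hedged before/after sketch; the source schema, the index, the printed plan fragments, and the prefix value are illustrative assumptions (the actual prefix constant lives in SchemaUtils, outside this diff):

// Hypothetical source: id INT, nested STRUCT<leaf: STRUCT<cnt: INT>>,
// indexed with indexedColumns = Seq("nested.leaf.cnt").
val query = df.filter(df("nested.leaf.cnt") === 1).select(df("nested.leaf.cnt"))

// Before the rule fires, the predicate is a GetStructField chain on the data column:
//   Filter (nested#2.leaf.cnt = 1)
// After transformFilter, the chain is replaced by a flat AttributeReference whose
// name is the prefixed field name and whose ExprId is the field's position in the
// index schema, e.g.:
//   Filter (__hs_nested.nested.leaf.cnt#1 = 1)
// so the rewritten plan reads the flattened column directly from the index files.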

src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala

+39 -2

@@ -19,12 +19,14 @@ package com.microsoft.hyperspace.index.sources
 import scala.util.{Success, Try}
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.util.hyperspace.Utils
 
 import com.microsoft.hyperspace.HyperspaceException
 import com.microsoft.hyperspace.index.Relation
-import com.microsoft.hyperspace.util.{CacheWithTransform, HyperspaceConf}
+import com.microsoft.hyperspace.index.rules.PlanUtils._
+import com.microsoft.hyperspace.util.{CacheWithTransform, HyperspaceConf, SchemaUtils}
 
 /**
  * [[FileBasedSourceProviderManager]] is responsible for loading source providers which implements
@@ -107,6 +109,41 @@ class FileBasedSourceProviderManager(spark: SparkSession) {
     }
   }
 
+  /**
+   * Returns true if the given project is supported by the nested-field
+   * transformations, i.e. its project list or a child filter condition
+   * references nested fields.
+   *
+   * @param project Project to check if it's supported.
+   * @return True if the given project is a supported project.
+   */
+  def isSupportedProject(project: Project): Boolean = {
+    val containsNestedFields = SchemaUtils.containsNestedFieldNames(
+      project.projectList.flatMap(extractNamesFromExpression))
+    var containsNestedChildren = false
+    project.child.foreach {
+      case f: Filter =>
+        containsNestedChildren = containsNestedChildren || {
+          SchemaUtils.containsNestedFieldNames(SchemaUtils.removePrefixNestedFieldNames(
+            extractNamesFromExpression(f.condition).toSeq))
+        }
+      case _ =>
+    }
+    containsNestedFields || containsNestedChildren
+  }
+
+  /**
+   * Returns true if the given filter is supported by the nested-field
+   * transformations, i.e. its condition references nested fields.
+   *
+   * @param filter Filter to check if it's supported.
+   * @return True if the given filter is a supported filter.
+   */
+  def isSupportedFilter(filter: Filter): Boolean = {
+    SchemaUtils.containsNestedFieldNames(
+      extractNamesFromExpression(filter.condition).toSeq)
+  }
+
   /**
    * Runs the given function 'f', which executes a [[FileBasedSourceProvider]]'s API that returns
    * [[Option]] for each provider built. This function ensures that only one provider returns
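The SchemaUtils helpers this commit leans on (prefixNestedFieldName(s), removePrefixNestedFieldNames, containsNestedFieldNames, NESTED_FIELD_PREFIX) are defined outside this diff. The sketch below is an assumption about their behavior, written only to make the calls above readable; the real implementations in com.microsoft.hyperspace.util.SchemaUtils may differ, including the prefix value:

object SchemaUtilsSketch {
  // Assumed marker for flattened nested columns in an index schema.
  val NESTED_FIELD_PREFIX = "__hs_nested."

  // "nested.leaf.cnt" -> "__hs_nested.nested.leaf.cnt"; top-level names pass through.
  def prefixNestedFieldName(name: String): String =
    if (name.contains(".") && !name.startsWith(NESTED_FIELD_PREFIX)) NESTED_FIELD_PREFIX + name
    else name

  def prefixNestedFieldNames(names: Seq[String]): Seq[String] =
    names.map(prefixNestedFieldName)

  def removePrefixNestedFieldNames(names: Seq[String]): Seq[String] =
    names.map(_.stripPrefix(NESTED_FIELD_PREFIX))

  // A dot in a field name indicates a nested field.
  def containsNestedFieldNames(names: Seq[String]): Boolean =
    names.exists(_.contains("."))
}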
