Support filter over indexes on nested fields #380
Conversation
@sezruby, @imback82: Could you check why the builds are failing with:
Is there a mechanism to trigger the build once more on this PR? Something like commenting a keyword to trigger the build again?

You can trigger via "re-run" in the Checks tab.

@sezruby I don't see any "re-run" in the Checks tab. I don't have the same rights as you. 😃

Actually I'm waiting for #393 because of the conflict.
 * @param project Project to check if it's supported.
 * @return True if the given project contains nested columns used by the index.
 */
protected[rules] def hasNestedColumns(project: Project, index: IndexLogEntry): Boolean = {
seems to be a good candidate for a unit test
 * @param filter Filter to check if it's supported.
 * @return True if the given filter contains nested columns used by the index.
 */
protected[rules] def hasNestedColumns(filter: Filter, index: IndexLogEntry): Boolean = {
seems to be a good candidate for a unit test
 * @param fieldName The name of the field to search for.
 * @return An [[ExprId]] if one could be found in the plan, otherwise [[None]].
 */
private def getExprId(plan: LogicalPlan, fieldName: String): Option[ExprId] = {
seems to be a good candidate for a unit test
 * @param exp The Spark Catalyst expression from which to extract names.
 * @return A set of distinct field names.
 */
def extractNamesFromExpression(exp: Expression): ExtractedNames = {
seems to be a good candidate for a unit test
I will add tests to all methods in here.
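For instance, a minimal sketch of such a unit test (assuming the method is widened from private to private[rules] so a test can reach it; all names and the one-column plan are illustrative, not the PR's actual test code):

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.types.IntegerType

// Build a one-column plan and check that the ExprId lookup round-trips.
val attr = AttributeReference("cnt", IntegerType)()
val plan = LocalRelation(attr)
assert(PlanUtils.getExprId(plan, "cnt").contains(attr.exprId))
// A name that is not in the plan's output should yield None.
assert(PlanUtils.getExprId(plan, "missing").isEmpty)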
import com.microsoft.hyperspace.util.ResolverUtils

object PlanUtils {
The name sounds general, but the actual code seems mostly about nested fields?
The code works for any kind of field. It's needed by the nested fields feature because that feature has to do more transformations. In my initial implementation, the methods here were used for flat fields too.
  }
}

var toRemove = Seq.empty[String]
This is to remove the unnecessary isnotnull check, right? For someone who doesn't know this code is about supporting nested fields, the meaning of this name could be vague.
Right. Given this plan:

Project [Date#89, nested#94.leaf.cnt AS cnt#336, nested#94.leaf.id AS id#337]
+- Filter ((isnotnull(nested#94) && (nested#94.leaf.cnt > 10)) && (nested#94.leaf.id = leaf_id9))
   +- Relation[Date#89,RGUID#90,Query#91,imprs#92,clicks#93,nested#94] parquet

it will be transformed into:

Project [Date#380 AS Date#381, __hs_nested.nested.leaf.cnt#379 AS cnt#382, __hs_nested.nested.leaf.id#378 AS id#383]
+- Filter ((__hs_nested.nested.leaf.cnt#379 > 10) && (__hs_nested.nested.leaf.id#378 = leaf_id9))
   +- Relation[__hs_nested.nested.leaf.id#378,__hs_nested.nested.leaf.cnt#379,Date#380] Hyperspace(Type: CI, Name: filterNestedIndex, LogVersion: 1)

The isnotnull(nested#94) predicate is no longer suitable and is removed in the transformFilter method because:

- The nested field name is not part of the index (the index contains __hs_nested.nested.leaf.cnt and __hs_nested.nested.leaf.id).
- The isnotnull construct checks that a nested field is not null, because trying to access the leaves when it is null would end up in exceptions.
- The values stored in the index are not nested; they are flat.
- In the example above, we cannot just rename the isnotnull attribute reference, as we have two fields instead of one (__hs_nested.nested.leaf.cnt and __hs_nested.nested.leaf.id).

I'll add more info on the method.
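A rough sketch of that removal logic (illustrative only; the helper name and signature are assumptions, not the PR's actual code):

import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Expression, IsNotNull}

// Split the filter condition into conjuncts and drop isnotnull checks on
// attributes that no longer exist in the index schema (e.g. isnotnull(nested#94)).
def dropStaleIsNotNull(condition: Expression, indexColumns: Set[String]): Option[Expression] = {
  def conjuncts(e: Expression): Seq[Expression] = e match {
    case And(l, r) => conjuncts(l) ++ conjuncts(r)
    case other => Seq(other)
  }
  val kept = conjuncts(condition).filterNot {
    case IsNotNull(a: AttributeReference) => !indexColumns.contains(a.name)
    case _ => false
  }
  // Rebuild the conjunction; None means the whole condition was dropped.
  kept.reduceOption(And)
}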
Can we add both isnotnull(__hs_nested.nested.leaf.cnt) and isnotnull(__hs_nested.nested.leaf.id)? It seems Spark automatically adds isnotnull for project columns and filter condition columns.
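Something like this sketch could do it (assuming the rewrite already knows the flattened attributes that replaced the nested field; the names are illustrative):

import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Expression, IsNotNull}

// Build a conjunction of isnotnull checks over the flattened index attributes,
// preserving the null-safety the original isnotnull(nested#94) provided.
def rebuildIsNotNull(flattenedAttrs: Seq[AttributeReference]): Expression =
  flattenedAttrs.map(IsNotNull(_): Expression).reduce(And)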
I think we can. I'll try it.
Maybe a dumb question, but why should we rename the fields? Can't we just keep the original nested structures in the index data?
 * @param useBucketSpec Option whether to use BucketSpec for reading index data.
 * @return A transformed [[LogicalRelation]].
 */
private[rules] def transformRelation(
Could you add some notes for reviewers, for example which part is changed in this file?
 * @param relation Relation with which given indexes are compared.
 * @return Active indexes built for this plan.
 */
def getCandidateIndexes(
Though I will refactor this soon with HyperspaceOneRule, could you move this function back to RuleUtils and rename this class to IndexPlanApplyHelper (& IndexPlanWithNestedApplyHelper)? Also, you can use spark from FilterIndexRule or JoinIndexRule, as they extend ActiveSparkSession.
https://github.com/microsoft/hyperspace/pull/380/files#r617166273

Please address this comment, to reduce the diff.
Frankly speaking, the change became quite big: around 3,000 lines of diff.
Though most of the changes come from the existing codebase, we cannot check them line by line.
So I wonder if we could split this PR into two PRs: 1) refactoring only, and 2) adding IndexPlanWithNestedApplyHelper.
- I'd rather not put getCandidateIndexes back, as the method requires spark to be given to it. The current implementation is consistent: the methods that don't require spark are in RuleUtils, while the others are part of the helper instances.
- There are 3,000 lines due to the extensive tests I added to verify the plan transformation. One way or another, someone should review them.
- "Frankly speaking", I already did a lot of extra work complying with your requests to split this into multiple PRs. Now there is once more a request to split a PR for this same feature. This is very frustrating for me, and it looks like you're using any opportunity to push this back. I would understand splitting into multiple PRs for new features, but I thought we came to an agreement at the time we decided to split the feature into 4-5 other PRs. And this is only the second PR 😞.
1 =>
It will be refactored out after the rule refactoring. Just put it back to reduce the diff...
You can use spark from FilterIndexRule / JoinIndexRule, as before.
Also, there is no need for the getCandidateIndexes function in this class (IndexPlanApplyHelper?); they have different roles.
2/3 =>
I understand your frustration, but the changes modify a wide range of the codebase, including core parts.
This refactoring was not considered at the time we agreed, and that's why this change became huge.
And you know small, explicit changes can be merged quickly in general, and splitting the PR also helps to trace the history.
Sorry, but I think we should do that. If you are not up for it, I'll push a refactoring PR (no nested-related code) based on your PR by early next week.
@sezruby I want to correct you. The code does not add lots of changes. If you remember (and to help you remember, here is a previous discussion from a month back: #380 (comment)), we agreed to have separate classes for the already existing code and the nested features. So now we have BaseRuleHelper and NestedRuleHelper. From the old RuleUtils I extracted all the methods that need spark into BaseRuleHelper and kept the methods that don't need spark in RuleUtils as static methods. So, from the old RuleUtils we now have two files: a stripped-down RuleUtils and BaseRuleHelper. The classes have the code unchanged from the old RuleUtils; only the method signatures changed now that spark is a property inside the instance.

The new things are inside NestedRuleHelper, which extends some methods from BaseRuleHelper to accommodate the nested fields support.

This is exactly what we agreed on in that comment. There is no other refactoring here than what I explained above, and that refactoring does NOT make any sense without the next changes.

I propose we try to merge this and you create the changes you want on top of it.
 * @param repl The replacement Spark Catalyst [[Expression]].
 * @return A new Spark Catalyst [[Expression]].
 */
def replaceInSearchQuery(
how about replaceExpression
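For reference, a generic version could be as small as this sketch (an assumed shape, not the PR's code):

import org.apache.spark.sql.catalyst.expressions.Expression

// Replace every sub-expression of `exp` that is semantically equal to
// `needle` with `repl`, walking the tree bottom-up.
def replaceExpression(exp: Expression, needle: Expression, repl: Expression): Expression =
  exp.transformUp {
    case e if e.semanticEquals(needle) => repl
  }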
@clee704 That's a good question. One reason could be that we store index data as "indexed columns" ++ "included columns". For example, if indexed columns are
@sezruby, @clee704, @imback82: The datasets I create indexes over contain deeply nested fields with many children. For example, this is half of the schema of one of my datasets, and it has about 600 nested fields in one of its structs:

I want to index over the

If so, I don't think that is efficient at all, and I'll give a few reasons:

Performance- and cost-wise, I can say that the current approach — extracting/flattening the nested field — is what is needed in Hyperspace, even though the implementation code gets more complex.

FYI: I'm currently a bit busy with some other work assignment, but I'll get back to integrating your feedback soon.
@andrei-ionescu Since we store data in a parquet format, which is efficient at storing nested data in a columnar format, does it matter? For a parquet file, a dataset with a single column "a" or a single nested column "a.b.c.d.e" should make no difference, I guess.
It does matter! Even if parquet stores data in an efficient columnar way, there are issues with parquet and nested fields in the way predicate push down works - or, to put it better, it doesn't work. That is the performance cost. The storage cost is not negligible either, if you are forced to store half of the dataset in an index. Last but not least, the management cost (rebuild index, incremental refresh, etc.) will incur additional time that makes the index harder to update and keep in sync with the data itself; if the index is not easily (fast & efficiently) updatable, then it's not usable with these big, fast-moving datasets. These costs are not acceptable for the cases of big nested datasets.

@rapoth, @imback82, @sezruby, @apoorvedave1, @clee704: Here is an article I wrote on what Adobe is doing on big data and the scale we work with: https://medium.com/adobetech/high-throughput-ingestion-with-iceberg-ccf7877a413f. Or, if you prefer video, here is a presentation on the same matter: https://www.dremio.com/subsurface/high-frequency-small-files-vs-slow-moving-datasets.
I don't think we can "bucket by" nested fields either. BTW, predicate pushdown for nested fields should be supported in Spark 3.0: apache/spark@cb0db21 @clee704 is your concern addressed?
That's right, @imback82! I totally forgot about the bucketing done in Hyperspace. That is not possible over nested fields.
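To illustrate the bucketing limitation, a hedged sketch (the table name and tiny dataset are made up, and the exact error message varies by Spark version):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// A tiny dataset with one nested column.
val df = Seq((1, 2)).toDF("a", "b")
  .selectExpr("a", "named_struct('leaf', named_struct('cnt', b)) AS nested")

// bucketBy resolves only top-level output columns, so bucketing by a nested
// field fails with an AnalysisException rather than bucketing by nested.leaf.cnt.
df.write.bucketBy(8, "nested.leaf.cnt").saveAsTable("bucketed_idx")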
Can't we just store the indexed/included nested fields, ignoring the others? Also, it seems the problem is not inherent to nested fields, but due to a lack of proper support in the framework. Anyway, it seems Spark lacks support for nested fields, and that's a good enough reason to justify the renaming.
@imback82, @sezruby, @clee704: Sorry for being out of this PR for a few weeks. Now I've got some time and I'm back to it. Here are the changes:

Please have another look.
What is the context for this pull request?
What changes were proposed in this pull request?
This PR adds support for filtering over nested fields using indexes created over nested fields.
Given the nestedDataset dataset with the following schema and data
And the following search/filter query
The optimized and Spark plans without an index are
The transformed optimized and Spark plans should look like
Complexities
Transforming the plan
Filters inside the plan must be modified to accommodate the index schema, not the data schema - the flattened schema, not the nested field. Instead of accessing the field with GetStructField(GetStructField(AttributeReference)), it must access it directly with AttributeReference.

Given the query plan
The filter will be modified from
to
The projection from
to
The relation from
to
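A condensed sketch of this kind of Catalyst rewrite (the helper names and the name-to-attribute map are illustrative assumptions, not the PR's exact code):

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GetStructField}

// Replace nested field accesses such as GetStructField(GetStructField(AttributeReference))
// with direct references to the flattened index columns (e.g. __hs_nested.nested.leaf.cnt).
// `flattened` maps the dotted nested field name to the index's flat attribute.
def rewriteNestedAccess(
    condition: Expression,
    flattened: Map[String, AttributeReference]): Expression =
  condition transform {
    case g: GetStructField if flattened.contains(fullName(g)) => flattened(fullName(g))
  }

// Builds the dotted name ("nested.leaf.cnt") for a chain of GetStructField
// nodes rooted at an AttributeReference; returns "" for unsupported shapes.
def fullName(e: Expression): String = e match {
  case GetStructField(child, _, Some(name)) => fullName(child) + "." + name
  case a: AttributeReference => a.name
  case _ => ""
}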
Does this PR introduce any user-facing change?
No.
How was this patch tested?