43
43
import org .apache .lucene .search .Weight ;
44
44
import org .apache .lucene .search .join .BitSetProducer ;
45
45
import org .apache .lucene .util .BitSet ;
46
+ import org .opensearch .common .collect .Tuple ;
46
47
import org .opensearch .common .lucene .search .Queries ;
47
48
import org .opensearch .core .ParseField ;
49
+ import org .opensearch .index .mapper .MapperService ;
48
50
import org .opensearch .index .mapper .ObjectMapper ;
49
51
import org .opensearch .search .aggregations .Aggregator ;
50
52
import org .opensearch .search .aggregations .AggregatorFactories ;
@@ -88,12 +90,25 @@ public class NestedAggregator extends BucketsAggregator implements SingleBucketA
88
90
) throws IOException {
89
91
super (name , factories , context , parent , cardinality , metadata );
90
92
91
- Query parentFilter = parentObjectMapper != null ? parentObjectMapper .nestedTypeFilter () : Queries .newNonNestedFilter ();
93
+ Query parentFilter = isParent (parentObjectMapper , childObjectMapper , context .mapperService ())
94
+ ? parentObjectMapper .nestedTypeFilter ()
95
+ : Queries .newNonNestedFilter ();
92
96
this .parentFilter = context .bitsetFilterCache ().getBitSetProducer (parentFilter );
93
97
this .childFilter = childObjectMapper .nestedTypeFilter ();
94
98
this .collectsFromSingleBucket = cardinality .map (estimate -> estimate < 2 );
95
99
}
96
100
101
+ private boolean isParent (ObjectMapper parentObjectMapper , ObjectMapper childObjectMapper , MapperService mapperService ) {
102
+ if (parentObjectMapper == null ) {
103
+ return false ;
104
+ }
105
+ ObjectMapper parent ;
106
+ do {
107
+ parent = childObjectMapper .getParentObjectMapper (mapperService );
108
+ } while (parent != null && parent != parentObjectMapper );
109
+ return parentObjectMapper == parent ;
110
+ }
111
+
97
112
@ Override
98
113
public LeafBucketCollector getLeafCollector (final LeafReaderContext ctx , final LeafBucketCollector sub ) throws IOException {
99
114
IndexReaderContext topLevelContext = ReaderUtil .getTopLevelContext (ctx );
@@ -107,20 +122,17 @@ public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx, final L
107
122
if (collectsFromSingleBucket ) {
108
123
return new LeafBucketCollectorBase (sub , null ) {
109
124
@ Override
110
- public void collect (int parentDoc , long bucket ) throws IOException {
111
- // if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
112
- // doc), so we can skip:
113
- if (parentDoc == 0 || parentDocs == null || childDocs == null ) {
125
+ public void collect (int parentAggDoc , long bucket ) throws IOException {
126
+ // parentAggDoc can be 0 when aggregation:
127
+ if (parentDocs == null || childDocs == null ) {
114
128
return ;
115
129
}
116
130
117
- final int prevParentDoc = parentDocs .prevSetBit (parentDoc - 1 );
118
- int childDocId = childDocs .docID ();
119
- if (childDocId <= prevParentDoc ) {
120
- childDocId = childDocs .advance (prevParentDoc + 1 );
121
- }
131
+ Tuple <Integer , Integer > res = getParentAndChildId (parentDocs , childDocs , parentAggDoc );
132
+ int currentParentDoc = res .v1 ();
133
+ int childDocId = res .v2 ();
122
134
123
- for (; childDocId < parentDoc ; childDocId = childDocs .nextDoc ()) {
135
+ for (; childDocId < currentParentDoc ; childDocId = childDocs .nextDoc ()) {
124
136
collectBucket (sub , childDocId , bucket );
125
137
}
126
138
}
@@ -130,6 +142,43 @@ public void collect(int parentDoc, long bucket) throws IOException {
130
142
}
131
143
}
132
144
145
    /**
     * Maps the doc passed down by the parent aggregation to the pair (block-join parent doc,
     * first candidate nested child doc) needed to collect this nested aggregation.
     * <p>
     * The incoming doc is ambiguous: it may itself be a block-join parent doc, or it may be a
     * nested (child) doc belonging to a sibling nested object of the one being aggregated.
     * This method resolves it to the enclosing block-join parent and positions the child
     * iterator at the first child doc after the previous parent.
     *
     * @param parentDocs   bit set marking the block-join parent docs in this segment
     * @param childDocs    iterator over the nested child docs matching this aggregation
     * @param parentAggDoc the parent aggregation's current doc
     *                     (which may or may not be a block-level parent doc)
     * @return a tuple of (the block-level parent doc owning {@code parentAggDoc},
     *         the next matching child doc id from {@code childDocs} — expected to fall
     *         under that parent)
     * @throws IOException if advancing the child iterator fails
     */
    static Tuple<Integer, Integer> getParentAndChildId(BitSet parentDocs, DocIdSetIterator childDocs, int parentAggDoc) throws IOException {
        int currentParentAggDoc;
        // prevSetBit is inclusive: returns parentAggDoc itself when it is a parent doc.
        int prevParentDoc = parentDocs.prevSetBit(parentAggDoc);
        if (prevParentDoc == -1) {
            // No parent at or before parentAggDoc: it is a child of the segment's first parent.
            currentParentAggDoc = parentDocs.nextSetBit(0);
        } else if (prevParentDoc == parentAggDoc) {
            // parentAggDoc is itself a block-join parent doc; recompute the *previous*
            // parent so the child iterator is advanced past the preceding block.
            currentParentAggDoc = parentAggDoc;
            if (currentParentAggDoc == 0) {
                prevParentDoc = -1;
            } else {
                prevParentDoc = parentDocs.prevSetBit(currentParentAggDoc - 1);
            }
        } else {
            // parentAggDoc is a (sibling) child doc; its block-join parent is the next
            // set bit after the previous parent.
            currentParentAggDoc = parentDocs.nextSetBit(prevParentDoc + 1);
        }

        // Position the child iterator just past the previous parent, i.e. at the first
        // potential child of the current block.
        int childDocId = childDocs.docID();
        if (childDocId <= prevParentDoc) {
            childDocId = childDocs.advance(prevParentDoc + 1);
        }
        return Tuple.tuple(currentParentAggDoc, childDocId);
    }
181
+
133
182
@ Override
134
183
protected void preGetSubLeafCollectors (LeafReaderContext ctx ) throws IOException {
135
184
super .preGetSubLeafCollectors (ctx );
@@ -191,9 +240,8 @@ public void setScorer(Scorable scorer) throws IOException {
191
240
192
241
@ Override
193
242
public void collect (int parentDoc , long bucket ) throws IOException {
194
- // if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
195
- // doc), so we can skip:
196
- if (parentDoc == 0 || parentDocs == null || childDocs == null ) {
243
+ // parentAggDoc can be 0 when aggregation:
244
+ if (parentDocs == null || childDocs == null ) {
197
245
return ;
198
246
}
199
247
@@ -214,11 +262,9 @@ void processBufferedChildBuckets() throws IOException {
214
262
return ;
215
263
}
216
264
217
- final int prevParentDoc = parentDocs .prevSetBit (currentParentDoc - 1 );
218
- int childDocId = childDocs .docID ();
219
- if (childDocId <= prevParentDoc ) {
220
- childDocId = childDocs .advance (prevParentDoc + 1 );
221
- }
265
+ Tuple <Integer , Integer > res = getParentAndChildId (parentDocs , childDocs , currentParentDoc );
266
+ int currentParentDoc = res .v1 ();
267
+ int childDocId = res .v2 ();
222
268
223
269
for (; childDocId < currentParentDoc ; childDocId = childDocs .nextDoc ()) {
224
270
cachedScorer .doc = childDocId ;
0 commit comments