Skip to content

Commit 3e4f0b2

Browse files
committed
Derived Source POC
Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>
1 parent 98dbc4a commit 3e4f0b2

File tree

6 files changed

+291
-2
lines changed

6 files changed

+291
-2
lines changed

server/src/main/java/org/opensearch/index/IndexService.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -691,7 +691,8 @@ protected void closeInternal() {
691691
recoverySettings,
692692
remoteStoreSettings,
693693
seedRemote,
694-
discoveryNodes
694+
discoveryNodes,
695+
indexFieldData
695696
);
696697
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
697698
eventListener.afterIndexShardCreated(indexShard);

server/src/main/java/org/opensearch/index/fielddata/IndexFieldDataService.java

+2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
import org.apache.lucene.util.Accountable;
3636
import org.opensearch.ExceptionsHelper;
37+
import org.opensearch.common.annotation.PublicApi;
3738
import org.opensearch.common.settings.Setting;
3839
import org.opensearch.common.settings.Setting.Property;
3940
import org.opensearch.core.index.shard.ShardId;
@@ -59,6 +60,7 @@
5960
*
6061
* @opensearch.api
6162
*/
63+
@PublicApi(since = "1.0.0")
6264
public class IndexFieldDataService extends AbstractIndexComponent implements Closeable {
6365
public static final String FIELDDATA_CACHE_VALUE_NODE = "node";
6466
public static final String FIELDDATA_CACHE_KEY = "index.fielddata.cache";

server/src/main/java/org/opensearch/index/get/ShardGetService.java

+139
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,14 @@
3737
import org.apache.lucene.index.IndexOptions;
3838
import org.apache.lucene.index.IndexableField;
3939
import org.apache.lucene.index.IndexableFieldType;
40+
import org.apache.lucene.index.LeafReader;
41+
import org.apache.lucene.index.SortedNumericDocValues;
42+
import org.apache.lucene.index.SortedSetDocValues;
4043
import org.apache.lucene.index.StoredFieldVisitor;
4144
import org.apache.lucene.index.Term;
4245
import org.apache.lucene.index.VectorEncoding;
4346
import org.apache.lucene.index.VectorSimilarityFunction;
47+
import org.apache.lucene.util.BytesRef;
4448
import org.opensearch.OpenSearchException;
4549
import org.opensearch.common.Nullable;
4650
import org.opensearch.common.annotation.PublicApi;
@@ -51,36 +55,46 @@
5155
import org.opensearch.common.metrics.CounterMetric;
5256
import org.opensearch.common.metrics.MeanMetric;
5357
import org.opensearch.common.util.set.Sets;
58+
import org.opensearch.common.xcontent.XContentFactory;
5459
import org.opensearch.common.xcontent.XContentHelper;
5560
import org.opensearch.common.xcontent.XContentType;
5661
import org.opensearch.common.xcontent.support.XContentMapValues;
5762
import org.opensearch.core.common.bytes.BytesReference;
5863
import org.opensearch.core.xcontent.MediaTypeRegistry;
64+
import org.opensearch.core.xcontent.XContentBuilder;
5965
import org.opensearch.index.IndexSettings;
6066
import org.opensearch.index.VersionType;
6167
import org.opensearch.index.engine.Engine;
6268
import org.opensearch.index.engine.TranslogLeafReader;
69+
import org.opensearch.index.fielddata.LeafNumericFieldData;
70+
import org.opensearch.index.fielddata.SortedNumericDoubleValues;
6371
import org.opensearch.index.fieldvisitor.CustomFieldsVisitor;
6472
import org.opensearch.index.fieldvisitor.FieldsVisitor;
6573
import org.opensearch.index.mapper.DocumentMapper;
74+
import org.opensearch.index.mapper.FieldMapper;
6675
import org.opensearch.index.mapper.IdFieldMapper;
6776
import org.opensearch.index.mapper.Mapper;
6877
import org.opensearch.index.mapper.MapperService;
78+
import org.opensearch.index.mapper.MetadataFieldMapper;
79+
import org.opensearch.index.mapper.NumberFieldMapper;
6980
import org.opensearch.index.mapper.ParsedDocument;
7081
import org.opensearch.index.mapper.RoutingFieldMapper;
7182
import org.opensearch.index.mapper.SourceFieldMapper;
7283
import org.opensearch.index.mapper.SourceToParse;
7384
import org.opensearch.index.mapper.Uid;
7485
import org.opensearch.index.shard.AbstractIndexShardComponent;
7586
import org.opensearch.index.shard.IndexShard;
87+
import org.opensearch.search.DocValueFormat;
7688
import org.opensearch.search.fetch.subphase.FetchSourceContext;
7789

7890
import java.io.IOException;
91+
import java.util.Arrays;
7992
import java.util.Collections;
8093
import java.util.HashMap;
8194
import java.util.List;
8295
import java.util.Map;
8396
import java.util.concurrent.TimeUnit;
97+
import java.util.stream.Collectors;
8498
import java.util.stream.Stream;
8599

86100
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
@@ -415,6 +429,18 @@ private GetResult innerGetLoadFromStoredFields(
415429
}
416430
}
417431

432+
try {
433+
Map<String, Object> sourceAsMap = buildUsingDocValues(docIdAndVersion.docId, docIdAndVersion.reader, mapperService, indexShard);
434+
sourceAsMap = unflatten(sourceAsMap);
435+
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
436+
builder.map(sourceAsMap);
437+
source = BytesReference.bytes(builder);
438+
}
439+
} catch (IOException ex) {
440+
throw new RuntimeException(ex);
441+
}
442+
443+
418444
return new GetResult(
419445
shardId.getIndexName(),
420446
id,
@@ -428,11 +454,124 @@ private GetResult innerGetLoadFromStoredFields(
428454
);
429455
}
430456

457+
private static Map<String, Object> unflatten(Map<String, Object> flattened) {
458+
Map<String, Object> unflattened = new HashMap<>();
459+
for (String key : flattened.keySet()) {
460+
doUnflatten(flattened, unflattened, key, flattened.get(key));
461+
}
462+
return unflattened;
463+
}
464+
465+
private static Map<String, Object> doUnflatten(
466+
Map<String, Object> flattened,
467+
Map<String, Object> unflattened,
468+
String key,
469+
Object value) {
470+
471+
String[] parts = key.split("\\.");
472+
for (int i = 0; i < parts.length; i++) {
473+
String part = parts[i];
474+
Object current = flattened.get(part);
475+
if (i == (parts.length - 1)) {
476+
unflattened.put(part, value);
477+
} else if (current == null) {
478+
if ((current = unflattened.get(part)) == null) {
479+
current = new HashMap<>();
480+
}
481+
unflattened.put(part, current);
482+
unflattened = (Map<String, Object>) current;
483+
} else if (current instanceof Map) {
484+
unflattened.put(part, current);
485+
unflattened = (Map<String, Object>) current;
486+
}
487+
}
488+
return unflattened;
489+
}
490+
491+
431492
private static FieldsVisitor buildFieldsVisitors(String[] fields, FetchSourceContext fetchSourceContext) {
432493
if (fields == null || fields.length == 0) {
433494
return fetchSourceContext.fetchSource() ? new FieldsVisitor(true) : null;
434495
}
435496

436497
return new CustomFieldsVisitor(Sets.newHashSet(fields), fetchSourceContext.fetchSource());
437498
}
499+
500+
private static Map<String, Object> buildUsingDocValues(int docId, LeafReader reader, MapperService mapperService, IndexShard indexShard) throws IOException {
501+
Map<String, Object> docValues = new HashMap<>();
502+
for (Mapper mapper: mapperService.documentMapper().mappers()) {
503+
if (mapper instanceof MetadataFieldMapper) {
504+
continue;
505+
}
506+
mapper.name();
507+
if (mapper instanceof FieldMapper) {
508+
FieldMapper fieldMapper = (FieldMapper) mapper;
509+
if (fieldMapper.fieldType().hasDocValues()) {
510+
String fieldName = fieldMapper.name();
511+
FieldInfo fieldInfo = reader.getFieldInfos().fieldInfo(fieldName);
512+
DocValueFormat format = fieldMapper.fieldType().docValueFormat(null, null);
513+
if (fieldInfo != null) {
514+
switch (fieldInfo.getDocValuesType()) {
515+
case SORTED_SET:
516+
SortedSetDocValues dv = reader.getSortedSetDocValues(fieldName);
517+
if (dv.advanceExact(docId)) {
518+
BytesRef[] values = new BytesRef[dv.docValueCount()];
519+
for (int i = 0; i < dv.docValueCount(); i++) {
520+
values[i] = dv.lookupOrd(dv.nextOrd());
521+
}
522+
if (values.length > 1) {
523+
docValues.put(fieldName, Arrays.stream(values).map(format::format).collect(Collectors.toList()));
524+
} else {
525+
docValues.put(fieldName, format.format(values[0]));
526+
}
527+
}
528+
break;
529+
case SORTED_NUMERIC:
530+
SortedNumericDocValues sndv = reader.getSortedNumericDocValues(fieldName);
531+
if (fieldMapper instanceof NumberFieldMapper) {
532+
NumberFieldMapper.NumberType numberType = ((NumberFieldMapper) fieldMapper).getType();
533+
switch (numberType) {
534+
case HALF_FLOAT:
535+
case FLOAT:
536+
case DOUBLE:
537+
SortedNumericDoubleValues doubleValues = ((LeafNumericFieldData) indexShard.indexFieldDataService().getForField(fieldMapper.fieldType(), "", () -> null)
538+
.load(reader.getContext())).getDoubleValues();
539+
if (doubleValues.advanceExact(docId)) {
540+
int size = doubleValues.docValueCount();
541+
double[] vals = new double[size];
542+
for (int i = 0; i < size; i++) {
543+
vals[i] = doubleValues.nextValue();
544+
}
545+
if (size > 1) {
546+
docValues.put(fieldName, vals);
547+
} else {
548+
docValues.put(fieldName, vals[0]);
549+
}
550+
}
551+
break;
552+
case INTEGER:
553+
case LONG:
554+
case UNSIGNED_LONG:
555+
if (sndv.advanceExact(docId)) {
556+
int size = sndv.docValueCount();
557+
long[] vals = new long[size];
558+
for (int i = 0; i < size; i++) {
559+
vals[i] = sndv.nextValue();
560+
}
561+
if (size > 1) {
562+
docValues.put(fieldName, vals);
563+
} else {
564+
docValues.put(fieldName, vals[0]);
565+
}
566+
}
567+
}
568+
}
569+
break;
570+
}
571+
}
572+
}
573+
}
574+
}
575+
return docValues;
576+
}
438577
}

server/src/main/java/org/opensearch/index/mapper/NumberFieldMapper.java

+4
Original file line numberDiff line numberDiff line change
@@ -1817,4 +1817,8 @@ protected void parseCreateField(ParseContext context) throws IOException {
18171817
public ParametrizedFieldMapper.Builder getMergeBuilder() {
18181818
return new Builder(simpleName(), type, ignoreMalformedByDefault, coerceByDefault).init(this);
18191819
}
1820+
1821+
public NumberType getType() {
1822+
return type;
1823+
}
18201824
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@
133133
import org.opensearch.index.engine.Segment;
134134
import org.opensearch.index.engine.SegmentsStats;
135135
import org.opensearch.index.fielddata.FieldDataStats;
136+
import org.opensearch.index.fielddata.IndexFieldDataService;
136137
import org.opensearch.index.fielddata.ShardFieldData;
137138
import org.opensearch.index.flush.FlushStats;
138139
import org.opensearch.index.get.GetStats;
@@ -361,6 +362,7 @@ Runnable getGlobalCheckpointSyncer() {
361362
*/
362363
private final ShardMigrationState shardMigrationState;
363364
private DiscoveryNodes discoveryNodes;
365+
private final IndexFieldDataService indexFieldDataService;
364366

365367
public IndexShard(
366368
final ShardRouting shardRouting,
@@ -391,7 +393,8 @@ public IndexShard(
391393
final RecoverySettings recoverySettings,
392394
final RemoteStoreSettings remoteStoreSettings,
393395
boolean seedRemote,
394-
final DiscoveryNodes discoveryNodes
396+
final DiscoveryNodes discoveryNodes,
397+
final IndexFieldDataService indexFieldDataService
395398
) throws IOException {
396399
super(shardRouting.shardId(), indexSettings);
397400
assert shardRouting.initializing();
@@ -493,6 +496,11 @@ public boolean shouldCache(Query query) {
493496
this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings);
494497
this.shardMigrationState = getShardMigrationState(indexSettings, seedRemote);
495498
this.discoveryNodes = discoveryNodes;
499+
this.indexFieldDataService = indexFieldDataService;
500+
}
501+
502+
public IndexFieldDataService indexFieldDataService() {
503+
return indexFieldDataService;
496504
}
497505

498506
public ThreadPool getThreadPool() {

0 commit comments

Comments
 (0)