Skip to content

Commit f0c2fa6

Browse files
Star tree codec changes (#14514)
--------- Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
1 parent f14b5c8 commit f0c2fa6

16 files changed

+662
-4
lines changed

server/src/main/java/org/opensearch/index/codec/CodecService.java

+12-4
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.opensearch.common.Nullable;
4040
import org.opensearch.common.collect.MapBuilder;
4141
import org.opensearch.index.IndexSettings;
42+
import org.opensearch.index.codec.composite.CompositeCodecFactory;
4243
import org.opensearch.index.mapper.MapperService;
4344

4445
import java.util.Map;
@@ -63,6 +64,7 @@ public class CodecService {
6364
* the raw unfiltered lucene default. useful for testing
6465
*/
6566
public static final String LUCENE_DEFAULT_CODEC = "lucene_default";
67+
private final CompositeCodecFactory compositeCodecFactory = new CompositeCodecFactory();
6668

6769
public CodecService(@Nullable MapperService mapperService, IndexSettings indexSettings, Logger logger) {
6870
final MapBuilder<String, Codec> codecs = MapBuilder.<String, Codec>newMapBuilder();
@@ -73,10 +75,16 @@ public CodecService(@Nullable MapperService mapperService, IndexSettings indexSe
7375
codecs.put(BEST_COMPRESSION_CODEC, new Lucene99Codec(Mode.BEST_COMPRESSION));
7476
codecs.put(ZLIB, new Lucene99Codec(Mode.BEST_COMPRESSION));
7577
} else {
76-
codecs.put(DEFAULT_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
77-
codecs.put(LZ4, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
78-
codecs.put(BEST_COMPRESSION_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
79-
codecs.put(ZLIB, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
78+
// CompositeCodec still delegates to PerFieldMappingPostingFormatCodec
79+
// We can still support all the compression codecs when composite index is present
80+
if (mapperService.isCompositeIndexPresent()) {
81+
codecs.putAll(compositeCodecFactory.getCompositeIndexCodecs(mapperService, logger));
82+
} else {
83+
codecs.put(DEFAULT_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
84+
codecs.put(LZ4, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
85+
codecs.put(BEST_COMPRESSION_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
86+
codecs.put(ZLIB, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
87+
}
8088
}
8189
codecs.put(LUCENE_DEFAULT_CODEC, Codec.getDefault());
8290
for (String codec : Codec.availableCodecs()) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.codec.composite;
10+
11+
import org.apache.logging.log4j.Logger;
12+
import org.apache.lucene.codecs.Codec;
13+
import org.apache.lucene.codecs.DocValuesFormat;
14+
import org.apache.lucene.codecs.FilterCodec;
15+
import org.apache.lucene.codecs.lucene99.Lucene99Codec;
16+
import org.opensearch.common.annotation.ExperimentalApi;
17+
import org.opensearch.index.codec.PerFieldMappingPostingFormatCodec;
18+
import org.opensearch.index.mapper.MapperService;
19+
20+
/**
21+
* Extends the Codec to support new file formats for composite indices eg: star tree index
22+
* based on the mappings.
23+
*
24+
* @opensearch.experimental
25+
*/
26+
@ExperimentalApi
27+
public class Composite99Codec extends FilterCodec {
28+
public static final String COMPOSITE_INDEX_CODEC_NAME = "Composite99Codec";
29+
private final MapperService mapperService;
30+
31+
// needed for SPI - this is used in reader path
32+
public Composite99Codec() {
33+
this(COMPOSITE_INDEX_CODEC_NAME, new Lucene99Codec(), null);
34+
}
35+
36+
public Composite99Codec(Lucene99Codec.Mode compressionMode, MapperService mapperService, Logger logger) {
37+
this(COMPOSITE_INDEX_CODEC_NAME, new PerFieldMappingPostingFormatCodec(compressionMode, mapperService, logger), mapperService);
38+
}
39+
40+
/**
41+
* Sole constructor. When subclassing this codec, create a no-arg ctor and pass the delegate codec and a unique name to
42+
* this ctor.
43+
*
44+
* @param name name of the codec
45+
* @param delegate codec delegate
46+
* @param mapperService mapper service instance
47+
*/
48+
protected Composite99Codec(String name, Codec delegate, MapperService mapperService) {
49+
super(name, delegate);
50+
this.mapperService = mapperService;
51+
}
52+
53+
@Override
54+
public DocValuesFormat docValuesFormat() {
55+
return new Composite99DocValuesFormat(mapperService);
56+
}
57+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.codec.composite;
10+
11+
import org.apache.lucene.codecs.DocValuesConsumer;
12+
import org.apache.lucene.codecs.DocValuesFormat;
13+
import org.apache.lucene.codecs.DocValuesProducer;
14+
import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat;
15+
import org.apache.lucene.index.SegmentReadState;
16+
import org.apache.lucene.index.SegmentWriteState;
17+
import org.opensearch.common.annotation.ExperimentalApi;
18+
import org.opensearch.index.mapper.MapperService;
19+
20+
import java.io.IOException;
21+
22+
/**
23+
* DocValues format to handle composite indices
24+
*
25+
* @opensearch.experimental
26+
*/
27+
@ExperimentalApi
28+
public class Composite99DocValuesFormat extends DocValuesFormat {
29+
/**
30+
* Creates a new docvalues format.
31+
*
32+
* <p>The provided name will be written into the index segment in some configurations (such as
33+
* when using {@code PerFieldDocValuesFormat}): in such configurations, for the segment to be read
34+
* this class should be registered with Java's SPI mechanism (registered in META-INF/ of your jar
35+
* file, etc).
36+
*/
37+
private final DocValuesFormat delegate;
38+
private final MapperService mapperService;
39+
40+
// needed for SPI
41+
public Composite99DocValuesFormat() {
42+
this(new Lucene90DocValuesFormat(), null);
43+
}
44+
45+
public Composite99DocValuesFormat(MapperService mapperService) {
46+
this(new Lucene90DocValuesFormat(), mapperService);
47+
}
48+
49+
public Composite99DocValuesFormat(DocValuesFormat delegate, MapperService mapperService) {
50+
super(delegate.getName());
51+
this.delegate = delegate;
52+
this.mapperService = mapperService;
53+
}
54+
55+
@Override
56+
public DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
57+
return new Composite99DocValuesWriter(delegate.fieldsConsumer(state), state, mapperService);
58+
}
59+
60+
@Override
61+
public DocValuesProducer fieldsProducer(SegmentReadState state) throws IOException {
62+
return new Composite99DocValuesReader(delegate.fieldsProducer(state), state);
63+
}
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.codec.composite;
10+
11+
import org.apache.lucene.codecs.DocValuesProducer;
12+
import org.apache.lucene.index.BinaryDocValues;
13+
import org.apache.lucene.index.FieldInfo;
14+
import org.apache.lucene.index.NumericDocValues;
15+
import org.apache.lucene.index.SegmentReadState;
16+
import org.apache.lucene.index.SortedDocValues;
17+
import org.apache.lucene.index.SortedNumericDocValues;
18+
import org.apache.lucene.index.SortedSetDocValues;
19+
import org.opensearch.common.annotation.ExperimentalApi;
20+
import org.opensearch.index.mapper.CompositeMappedFieldType;
21+
22+
import java.io.IOException;
23+
import java.util.List;
24+
25+
/**
26+
* Reader for star tree index and star tree doc values from the segments
27+
*
28+
* @opensearch.experimental
29+
*/
30+
@ExperimentalApi
31+
public class Composite99DocValuesReader extends DocValuesProducer implements CompositeIndexReader {
32+
private DocValuesProducer delegate;
33+
34+
public Composite99DocValuesReader(DocValuesProducer producer, SegmentReadState state) throws IOException {
35+
this.delegate = producer;
36+
// TODO : read star tree files
37+
}
38+
39+
@Override
40+
public NumericDocValues getNumeric(FieldInfo field) throws IOException {
41+
return delegate.getNumeric(field);
42+
}
43+
44+
@Override
45+
public BinaryDocValues getBinary(FieldInfo field) throws IOException {
46+
return delegate.getBinary(field);
47+
}
48+
49+
@Override
50+
public SortedDocValues getSorted(FieldInfo field) throws IOException {
51+
return delegate.getSorted(field);
52+
}
53+
54+
@Override
55+
public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException {
56+
return delegate.getSortedNumeric(field);
57+
}
58+
59+
@Override
60+
public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException {
61+
return delegate.getSortedSet(field);
62+
}
63+
64+
@Override
65+
public void checkIntegrity() throws IOException {
66+
delegate.checkIntegrity();
67+
// Todo : check integrity of composite index related [star tree] files
68+
}
69+
70+
@Override
71+
public void close() throws IOException {
72+
delegate.close();
73+
// Todo: close composite index related files [star tree] files
74+
}
75+
76+
@Override
77+
public List<String> getCompositeIndexFields() {
78+
// todo : read from file formats and get the field names.
79+
throw new UnsupportedOperationException();
80+
81+
}
82+
83+
@Override
84+
public CompositeIndexValues getCompositeIndexValues(String field, CompositeMappedFieldType.CompositeFieldType fieldType)
85+
throws IOException {
86+
// TODO : read compositeIndexValues [starTreeValues] from star tree files
87+
throw new UnsupportedOperationException();
88+
}
89+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.codec.composite;
10+
11+
import org.apache.lucene.codecs.DocValuesConsumer;
12+
import org.apache.lucene.codecs.DocValuesProducer;
13+
import org.apache.lucene.index.FieldInfo;
14+
import org.apache.lucene.index.MergeState;
15+
import org.apache.lucene.index.SegmentWriteState;
16+
import org.opensearch.common.annotation.ExperimentalApi;
17+
import org.opensearch.index.mapper.CompositeMappedFieldType;
18+
import org.opensearch.index.mapper.MapperService;
19+
import org.opensearch.index.mapper.StarTreeMapper;
20+
21+
import java.io.IOException;
22+
import java.util.HashMap;
23+
import java.util.HashSet;
24+
import java.util.Map;
25+
import java.util.Set;
26+
import java.util.concurrent.atomic.AtomicReference;
27+
28+
/**
29+
* This class write the star tree index and star tree doc values
30+
* based on the doc values structures of the original index
31+
*
32+
* @opensearch.experimental
33+
*/
34+
@ExperimentalApi
35+
public class Composite99DocValuesWriter extends DocValuesConsumer {
36+
private final DocValuesConsumer delegate;
37+
private final SegmentWriteState state;
38+
private final MapperService mapperService;
39+
AtomicReference<MergeState> mergeState = new AtomicReference<>();
40+
private final Set<CompositeMappedFieldType> compositeMappedFieldTypes;
41+
private final Set<String> compositeFieldSet;
42+
43+
private final Map<String, DocValuesProducer> fieldProducerMap = new HashMap<>();
44+
45+
public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState segmentWriteState, MapperService mapperService) {
46+
47+
this.delegate = delegate;
48+
this.state = segmentWriteState;
49+
this.mapperService = mapperService;
50+
this.compositeMappedFieldTypes = mapperService.getCompositeFieldTypes();
51+
compositeFieldSet = new HashSet<>();
52+
for (CompositeMappedFieldType type : compositeMappedFieldTypes) {
53+
compositeFieldSet.addAll(type.fields());
54+
}
55+
}
56+
57+
@Override
58+
public void addNumericField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
59+
delegate.addNumericField(field, valuesProducer);
60+
}
61+
62+
@Override
63+
public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
64+
delegate.addBinaryField(field, valuesProducer);
65+
}
66+
67+
@Override
68+
public void addSortedField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
69+
delegate.addSortedField(field, valuesProducer);
70+
}
71+
72+
@Override
73+
public void addSortedNumericField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
74+
delegate.addSortedNumericField(field, valuesProducer);
75+
// Perform this only during flush flow
76+
if (mergeState.get() == null) {
77+
createCompositeIndicesIfPossible(valuesProducer, field);
78+
}
79+
}
80+
81+
@Override
82+
public void addSortedSetField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
83+
delegate.addSortedSetField(field, valuesProducer);
84+
}
85+
86+
@Override
87+
public void close() throws IOException {
88+
delegate.close();
89+
}
90+
91+
private void createCompositeIndicesIfPossible(DocValuesProducer valuesProducer, FieldInfo field) throws IOException {
92+
if (compositeFieldSet.isEmpty()) return;
93+
if (compositeFieldSet.contains(field.name)) {
94+
fieldProducerMap.put(field.name, valuesProducer);
95+
compositeFieldSet.remove(field.name);
96+
}
97+
// we have all the required fields to build composite fields
98+
if (compositeFieldSet.isEmpty()) {
99+
for (CompositeMappedFieldType mappedType : compositeMappedFieldTypes) {
100+
if (mappedType instanceof StarTreeMapper.StarTreeFieldType) {
101+
// TODO : Call StarTree builder
102+
}
103+
}
104+
}
105+
}
106+
107+
@Override
108+
public void merge(MergeState mergeState) throws IOException {
109+
this.mergeState.compareAndSet(null, mergeState);
110+
super.merge(mergeState);
111+
// TODO : handle merge star tree
112+
// mergeStarTreeFields(mergeState);
113+
}
114+
}

0 commit comments

Comments
 (0)