
Commit 415abb9

[Pull-based Ingestion] Support segment replication for pull-based ingestion (#17359)
1 parent ee7fbbd commit 415abb9
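
The change these tests exercise: a pull-based ingestion index can now opt into segment replication by setting "index.replication.type" to "SEGMENT" alongside its "ingestion_source.*" settings. A minimal sketch of creating such an index from an integration test, assuming a running Kafka container and the 3-argument createIndex helper the tests below use (the index name and replica count are illustrative, not from this commit):

    // Sketch only: enable segment replication on a Kafka pull-based ingestion index.
    createIndex(
        "example-index",                                      // illustrative name
        Settings.builder()
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) // replicas catch up via segment copy
            .put("ingestion_source.type", "kafka")
            .put("ingestion_source.pointer.init.reset", "earliest")
            .put("ingestion_source.param.topic", "test")
            .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
            .put("index.replication.type", "SEGMENT")         // the new capability
            .build(),
        "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}"
    );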

File tree

13 files changed: +548 −971 lines changed


plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

+74 −138
@@ -8,12 +8,6 @@
 
 package org.opensearch.plugin.kafka;
 
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.opensearch.action.admin.cluster.node.info.NodeInfo;
 import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
 import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
@@ -22,40 +16,24 @@
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.index.query.RangeQueryBuilder;
-import org.opensearch.plugins.Plugin;
 import org.opensearch.plugins.PluginInfo;
 import org.opensearch.test.OpenSearchIntegTestCase;
 import org.junit.Assert;
 
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
-import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import org.testcontainers.containers.KafkaContainer;
-import org.testcontainers.utility.DockerImageName;
-
 import static org.hamcrest.Matchers.is;
 import static org.awaitility.Awaitility.await;
 
 /**
  * Integration test for Kafka ingestion
  */
-@ThreadLeakFilters(filters = TestContainerWatchdogThreadLeakFilter.class)
-public class IngestFromKafkaIT extends OpenSearchIntegTestCase {
-    static final String topicName = "test";
-
-    private KafkaContainer kafka;
-
-    @Override
-    protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return Arrays.asList(KafkaPlugin.class);
-    }
-
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
+public class IngestFromKafkaIT extends KafkaIngestionBaseIT {
     /**
      * test ingestion-kafka-plugin is installed
      */
@@ -75,128 +53,86 @@ public void testPluginsAreInstalled() {
     }
 
     public void testKafkaIngestion() {
-        try {
-            setupKafka();
-            // create an index with ingestion source from kafka
-            createIndex(
-                "test",
-                Settings.builder()
-                    .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
-                    .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
-                    .put("ingestion_source.type", "kafka")
-                    .put("ingestion_source.pointer.init.reset", "earliest")
-                    .put("ingestion_source.param.topic", "test")
-                    .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
-                    .build(),
-                "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
-            );
+        produceData("1", "name1", "24");
+        produceData("2", "name2", "20");
+
+        createIndex(
+            "test",
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                .put("ingestion_source.type", "kafka")
+                .put("ingestion_source.pointer.init.reset", "earliest")
+                .put("ingestion_source.param.topic", "test")
+                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
+                .put("index.replication.type", "SEGMENT")
+                .build(),
+            "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
+        );
 
-            RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21);
-            await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
-                refresh("test");
-                SearchResponse response = client().prepareSearch("test").setQuery(query).get();
-                assertThat(response.getHits().getTotalHits().value(), is(1L));
-            });
-        } finally {
-            stopKafka();
-        }
+        RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21);
+        await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            refresh("test");
+            SearchResponse response = client().prepareSearch("test").setQuery(query).get();
+            assertThat(response.getHits().getTotalHits().value(), is(1L));
+        });
     }
 
     public void testKafkaIngestion_RewindByTimeStamp() {
-        try {
-            setupKafka();
-            // create an index with ingestion source from kafka
-            createIndex(
-                "test_rewind_by_timestamp",
-                Settings.builder()
-                    .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
-                    .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
-                    .put("ingestion_source.type", "kafka")
-                    .put("ingestion_source.pointer.init.reset", "rewind_by_timestamp")
-                    // 1739459500000 is the timestamp of the first message
-                    // 1739459800000 is the timestamp of the second message
-                    // by resetting to 1739459600000, only the second message will be ingested
-                    .put("ingestion_source.pointer.init.reset.value", "1739459600000")
-                    .put("ingestion_source.param.topic", "test")
-                    .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
-                    .put("ingestion_source.param.auto.offset.reset", "latest")
-                    .build(),
-                "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
-            );
+        produceData("1", "name1", "24", 1739459500000L);
+        produceData("2", "name2", "20", 1739459800000L);
+
+        // create an index with ingestion source from kafka
+        createIndex(
+            "test_rewind_by_timestamp",
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                .put("ingestion_source.type", "kafka")
+                .put("ingestion_source.pointer.init.reset", "rewind_by_timestamp")
+                // 1739459500000 is the timestamp of the first message
+                // 1739459800000 is the timestamp of the second message
+                // by resetting to 1739459600000, only the second message will be ingested
+                .put("ingestion_source.pointer.init.reset.value", "1739459600000")
+                .put("ingestion_source.param.topic", "test")
+                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
+                .put("ingestion_source.param.auto.offset.reset", "latest")
+                .build(),
+            "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
+        );
 
-            RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
-            await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
-                refresh("test_rewind_by_timestamp");
-                SearchResponse response = client().prepareSearch("test_rewind_by_timestamp").setQuery(query).get();
-                assertThat(response.getHits().getTotalHits().value(), is(1L));
-            });
-        } finally {
-            stopKafka();
-        }
+        RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
+        await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            refresh("test_rewind_by_timestamp");
+            SearchResponse response = client().prepareSearch("test_rewind_by_timestamp").setQuery(query).get();
+            assertThat(response.getHits().getTotalHits().value(), is(1L));
+        });
     }
 
     public void testKafkaIngestion_RewindByOffset() {
-        try {
-            setupKafka();
-            // create an index with ingestion source from kafka
-            createIndex(
-                "test_rewind_by_offset",
-                Settings.builder()
-                    .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
-                    .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
-                    .put("ingestion_source.type", "kafka")
-                    .put("ingestion_source.pointer.init.reset", "rewind_by_offset")
-                    .put("ingestion_source.pointer.init.reset.value", "1")
-                    .put("ingestion_source.param.topic", "test")
-                    .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
-                    .put("ingestion_source.param.auto.offset.reset", "latest")
-                    .build(),
-                "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
-            );
-
-            RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
-            await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
-                refresh("test_rewind_by_offset");
-                SearchResponse response = client().prepareSearch("test_rewind_by_offset").setQuery(query).get();
-                assertThat(response.getHits().getTotalHits().value(), is(1L));
-            });
-        } finally {
-            stopKafka();
-        }
-    }
-
-    private void setupKafka() {
-        kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
-            // disable topic auto creation
-            .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false");
-        kafka.start();
-        prepareKafkaData();
-    }
-
-    private void stopKafka() {
-        if (kafka != null) {
-            kafka.stop();
-        }
-    }
-
-    private void prepareKafkaData() {
-        String boostrapServers = kafka.getBootstrapServers();
-        KafkaUtils.createTopic(topicName, 1, boostrapServers);
-        Properties props = new Properties();
-        props.put("bootstrap.servers", kafka.getBootstrapServers());
-        Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
-        producer.send(
-            new ProducerRecord<>(topicName, null, 1739459500000L, "null", "{\"_id\":\"1\",\"_source\":{\"name\":\"bob\", \"age\": 24}}")
-        );
-        producer.send(
-            new ProducerRecord<>(
-                topicName,
-                null,
-                1739459800000L,
-                "null",
-                "{\"_id\":\"2\", \"_op_type:\":\"index\",\"_source\":{\"name\":\"alice\", \"age\": 20}}"
-            )
+        produceData("1", "name1", "24");
+        produceData("2", "name2", "20");
+        // create an index with ingestion source from kafka
+        createIndex(
+            "test_rewind_by_offset",
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                .put("ingestion_source.type", "kafka")
+                .put("ingestion_source.pointer.init.reset", "rewind_by_offset")
+                .put("ingestion_source.pointer.init.reset.value", "1")
+                .put("ingestion_source.param.topic", "test")
+                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
+                .put("ingestion_source.param.auto.offset.reset", "latest")
+                .build(),
+            "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
         );
-        producer.close();
+
+        RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
+        await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            refresh("test_rewind_by_offset");
+            SearchResponse response = client().prepareSearch("test_rewind_by_offset").setQuery(query).get();
+            assertThat(response.getHits().getTotalHits().value(), is(1L));
+        });
     }
 }
plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java

+111
@@ -0,0 +1,111 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.plugin.kafka;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.opensearch.action.search.SearchResponse;
+import org.opensearch.plugins.Plugin;
+import org.opensearch.test.OpenSearchIntegTestCase;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Base test class for Kafka ingestion tests
+ */
+@ThreadLeakFilters(filters = TestContainerThreadLeakFilter.class)
+public class KafkaIngestionBaseIT extends OpenSearchIntegTestCase {
+    static final String topicName = "test";
+    static final String indexName = "testindex";
+    static final String mapping = "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}";
+    static final long defaultMessageTimestamp = 1739459500000L;
+
+    protected KafkaContainer kafka;
+    protected Producer<String, String> producer;
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return Arrays.asList(KafkaPlugin.class);
+    }
+
+    @Before
+    private void setup() {
+        setupKafka();
+    }
+
+    @After
+    private void cleanup() {
+        stopKafka();
+    }
+
+    private void setupKafka() {
+        kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+            // disable topic auto creation
+            .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false");
+        kafka.start();
+
+        // setup producer
+        String boostrapServers = kafka.getBootstrapServers();
+        KafkaUtils.createTopic(topicName, 1, boostrapServers);
+        Properties props = new Properties();
+        props.put("bootstrap.servers", kafka.getBootstrapServers());
+        producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
+    }
+
+    private void stopKafka() {
+        if (producer != null) {
+            producer.close();
+        }
+
+        if (kafka != null) {
+            kafka.stop();
+        }
+    }
+
+    protected void produceData(String id, String name, String age) {
+        produceData(id, name, age, defaultMessageTimestamp);
+    }
+
+    protected void produceData(String id, String name, String age, long timestamp) {
+        String payload = String.format(
+            Locale.ROOT,
+            "{\"_id\":\"%s\", \"_op_type:\":\"index\",\"_source\":{\"name\":\"%s\", \"age\": %s}}",
+            id,
+            name,
+            age
+        );
+        producer.send(new ProducerRecord<>(topicName, null, timestamp, "null", payload));
+    }
+
+    protected void waitForSearchableDocs(long docCount, List<String> nodes) throws Exception {
+        assertBusy(() -> {
+            for (String node : nodes) {
+                final SearchResponse response = client(node).prepareSearch(indexName).setSize(0).setPreference("_only_local").get();
+                final long hits = response.getHits().getTotalHits().value();
+                if (hits < docCount) {
+                    fail("Expected search hits on node: " + node + " to be at least " + docCount + " but was: " + hits);
+                }
+            }
+        }, 1, TimeUnit.MINUTES);
+    }
+}
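
For reference, a sketch of how a subclass of this base class might combine produceData() and waitForSearchableDocs() to verify segment replication end to end. The test method, node handles, and index settings below are hypothetical, not part of this commit:

    // Hypothetical subclass test (not in this commit): ingest via Kafka, then
    // confirm both the primary and the replica node serve the same documents.
    public void testSegmentReplicationSketch() throws Exception {
        internalCluster().startClusterManagerOnlyNode();
        final String nodeA = internalCluster().startDataOnlyNode();   // hosts the primary
        final String nodeB = internalCluster().startDataOnlyNode();   // hosts the replica

        createIndex(
            indexName,
            Settings.builder()
                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
                .put("ingestion_source.type", "kafka")
                .put("ingestion_source.pointer.init.reset", "earliest")
                .put("ingestion_source.param.topic", topicName)
                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
                .put("index.replication.type", "SEGMENT")
                .build(),
            mapping
        );

        produceData("1", "name1", "24");
        produceData("2", "name2", "20");
        refresh(indexName);

        // Both nodes should eventually report two searchable docs.
        waitForSearchableDocs(2, Arrays.asList(nodeA, nodeB));
    }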
