Skip to content

Commit 5b07cf1

Browse files
authored
[Pull-based Ingestion] use ConfigurationUtil for KafkaSourceConfig (#17223)
--------- Signed-off-by: Yupeng Fu <yupeng@uber.com>
1 parent ab2f5f6 commit 5b07cf1

File tree

4 files changed

+330
-5
lines changed

4 files changed

+330
-5
lines changed

CHANGELOG-3.0.md

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1515
- Allow to pass the list settings through environment variables (like [], ["a", "b", "c"], ...) ([#10625](https://github.com/opensearch-project/OpenSearch/pull/10625))
1616
- Views, simplify data access and manipulation by providing a virtual layer over one or more indices ([#11957](https://github.com/opensearch-project/OpenSearch/pull/11957))
1717
- Added pull-based Ingestion (APIs, for ingestion source, a Kafka plugin, and IngestionEngine that pulls data from the ingestion source) ([#16958](https://github.com/opensearch-project/OpenSearch/pull/16958))
18+
- Added ConfigurationUtils to core for the ease of configuration parsing [#17223](https://github.com/opensearch-project/OpenSearch/pull/17223)
1819

1920
### Dependencies
2021
- Update Apache Lucene to 10.1.0 ([#16366](https://github.com/opensearch-project/OpenSearch/pull/16366))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.core.util;
10+
11+
import org.opensearch.OpenSearchException;
12+
import org.opensearch.OpenSearchParseException;
13+
import org.opensearch.common.annotation.PublicApi;
14+
15+
import java.util.Map;
16+
17+
/**
18+
* Utility class for parsing configurations.
19+
*
20+
* @opensearch.api
21+
*/
22+
@PublicApi(since = "3.0.0")
23+
public final class ConfigurationUtils {
24+
25+
private ConfigurationUtils() {}
26+
27+
/**
28+
* Returns and removes the specified optional property from the specified configuration map.
29+
* <p>
30+
* If the property value isn't of type string a {@link OpenSearchParseException} is thrown.
31+
*/
32+
public static String readOptionalStringProperty(Map<String, Object> configuration, String propertyName) {
33+
Object value = configuration.get(propertyName);
34+
return readString(propertyName, value);
35+
}
36+
37+
/**
38+
* Returns and removes the specified property from the specified configuration map.
39+
* <p>
40+
* If the property value isn't of type string an {@link OpenSearchParseException} is thrown.
41+
* If the property is missing an {@link OpenSearchParseException} is thrown
42+
*/
43+
public static String readStringProperty(Map<String, Object> configuration, String propertyName) {
44+
return readStringProperty(configuration, propertyName, null);
45+
}
46+
47+
/**
48+
* Returns the specified property from the specified configuration map.
49+
* <p>
50+
* If the property value isn't of type string a {@link OpenSearchParseException} is thrown.
51+
* If the property is missing and no default value has been specified a {@link OpenSearchParseException} is thrown
52+
*/
53+
public static String readStringProperty(Map<String, Object> configuration, String propertyName, String defaultValue) {
54+
Object value = configuration.get(propertyName);
55+
if (value == null && defaultValue != null) {
56+
return defaultValue;
57+
} else if (value == null) {
58+
throw newConfigurationException(propertyName, "required property is missing");
59+
}
60+
return readString(propertyName, value);
61+
}
62+
63+
public static OpenSearchException newConfigurationException(String propertyName, String reason) {
64+
String msg;
65+
if (propertyName == null) {
66+
msg = reason;
67+
} else {
68+
msg = "[" + propertyName + "] " + reason;
69+
}
70+
OpenSearchParseException exception = new OpenSearchParseException(msg);
71+
addMetadataToException(exception, propertyName);
72+
return exception;
73+
}
74+
75+
private static String readString(String propertyName, Object value) {
76+
if (value == null) {
77+
return null;
78+
}
79+
if (value instanceof String) {
80+
return (String) value;
81+
}
82+
throw newConfigurationException(propertyName, "property isn't a string, but of type [" + value.getClass().getName() + "]");
83+
}
84+
85+
private static void addMetadataToException(OpenSearchException exception, String propertyName) {
86+
if (propertyName != null) {
87+
exception.addMetadata("opensearch.property_name", propertyName);
88+
}
89+
}
90+
91+
/**
92+
* Returns the specified property from the specified configuration map.
93+
* <p>
94+
* If the property value isn't of type string or int a {@link OpenSearchParseException} is thrown.
95+
* If the property is missing and no default value has been specified a {@link OpenSearchParseException} is thrown
96+
*/
97+
public static String readStringOrIntProperty(Map<String, Object> configuration, String propertyName, String defaultValue) {
98+
Object value = configuration.get(propertyName);
99+
if (value == null && defaultValue != null) {
100+
return defaultValue;
101+
} else if (value == null) {
102+
throw newConfigurationException(propertyName, "required property is missing");
103+
}
104+
return readStringOrInt(propertyName, value);
105+
}
106+
107+
private static String readStringOrInt(String propertyName, Object value) {
108+
if (value == null) {
109+
return null;
110+
}
111+
if (value instanceof String) {
112+
return (String) value;
113+
} else if (value instanceof Integer) {
114+
return String.valueOf(value);
115+
}
116+
throw newConfigurationException(propertyName, "property isn't a string or int, but of type [" + value.getClass().getName() + "]");
117+
}
118+
119+
/**
120+
* Returns the specified property from the specified configuration map.
121+
* <p>
122+
* If the property value isn't of type string or int a {@link OpenSearchParseException} is thrown.
123+
*/
124+
public static String readOptionalStringOrIntProperty(Map<String, Object> configuration, String propertyName) {
125+
Object value = configuration.get(propertyName);
126+
if (value == null) {
127+
return null;
128+
}
129+
return readStringOrInt(propertyName, value);
130+
}
131+
132+
public static boolean readBooleanProperty(Map<String, Object> configuration, String propertyName, boolean defaultValue) {
133+
Object value = configuration.get(propertyName);
134+
if (value == null) {
135+
return defaultValue;
136+
} else {
137+
return readBoolean(propertyName, value).booleanValue();
138+
}
139+
}
140+
141+
private static Boolean readBoolean(String propertyName, Object value) {
142+
if (value == null) {
143+
return null;
144+
}
145+
if (value instanceof Boolean) {
146+
return (boolean) value;
147+
}
148+
throw newConfigurationException(propertyName, "property isn't a boolean, but of type [" + value.getClass().getName() + "]");
149+
}
150+
151+
/**
152+
* Returns the specified property from the specified configuration map.
153+
* <p>
154+
* If the property value isn't of type int a {@link OpenSearchParseException} is thrown.
155+
* If the property is missing an {@link OpenSearchParseException} is thrown
156+
*/
157+
public static Integer readIntProperty(Map<String, Object> configuration, String propertyName, Integer defaultValue) {
158+
Object value = configuration.get(propertyName);
159+
if (value == null) {
160+
return defaultValue;
161+
}
162+
try {
163+
return Integer.parseInt(value.toString());
164+
} catch (Exception e) {
165+
throw newConfigurationException(propertyName, "property cannot be converted to an int [" + value + "]");
166+
}
167+
}
168+
169+
/**
170+
* Returns the specified property from the specified configuration map.
171+
* <p>
172+
* If the property value isn't of type int a {@link OpenSearchParseException} is thrown.
173+
* If the property is missing an {@link OpenSearchParseException} is thrown
174+
*/
175+
public static Double readDoubleProperty(Map<String, Object> configuration, String propertyName) {
176+
Object value = configuration.get(propertyName);
177+
if (value == null) {
178+
throw newConfigurationException(propertyName, "required property is missing");
179+
}
180+
try {
181+
return Double.parseDouble(value.toString());
182+
} catch (Exception e) {
183+
throw newConfigurationException(propertyName, "property cannot be converted to a double [" + value + "]");
184+
}
185+
}
186+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.core.util;
10+
11+
import org.opensearch.OpenSearchParseException;
12+
import org.opensearch.test.OpenSearchTestCase;
13+
import org.junit.Before;
14+
15+
import java.util.Arrays;
16+
import java.util.HashMap;
17+
import java.util.Map;
18+
19+
import static org.hamcrest.Matchers.equalTo;
20+
21+
public class ConfigurationUtilsTests extends OpenSearchTestCase {
22+
private Map<String, Object> config;
23+
24+
@Before
25+
public void setConfig() {
26+
config = new HashMap<>();
27+
config.put("foo", "bar");
28+
config.put("boolVal", true);
29+
config.put("null", null);
30+
config.put("arr", Arrays.asList("1", "2", "3"));
31+
config.put("ip", "127.0.0.1");
32+
config.put("num", 1);
33+
config.put("double", 1.0);
34+
}
35+
36+
public void testReadStringProperty() {
37+
String val = ConfigurationUtils.readStringProperty(config, "foo");
38+
assertThat(val, equalTo("bar"));
39+
String val1 = ConfigurationUtils.readStringProperty(config, "foo1", "none");
40+
assertThat(val1, equalTo("none"));
41+
try {
42+
ConfigurationUtils.readStringProperty(config, "foo1", null);
43+
} catch (OpenSearchParseException e) {
44+
assertThat(e.getMessage(), equalTo("[foo1] required property is missing"));
45+
}
46+
}
47+
48+
public void testOptionalReadStringProperty() {
49+
String val = ConfigurationUtils.readOptionalStringProperty(config, "foo");
50+
assertThat(val, equalTo("bar"));
51+
String val1 = ConfigurationUtils.readOptionalStringProperty(config, "foo1");
52+
assertThat(val, equalTo("bar"));
53+
assertThat(val1, equalTo(null));
54+
}
55+
56+
public void testReadStringPropertyInvalidType() {
57+
try {
58+
ConfigurationUtils.readStringProperty(config, "arr");
59+
} catch (OpenSearchParseException e) {
60+
assertThat(e.getMessage(), equalTo("[arr] property isn't a string, but of type [java.util.Arrays$ArrayList]"));
61+
}
62+
}
63+
64+
public void testReadBooleanProperty() {
65+
Boolean val = ConfigurationUtils.readBooleanProperty(config, "boolVal", false);
66+
assertThat(val, equalTo(true));
67+
}
68+
69+
public void testReadNullBooleanProperty() {
70+
Boolean val = ConfigurationUtils.readBooleanProperty(config, "null", false);
71+
assertThat(val, equalTo(false));
72+
}
73+
74+
public void testReadBooleanPropertyInvalidType() {
75+
try {
76+
ConfigurationUtils.readBooleanProperty(config, "arr", true);
77+
} catch (OpenSearchParseException e) {
78+
assertThat(e.getMessage(), equalTo("[arr] property isn't a boolean, but of type [java.util.Arrays$ArrayList]"));
79+
}
80+
}
81+
82+
public void testReadStringOrIntProperty() {
83+
String val1 = ConfigurationUtils.readStringOrIntProperty(config, "foo", null);
84+
String val2 = ConfigurationUtils.readStringOrIntProperty(config, "num", null);
85+
assertThat(val1, equalTo("bar"));
86+
assertThat(val2, equalTo("1"));
87+
}
88+
89+
public void testOptionalReadStringOrIntProperty() {
90+
String val1 = ConfigurationUtils.readOptionalStringOrIntProperty(config, "foo");
91+
String val2 = ConfigurationUtils.readOptionalStringOrIntProperty(config, "num");
92+
String val3 = ConfigurationUtils.readOptionalStringOrIntProperty(config, "num1");
93+
assertThat(val1, equalTo("bar"));
94+
assertThat(val2, equalTo("1"));
95+
assertThat(val3, equalTo(null));
96+
}
97+
98+
public void testReadIntProperty() {
99+
int val = ConfigurationUtils.readIntProperty(config, "num", null);
100+
assertThat(val, equalTo(1));
101+
try {
102+
ConfigurationUtils.readIntProperty(config, "foo", 2);
103+
} catch (OpenSearchParseException e) {
104+
assertThat(e.getMessage(), equalTo("[foo] property cannot be converted to an int [bar]"));
105+
}
106+
try {
107+
ConfigurationUtils.readIntProperty(config, "foo1", 2);
108+
} catch (OpenSearchParseException e) {
109+
assertThat(e.getMessage(), equalTo("required property is missing"));
110+
}
111+
}
112+
113+
public void testReadDoubleProperty() {
114+
double val = ConfigurationUtils.readDoubleProperty(config, "double");
115+
assertThat(val, equalTo(1.0));
116+
try {
117+
ConfigurationUtils.readDoubleProperty(config, "foo");
118+
} catch (OpenSearchParseException e) {
119+
assertThat(e.getMessage(), equalTo("[foo] property cannot be converted to a double [bar]"));
120+
}
121+
try {
122+
ConfigurationUtils.readDoubleProperty(config, "foo1");
123+
} catch (OpenSearchParseException e) {
124+
assertThat(e.getMessage(), equalTo("[foo1] required property is missing"));
125+
}
126+
}
127+
128+
public void testReadStringOrIntPropertyInvalidType() {
129+
try {
130+
ConfigurationUtils.readStringOrIntProperty(config, "arr", null);
131+
} catch (OpenSearchParseException e) {
132+
assertThat(e.getMessage(), equalTo("[arr] property isn't a string or int, but of type [java.util.Arrays$ArrayList]"));
133+
}
134+
}
135+
136+
}

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaSourceConfig.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,17 @@
88

99
package org.opensearch.plugin.kafka;
1010

11+
import org.opensearch.core.util.ConfigurationUtils;
12+
1113
import java.util.Map;
12-
import java.util.Objects;
1314

1415
/**
1516
* Class encapsulating the configuration of a Kafka source.
1617
*/
1718
public class KafkaSourceConfig {
19+
private final String PROP_TOPIC = "topic";
20+
private final String PROP_BOOTSTRAP_SERVERS = "bootstrap_servers";
21+
1822
private final String topic;
1923
private final String bootstrapServers;
2024

@@ -23,10 +27,8 @@ public class KafkaSourceConfig {
2327
* @param params the configuration parameters
2428
*/
2529
public KafkaSourceConfig(Map<String, Object> params) {
26-
// TODO: better parsing and validation
27-
this.topic = (String) Objects.requireNonNull(params.get("topic"));
28-
this.bootstrapServers = (String) Objects.requireNonNull(params.get("bootstrap_servers"));
29-
assert this.bootstrapServers != null;
30+
this.topic = ConfigurationUtils.readStringProperty(params, PROP_TOPIC);
31+
this.bootstrapServers = ConfigurationUtils.readStringProperty(params, PROP_BOOTSTRAP_SERVERS);
3032
}
3133

3234
/**

0 commit comments

Comments
 (0)