8
8
import static org .opensearch .ml .common .utils .StringUtils .getJsonPath ;
9
9
import static org .opensearch .ml .common .utils .StringUtils .obtainFieldNameFromJsonPath ;
10
10
11
- import java .util .Collection ;
11
+ import java .util .Arrays ;
12
12
import java .util .HashMap ;
13
13
import java .util .List ;
14
14
import java .util .Map ;
15
- import java .util .Optional ;
15
+ import java .util .Objects ;
16
16
import java .util .concurrent .CompletableFuture ;
17
17
import java .util .concurrent .atomic .AtomicInteger ;
18
18
import java .util .stream .Collectors ;
34
34
35
35
@ Log4j2
36
36
public class AbstractIngestion implements Ingestable {
37
- public static final String OUTPUT = "output" ;
38
- public static final String INPUT = "input" ;
39
- public static final String OUTPUT_FIELD_NAMES = "output_names" ;
40
- public static final String INPUT_FIELD_NAMES = "input_names" ;
41
- public static final String INGEST_FIELDS = "ingest_fields" ;
42
- public static final String ID_FIELD = "id_field" ;
43
37
44
38
private final Client client ;
45
39
@@ -85,12 +79,11 @@ protected double calculateSuccessRate(List<Double> successRates) {
85
79
* Filters fields in the map where the value contains the specified source index as a prefix.
86
80
*
87
81
* @param mlBatchIngestionInput The MLBatchIngestionInput.
88
- * @param index The source index to filter by.
89
- * @return A new map with only the entries that match the specified source index.
82
+ * @param indexInFieldMap The source index to filter by.
83
+ * @return A new map with only the entries that match the specified source index and correctly mapped to JsonPath .
90
84
*/
91
- protected Map <String , Object > filterFieldMapping (MLBatchIngestionInput mlBatchIngestionInput , int index ) {
85
+ protected Map <String , Object > filterFieldMapping (MLBatchIngestionInput mlBatchIngestionInput , int indexInFieldMap ) {
92
86
Map <String , Object > fieldMap = mlBatchIngestionInput .getFieldMapping ();
93
- int indexInFieldMap = index + 1 ;
94
87
String prefix = "source[" + indexInFieldMap + "]" ;
95
88
96
89
Map <String , Object > filteredFieldMap = fieldMap .entrySet ().stream ().filter (entry -> {
@@ -104,19 +97,29 @@ protected Map<String, Object> filterFieldMapping(MLBatchIngestionInput mlBatchIn
104
97
}).collect (Collectors .toMap (Map .Entry ::getKey , entry -> {
105
98
Object value = entry .getValue ();
106
99
if (value instanceof String ) {
107
- return value ;
100
+ return getJsonPath (( String ) value ) ;
108
101
} else if (value instanceof List ) {
109
- return ((List <String >) value ).stream ().filter (val -> val .contains (prefix )).collect (Collectors .toList ());
102
+ return ((List <String >) value )
103
+ .stream ()
104
+ .filter (val -> val .contains (prefix ))
105
+ .map (StringUtils ::getJsonPath )
106
+ .collect (Collectors .toList ());
110
107
}
111
108
return null ;
112
109
}));
113
110
114
- if (filteredFieldMap .containsKey (OUTPUT )) {
115
- filteredFieldMap .put (OUTPUT_FIELD_NAMES , fieldMap .get (OUTPUT_FIELD_NAMES ));
116
- }
117
- if (filteredFieldMap .containsKey (INPUT )) {
118
- filteredFieldMap .put (INPUT_FIELD_NAMES , fieldMap .get (INPUT_FIELD_NAMES ));
111
+ String [] ingestFields = mlBatchIngestionInput .getIngestFields ();
112
+ if (ingestFields != null ) {
113
+ Arrays
114
+ .stream (ingestFields )
115
+ .filter (Objects ::nonNull )
116
+ .filter (val -> val .contains (prefix ))
117
+ .map (StringUtils ::getJsonPath )
118
+ .forEach (jsonPath -> {
119
+ filteredFieldMap .put (obtainFieldNameFromJsonPath (jsonPath ), jsonPath );
120
+ });
119
121
}
122
+
120
123
return filteredFieldMap ;
121
124
}
122
125
@@ -128,42 +131,21 @@ protected Map<String, Object> filterFieldMapping(MLBatchIngestionInput mlBatchIn
128
131
* @return A new map that contains all the fields and data for ingestion.
129
132
*/
130
133
protected Map <String , Object > processFieldMapping (String jsonStr , Map <String , Object > fieldMapping ) {
131
- String inputJsonPath = fieldMapping .containsKey (INPUT ) ? getJsonPath ((String ) fieldMapping .get (INPUT )) : null ;
132
- List <String > remoteModelInput = inputJsonPath != null ? (List <String >) JsonPath .read (jsonStr , inputJsonPath ) : null ;
133
- List <String > inputFieldNames = inputJsonPath != null ? (List <String >) fieldMapping .get (INPUT_FIELD_NAMES ) : null ;
134
-
135
- String outputJsonPath = fieldMapping .containsKey (OUTPUT ) ? getJsonPath ((String ) fieldMapping .get (OUTPUT )) : null ;
136
- List <List > remoteModelOutput = outputJsonPath != null ? (List <List >) JsonPath .read (jsonStr , outputJsonPath ) : null ;
137
- List <String > outputFieldNames = outputJsonPath != null ? (List <String >) fieldMapping .get (OUTPUT_FIELD_NAMES ) : null ;
138
-
139
- List <String > ingestFieldsJsonPath = Optional
140
- .ofNullable ((List <String >) fieldMapping .get (INGEST_FIELDS ))
141
- .stream ()
142
- .flatMap (Collection ::stream )
143
- .map (StringUtils ::getJsonPath )
144
- .collect (Collectors .toList ());
145
-
146
134
Map <String , Object > jsonMap = new HashMap <>();
147
-
148
- populateJsonMap (jsonMap , inputFieldNames , remoteModelInput );
149
- populateJsonMap (jsonMap , outputFieldNames , remoteModelOutput );
150
-
151
- for (String fieldPath : ingestFieldsJsonPath ) {
152
- jsonMap .put (obtainFieldNameFromJsonPath (fieldPath ), JsonPath .read (jsonStr , fieldPath ));
135
+ if (fieldMapping == null || fieldMapping .isEmpty ()) {
136
+ return jsonMap ;
153
137
}
154
138
155
- if (fieldMapping .containsKey (ID_FIELD )) {
156
- List <String > docIdJsonPath = Optional
157
- .ofNullable ((List <String >) fieldMapping .get (ID_FIELD ))
158
- .stream ()
159
- .flatMap (Collection ::stream )
160
- .map (StringUtils ::getJsonPath )
161
- .collect (Collectors .toList ());
162
- if (docIdJsonPath .size () != 1 ) {
163
- throw new IllegalArgumentException ("The Id field must contains only 1 jsonPath for each source" );
139
+ fieldMapping .entrySet ().stream ().forEach (entry -> {
140
+ Object value = entry .getValue ();
141
+ if (value instanceof String ) {
142
+ String jsonPath = (String ) value ;
143
+ jsonMap .put (entry .getKey (), JsonPath .read (jsonStr , jsonPath ));
144
+ } else if (value instanceof List ) {
145
+ ((List <String >) value ).stream ().forEach (jsonPath -> { jsonMap .put (entry .getKey (), JsonPath .read (jsonStr , jsonPath )); });
164
146
}
165
- jsonMap . put ( "_id" , JsonPath . read ( jsonStr , docIdJsonPath . get ( 0 )) );
166
- }
147
+ } );
148
+
167
149
return jsonMap ;
168
150
}
169
151
@@ -180,12 +162,11 @@ protected void batchIngest(
180
162
? mlBatchIngestionInput .getFieldMapping ()
181
163
: filterFieldMapping (mlBatchIngestionInput , sourceIndex );
182
164
Map <String , Object > jsonMap = processFieldMapping (jsonStr , filteredMapping );
183
- if (isSoleSource || sourceIndex == 0 ) {
165
+ if (jsonMap .isEmpty ()) {
166
+ return ;
167
+ }
168
+ if (isSoleSource && !jsonMap .containsKey ("_id" )) {
184
169
IndexRequest indexRequest = new IndexRequest (mlBatchIngestionInput .getIndexName ());
185
- if (jsonMap .containsKey ("_id" )) {
186
- String id = (String ) jsonMap .remove ("_id" );
187
- indexRequest .id (id );
188
- }
189
170
indexRequest .source (jsonMap );
190
171
bulkRequest .add (indexRequest );
191
172
} else {
0 commit comments