Skip to content

Commit 9cac0f8

Browse files
committed
fix issues
1 parent 208770d commit 9cac0f8

File tree

2 files changed

+355
-121
lines changed

2 files changed

+355
-121
lines changed

src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java

+118-99
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Collections;
2222
import java.util.List;
2323
import java.util.Objects;
24+
import java.util.concurrent.CompletableFuture;
2425

2526
import org.apache.logging.log4j.LogManager;
2627
import org.apache.logging.log4j.Logger;
@@ -29,6 +30,9 @@
2930
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
3031
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
3132
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
33+
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest;
34+
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse;
35+
import org.opensearch.action.admin.indices.template.get.GetComposableIndexTemplateAction;
3236
import org.opensearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
3337
import org.opensearch.action.bulk.BulkRequestBuilder;
3438
import org.opensearch.action.bulk.BulkResponse;
@@ -142,111 +146,80 @@ public void export(final List<SearchQueryRecord> records) {
142146
try {
143147
final String indexName = buildLocalIndexName();
144148
if (!checkIndexExists(indexName)) {
145-
// Create template with fixed priority
146-
// ensureTemplateExists();
147-
String indexPattern = TOP_QUERIES_INDEX_PATTERN_GLOB;
148-
try {
149-
// Create a V2 template (ComposableIndexTemplate)
150-
CompressedXContent compressedMapping = new CompressedXContent(readIndexMappings());
151-
152-
// Create template component
153-
org.opensearch.cluster.metadata.Template template = new org.opensearch.cluster.metadata.Template(
154-
Settings.builder()
155-
.put("index.number_of_shards", DEFAULT_NUMBER_OF_SHARDS)
156-
.put("index.number_of_replicas", DEFAULT_NUMBER_OF_REPLICA)
157-
.build(),
158-
compressedMapping,
159-
null
160-
);
161-
162-
// Create the composable template
163-
ComposableIndexTemplate composableTemplate =
164-
new ComposableIndexTemplate(
165-
Collections.singletonList(indexPattern),
166-
template,
167-
null, // No composed_of templates
168-
templatePriority, // Priority using configured value
169-
null,
170-
null
171-
);
172-
173-
// Use the V2 API to put the template
174-
PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(TEMPLATE_NAME)
175-
.indexTemplate(composableTemplate);
176-
177-
client.execute(
178-
PutComposableIndexTemplateAction.INSTANCE,
179-
request,
180-
new ActionListener<>() {
181-
@Override
182-
public void onResponse(AcknowledgedResponse response) {
183-
if (response.isAcknowledged()) {
184-
logger.info("Successfully created or updated V2 template [{}] with priority {}",
185-
TEMPLATE_NAME, templatePriority);
186-
} else {
187-
logger.warn("Failed to create or update V2 template [{}]", TEMPLATE_NAME);
188-
}
189-
}
190-
191-
@Override
192-
public void onFailure(Exception e) {
193-
logger.error("Error creating or updating V2 template [{}]", TEMPLATE_NAME, e);
194-
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS);
195-
}
196-
}
197-
);
198-
} catch (Exception e) {
199-
logger.error("Failed to manage V2 template", e);
200-
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS);
201-
}
202-
203-
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
204-
205-
createIndexRequest.settings(
206-
Settings.builder()
207-
.put("index.number_of_shards", DEFAULT_NUMBER_OF_SHARDS)
208-
.put("index.number_of_replicas", DEFAULT_NUMBER_OF_REPLICA)
209-
);
210-
createIndexRequest.mapping(readIndexMappings());
211-
212-
client.admin().indices().create(createIndexRequest, new ActionListener<>() {
213-
@Override
214-
public void onResponse(CreateIndexResponse createIndexResponse) {
215-
if (createIndexResponse.isAcknowledged()) {
216-
try {
217-
bulk(indexName, records);
218-
} catch (IOException e) {
219-
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS);
220-
logger.error("Unable to index query insights data: ", e);
221-
}
222-
}
149+
// First ensure the template exists, then create the index
150+
ensureTemplateExists().whenComplete((templateCreated, templateException) -> {
151+
if (templateException != null) {
152+
logger.error("Error ensuring template exists:", templateException);
153+
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS);
223154
}
224-
225-
@Override
226-
public void onFailure(Exception e) {
227-
Throwable cause = ExceptionsHelper.unwrapCause(e);
228-
if (cause instanceof ResourceAlreadyExistsException) {
229-
try {
230-
bulk(indexName, records);
231-
} catch (IOException ioe) {
232-
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS);
233-
logger.error("Unable to index query insights data: ", ioe);
234-
}
235-
} else {
236-
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS);
237-
logger.error("Unable to create query insights index: ", cause);
238-
}
155+
156+
// Proceed with index creation even if there was a template error
157+
// The template might already exist or might be created by another node
158+
try {
159+
createIndex(indexName, records);
160+
} catch (IOException e) {
161+
logger.error("Error creating index:", e);
162+
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS);
239163
}
240164
});
241165
} else {
166+
// Index already exists, just send the data
242167
bulk(indexName, records);
243168
}
244-
} catch (IOException e) {
169+
} catch (Exception e) {
245170
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS);
246-
logger.error("Unable to create query insights exporter: ", e);
171+
logger.error("Unable to export query insights data: ", e);
247172
}
248173
}
249174

175+
/**
176+
* Creates an index with the specified name and exports records to it
177+
*
178+
* @param indexName Name of the index to create
179+
* @param records Records to export
180+
* @throws IOException If there's an error reading mappings
181+
*/
182+
private void createIndex(String indexName, List<SearchQueryRecord> records) throws IOException {
183+
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
184+
185+
createIndexRequest.settings(
186+
Settings.builder()
187+
.put("index.number_of_shards", DEFAULT_NUMBER_OF_SHARDS)
188+
.put("index.number_of_replicas", DEFAULT_NUMBER_OF_REPLICA)
189+
);
190+
createIndexRequest.mapping(readIndexMappings());
191+
192+
client.admin().indices().create(createIndexRequest, new ActionListener<>() {
193+
@Override
194+
public void onResponse(CreateIndexResponse createIndexResponse) {
195+
if (createIndexResponse.isAcknowledged()) {
196+
try {
197+
bulk(indexName, records);
198+
} catch (IOException e) {
199+
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS);
200+
logger.error("Unable to index query insights data: ", e);
201+
}
202+
}
203+
}
204+
205+
@Override
206+
public void onFailure(Exception e) {
207+
Throwable cause = ExceptionsHelper.unwrapCause(e);
208+
if (cause instanceof ResourceAlreadyExistsException) {
209+
try {
210+
bulk(indexName, records);
211+
} catch (IOException ioe) {
212+
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS);
213+
logger.error("Unable to index query insights data: ", ioe);
214+
}
215+
} else {
216+
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS);
217+
logger.error("Unable to create query insights index: ", cause);
218+
}
219+
}
220+
});
221+
}
222+
250223
private void bulk(final String indexName, final List<SearchQueryRecord> records) throws IOException {
251224
final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setTimeout(TimeValue.timeValueMinutes(1));
252225
for (SearchQueryRecord record : records) {
@@ -363,11 +336,53 @@ private String readIndexMappings() throws IOException {
363336
}
364337

365338
/**
366-
* Ensure a template exists for our index pattern with the configured priority
339+
* Ensures that the template exists. This method first checks if the template exists and
340+
* only creates it if it doesn't.
341+
*
342+
* @return CompletableFuture that completes when the template check/creation is done
367343
*/
368-
private void ensureTemplateExists() {
344+
private CompletableFuture<Boolean> ensureTemplateExists() {
345+
CompletableFuture<Boolean> future = new CompletableFuture<>();
369346
String indexPattern = TOP_QUERIES_INDEX_PATTERN_GLOB;
370347

348+
// First check if the template already exists
349+
GetComposableIndexTemplateAction.Request getRequest = new GetComposableIndexTemplateAction.Request(TEMPLATE_NAME);
350+
351+
client.execute(
352+
GetComposableIndexTemplateAction.INSTANCE,
353+
getRequest,
354+
new ActionListener<>() {
355+
@Override
356+
public void onResponse(GetComposableIndexTemplateAction.Response response) {
357+
// If the template exists, we don't need to create it
358+
if (response.indexTemplates().containsKey(TEMPLATE_NAME)) {
359+
logger.debug("Template [{}] already exists, skipping creation", TEMPLATE_NAME);
360+
future.complete(true);
361+
return;
362+
}
363+
364+
// Template doesn't exist, create it
365+
createTemplate(future);
366+
}
367+
368+
@Override
369+
public void onFailure(Exception e) {
370+
// If we can't retrieve the template info, try creating it anyway
371+
logger.warn("Failed to check if template [{}] exists: {}", TEMPLATE_NAME, e.getMessage());
372+
createTemplate(future);
373+
}
374+
}
375+
);
376+
377+
return future;
378+
}
379+
380+
/**
381+
* Helper method to create the template
382+
*
383+
* @param future The CompletableFuture to complete when done
384+
*/
385+
private void createTemplate(CompletableFuture<Boolean> future) {
371386
try {
372387
// Create a V2 template (ComposableIndexTemplate)
373388
CompressedXContent compressedMapping = new CompressedXContent(readIndexMappings());
@@ -385,7 +400,7 @@ private void ensureTemplateExists() {
385400
// Create the composable template
386401
ComposableIndexTemplate composableTemplate =
387402
new ComposableIndexTemplate(
388-
Collections.singletonList(indexPattern),
403+
Collections.singletonList(TOP_QUERIES_INDEX_PATTERN_GLOB),
389404
template,
390405
null, // No composed_of templates
391406
templatePriority, // Priority using configured value
@@ -402,25 +417,29 @@ private void ensureTemplateExists() {
402417
request,
403418
new ActionListener<>() {
404419
@Override
405-
public void onResponse(org.opensearch.action.support.AcknowledgedResponse response) {
420+
public void onResponse(AcknowledgedResponse response) {
406421
if (response.isAcknowledged()) {
407422
logger.info("Successfully created or updated V2 template [{}] with priority {}",
408423
TEMPLATE_NAME, templatePriority);
424+
future.complete(true);
409425
} else {
410426
logger.warn("Failed to create or update V2 template [{}]", TEMPLATE_NAME);
427+
future.complete(false);
411428
}
412429
}
413430

414431
@Override
415432
public void onFailure(Exception e) {
416433
logger.error("Error creating or updating V2 template [{}]", TEMPLATE_NAME, e);
417434
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS);
435+
future.completeExceptionally(e);
418436
}
419437
}
420438
);
421439
} catch (Exception e) {
422440
logger.error("Failed to manage V2 template", e);
423441
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS);
442+
future.completeExceptionally(e);
424443
}
425444
}
426445

0 commit comments

Comments
 (0)