Skip to content

Commit

Permalink
feat(ESCatalog): Ignore unavailable indices to avoid crashes when the…
Browse files Browse the repository at this point in the history
…re is a closed index among alias/datastream indices
  • Loading branch information
kuseman committed May 24, 2024
1 parent 9a441b3 commit 883ed26
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ static String getSearchUrl(String endpoint, String index, String type, Integer s
String typePart = useType(type) ? ("/" + type)
: "";

return String.format("%s/%s%s/_search%s?filter_path=_scroll_id,hits.hits%s%s", endpoint, indexPart, typePart, isTemplate ? "/template"
return String.format("%s/%s%s/_search%s?filter_path=_scroll_id,hits.hits&ignore_unavailable=true%s%s", endpoint, indexPart, typePart, isTemplate ? "/template"
: "",
scrollMinutes != null ? ("&scroll=" + scrollMinutes + "m")
: "",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.commons.io.IOUtils;
import org.apache.commons.io.ThreadUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hc.client5.http.classic.methods.HttpDelete;
import org.apache.hc.client5.http.classic.methods.HttpGet;
Expand Down Expand Up @@ -1277,6 +1279,47 @@ public void test_datasource_table_scan_batching() throws IOException
assertEquals(3, rowCount);
}

@Test
public void test_ingore_unavailable_indices() throws IOException, InterruptedException
{
createIndex(endpoint, "ignoretest_opened");
createIndex(endpoint, "ignoretest_closed");

for (Pair<String, Map<String, Object>> p : testData)
{
index(endpoint, "ignoretest_opened", type, MAPPER.writeValueAsString(p.getValue()), p.getKey());
index(endpoint, "ignoretest_closed", type, MAPPER.writeValueAsString(p.getValue()), p.getKey());
}

// Let indices be created before closing
ThreadUtils.sleep(Duration.ofMillis(250));

closeIndex(endpoint, "ignoretest_closed");

ESDatasource.Data data = new ESDatasource.Data();
IExecutionContext context = mockExecutionContext(CATALOG_ALIAS, Map.of("endpoint", endpoint, "index", "ignoretest_opened,ignoretest_closed"), 0, data);
IDatasource ds = catalog.getScanDataSource(context.getSession(), CATALOG_ALIAS, QualifiedName.of(version.getStrategy()
.supportsTypes() ? type
: ESCatalog.SINGLE_TYPE_TABLE_NAME),
new DatasourceData(0, Optional.empty(), emptyList(), emptyList(), emptyList(), emptyList()));

IDatasourceOptions options = mockOptions(500);

TupleIterator it = ds.execute(context, options);
int rowCount = 0;
while (it.hasNext())
{
TupleVector next = it.next();

assertVectorsEquals(vv(Type.Any, "ignoretest_opened", "ignoretest_opened", "ignoretest_opened"), next.getColumn(0));

rowCount += next.getRowCount();
}
it.close();

assertEquals(3, rowCount);
}

protected Map<String, Object> modifyMappingProperties(Map<String, Object> mappings)
{
// Wrap the mapping with a type
Expand Down Expand Up @@ -1354,6 +1397,22 @@ protected void createIndex(String endpoint, String index) throws IOException
createIndex(endpoint, index, null);
}

protected void closeIndex(String endpoint, String index) throws IOException
{
HttpPost post = new HttpPost(endpoint + "/" + index + "/_close");
HttpClientUtils.execute("", post, null, null, null, null, null, null, response ->
{
if (!(response.getCode() >= 200
&& response.getCode() < 299))
{
String error = IOUtils.toString(response.getEntity()
.getContent(), StandardCharsets.UTF_8);
throw new RuntimeException("Error closing index: " + error);
}
return null;
});
}

protected void createIndex(String endpoint, String index, Map<String, Object> body) throws IOException
{
HttpPut put = new HttpPut(endpoint + "/" + index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,35 +41,45 @@ public void test_getSearchTemplateUrl_fail()
public void test_getSearchTemplateUrl()
{
// Global type (_doc)
assertEquals("http://localhost:9200/*/_search/template?filter_path=_scroll_id,hits.hits", ESQueryUtils.getSearchUrl("http://localhost:9200", null, "_doc", null, null, true));
assertEquals("http://localhost:9200/myIndex/_search/template?filter_path=_scroll_id,hits.hits", ESQueryUtils.getSearchUrl("http://localhost:9200", "myIndex", "_doc", null, null, true));
assertEquals("http://localhost:9200/myIndex/_search/template?filter_path=_scroll_id,hits.hits&size=100",
assertEquals("http://localhost:9200/*/_search/template?filter_path=_scroll_id,hits.hits&ignore_unavailable=true",
ESQueryUtils.getSearchUrl("http://localhost:9200", null, "_doc", null, null, true));
assertEquals("http://localhost:9200/myIndex/_search/template?filter_path=_scroll_id,hits.hits&ignore_unavailable=true",
ESQueryUtils.getSearchUrl("http://localhost:9200", "myIndex", "_doc", null, null, true));
assertEquals("http://localhost:9200/myIndex/_search/template?filter_path=_scroll_id,hits.hits&ignore_unavailable=true&size=100",
ESQueryUtils.getSearchUrl("http://localhost:9200", "myIndex", "_doc", 100, null, true));
assertEquals("http://localhost:9200/myIndex/_search/template?filter_path=_scroll_id,hits.hits&scroll=2m", ESQueryUtils.getSearchUrl("http://localhost:9200", "myIndex", "_doc", null, 2, true));
assertEquals("http://localhost:9200/myIndex/_search/template?filter_path=_scroll_id,hits.hits&scroll=2m&size=300",
assertEquals("http://localhost:9200/myIndex/_search/template?filter_path=_scroll_id,hits.hits&ignore_unavailable=true&scroll=2m",
ESQueryUtils.getSearchUrl("http://localhost:9200", "myIndex", "_doc", null, 2, true));
assertEquals("http://localhost:9200/myIndex/_search/template?filter_path=_scroll_id,hits.hits&ignore_unavailable=true&scroll=2m&size=300",
ESQueryUtils.getSearchUrl("http://localhost:9200", "myIndex", "_doc", 300, 2, true));

// With specific type
assertEquals("http://localhost:9200/*/type/_search/template?filter_path=_scroll_id,hits.hits", ESQueryUtils.getSearchUrl("http://localhost:9200", null, "type", null, null, true));
assertEquals("http://localhost:9200/myIndex/type/_search/template?filter_path=_scroll_id,hits.hits", ESQueryUtils.getSearchUrl("http://localhost:9200", "myIndex", "type", null, null, true));
assertEquals("http://localhost:9200/myIndex/type/_search/template?filter_path=_scroll_id,hits.hits&size=100",
assertEquals("http://localhost:9200/*/type/_search/template?filter_path=_scroll_id,hits.hits&ignore_unavailable=true",
ESQueryUtils.getSearchUrl("http://localhost:9200", null, "type", null, null, true));
assertEquals("http://localhost:9200/myIndex/type/_search/template?filter_path=_scroll_id,hits.hits&ignore_unavailable=true",
ESQueryUtils.getSearchUrl("http://localhost:9200", "myIndex", "type", null, null, true));
assertEquals("http://localhost:9200/myIndex/type/_search/template?filter_path=_scroll_id,hits.hits&ignore_unavailable=true&size=100",
ESQueryUtils.getSearchUrl("http://localhost:9200", "myIndex", "type", 100, null, true));
assertEquals("http://localhost:9200/myIndex/type/_search/template?filter_path=_scroll_id,hits.hits&scroll=2m",
assertEquals("http://localhost:9200/myIndex/type/_search/template?filter_path=_scroll_id,hits.hits&ignore_unavailable=true&scroll=2m",
ESQueryUtils.getSearchUrl("http://localhost:9200", "myIndex", "type", null, 2, true));
assertEquals("http://localhost:9200/myIndex/type/_search/template?filter_path=_scroll_id,hits.hits&scroll=2m&size=300",
assertEquals("http://localhost:9200/myIndex/type/_search/template?filter_path=_scroll_id,hits.hits&ignore_unavailable=true&scroll=2m&size=300",
ESQueryUtils.getSearchUrl("http://localhost:9200", "myIndex", "type", 300, 2, true));

}

@Test
public void test_getSearchUrl()
{
assertEquals("http://localhost:9200/*/_search?filter_path=_scroll_id,hits.hits", ESQueryUtils.getSearchUrl("http://localhost:9200", null, "_doc", null, null, false));
assertEquals("http://localhost:9200/myindex/_search?filter_path=_scroll_id,hits.hits", ESQueryUtils.getSearchUrl("http://localhost:9200", "myindex", null, null, null, false));
assertEquals("http://localhost:9200/myindex/_search?filter_path=_scroll_id,hits.hits", ESQueryUtils.getSearchUrl("http://localhost:9200", "myindex", "_doc", null, null, false));
assertEquals("http://localhost:9200/myindex/_search?filter_path=_scroll_id,hits.hits&size=100", ESQueryUtils.getSearchUrl("http://localhost:9200", "myindex", "_doc", 100, null, false));
assertEquals("http://localhost:9200/myindex/_search?filter_path=_scroll_id,hits.hits&scroll=2m", ESQueryUtils.getSearchUrl("http://localhost:9200", "myindex", "_doc", null, 2, false));
assertEquals("http://localhost:9200/myindex/_search?filter_path=_scroll_id,hits.hits&scroll=2m&size=200", ESQueryUtils.getSearchUrl("http://localhost:9200", "myindex", "_doc", 200, 2, false));
assertEquals("http://localhost:9200/*/_search?filter_path=_scroll_id,hits.hits&ignore_unavailable=true", ESQueryUtils.getSearchUrl("http://localhost:9200", null, "_doc", null, null, false));
assertEquals("http://localhost:9200/myindex/_search?filter_path=_scroll_id,hits.hits&ignore_unavailable=true",
ESQueryUtils.getSearchUrl("http://localhost:9200", "myindex", null, null, null, false));
assertEquals("http://localhost:9200/myindex/_search?filter_path=_scroll_id,hits.hits&ignore_unavailable=true",
ESQueryUtils.getSearchUrl("http://localhost:9200", "myindex", "_doc", null, null, false));
assertEquals("http://localhost:9200/myindex/_search?filter_path=_scroll_id,hits.hits&ignore_unavailable=true&size=100",
ESQueryUtils.getSearchUrl("http://localhost:9200", "myindex", "_doc", 100, null, false));
assertEquals("http://localhost:9200/myindex/_search?filter_path=_scroll_id,hits.hits&ignore_unavailable=true&scroll=2m",
ESQueryUtils.getSearchUrl("http://localhost:9200", "myindex", "_doc", null, 2, false));
assertEquals("http://localhost:9200/myindex/_search?filter_path=_scroll_id,hits.hits&ignore_unavailable=true&scroll=2m&size=200",
ESQueryUtils.getSearchUrl("http://localhost:9200", "myindex", "_doc", 200, 2, false));
}

@Test(
Expand Down

0 comments on commit 883ed26

Please sign in to comment.