Skip to content

Commit ffbc35c

Browse files
Add stashcontext to connector getter (opensearch-project#2742) (opensearch-project#2745)
Signed-off-by: b4sjoo <sicheng.song@outlook.com> (cherry picked from commit f8d6c7b) Co-authored-by: Sicheng Song <sicheng.song@outlook.com>
1 parent 9ea79d0 commit ffbc35c

File tree

1 file changed

+25
-18
lines changed

1 file changed

+25
-18
lines changed

plugin/src/main/java/org/opensearch/ml/model/MLModelManager.java

+25-18
Original file line numberDiff line numberDiff line change
@@ -1638,26 +1638,33 @@ public void getController(String modelId, ActionListener<MLController> listener)
16381638
*/
16391639
public void getConnector(String connectorId, ActionListener<Connector> listener) {
16401640
GetRequest getRequest = new GetRequest().index(CommonValue.ML_CONNECTOR_INDEX).id(connectorId);
1641-
client.get(getRequest, ActionListener.wrap(r -> {
1642-
if (r != null && r.isExists()) {
1643-
try (
1644-
XContentParser parser = MLNodeUtils
1645-
.createXContentParserFromRegistry(NamedXContentRegistry.EMPTY, r.getSourceAsBytesRef())
1646-
) {
1647-
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
1648-
Connector connector = Connector.createConnector(parser);
1649-
listener.onResponse(connector);
1650-
} catch (Exception e) {
1651-
log.error("Failed to parse connector:" + connectorId);
1652-
listener.onFailure(e);
1641+
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
1642+
ActionListener<Connector> wrappedListener = ActionListener.runBefore(listener, context::restore);
1643+
client.get(getRequest, ActionListener.wrap(r -> {
1644+
if (r != null && r.isExists()) {
1645+
try (
1646+
XContentParser parser = MLNodeUtils
1647+
.createXContentParserFromRegistry(NamedXContentRegistry.EMPTY, r.getSourceAsBytesRef())
1648+
) {
1649+
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
1650+
Connector connector = Connector.createConnector(parser);
1651+
wrappedListener.onResponse(connector);
1652+
} catch (Exception e) {
1653+
log.error("Failed to parse connector:" + connectorId);
1654+
wrappedListener.onFailure(e);
1655+
}
1656+
} else {
1657+
wrappedListener
1658+
.onFailure(new OpenSearchStatusException("Failed to find connector:" + connectorId, RestStatus.NOT_FOUND));
16531659
}
1654-
} else {
1655-
listener.onFailure(new OpenSearchStatusException("Failed to find connector:" + connectorId, RestStatus.NOT_FOUND));
1656-
}
1657-
}, e -> {
1660+
}, e -> {
1661+
log.error("Failed to get connector", e);
1662+
wrappedListener.onFailure(new OpenSearchStatusException("Failed to get connector:" + connectorId, RestStatus.NOT_FOUND));
1663+
}));
1664+
} catch (Exception e) {
16581665
log.error("Failed to get connector", e);
1659-
listener.onFailure(new OpenSearchStatusException("Failed to get connector:" + connectorId, RestStatus.NOT_FOUND));
1660-
}));
1666+
listener.onFailure(e);
1667+
}
16611668
}
16621669

16631670
/**

0 commit comments

Comments
 (0)