Skip to content

Commit 4f4d758

Browse files
committed
Exception handling
Signed-off-by: Craig Perkins <cwperx@amazon.com>
1 parent 18737dc commit 4f4d758

File tree

11 files changed

+247
-287
lines changed

11 files changed

+247
-287
lines changed

server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java

+10-12
Original file line numberDiff line numberDiff line change
@@ -396,8 +396,8 @@ private void submitStateUpdateTask(
396396
}
397397
final ThreadContext threadContext = threadPool.getThreadContext();
398398
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);
399-
try {
400-
SystemSubject.getInstance().runAs(() -> {
399+
SystemSubject.getInstance().runAs(() -> {
400+
try {
401401
final UpdateTask updateTask = new UpdateTask(
402402
config.priority(),
403403
source,
@@ -414,17 +414,15 @@ private void submitStateUpdateTask(
414414
} else {
415415
threadPoolExecutor.execute(updateTask);
416416
}
417-
return null;
418-
});
419-
} catch (OpenSearchRejectedExecutionException e) {
420-
// ignore cases where we are shutting down..., there is really nothing interesting
421-
// to be done here...
422-
if (!lifecycle.stoppedOrClosed()) {
423-
throw e;
417+
} catch (OpenSearchRejectedExecutionException e) {
418+
// ignore cases where we are shutting down..., there is really nothing interesting
419+
// to be done here...
420+
if (!lifecycle.stoppedOrClosed()) {
421+
throw e;
422+
}
424423
}
425-
} catch (Exception e) {
426-
throw new RuntimeException(e);
427-
}
424+
return null;
425+
});
428426
}
429427

430428
/** asserts that the current thread is <b>NOT</b> the cluster state update thread */

server/src/main/java/org/opensearch/cluster/service/MasterService.java

+11-12
Original file line numberDiff line numberDiff line change
@@ -1022,24 +1022,23 @@ public <T> void submitStateUpdateTasks(
10221022
}
10231023
final ThreadContext threadContext = threadPool.getThreadContext();
10241024
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);
1025-
try {
1026-
SystemSubject.getInstance().runAs(() -> {
1025+
1026+
SystemSubject.getInstance().runAs(() -> {
1027+
try {
10271028
List<Batcher.UpdateTask> safeTasks = tasks.entrySet()
10281029
.stream()
10291030
.map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue(), supplier), executor))
10301031
.collect(Collectors.toList());
10311032
taskBatcher.submitTasks(safeTasks, config.timeout());
1032-
return null;
1033-
});
1034-
} catch (OpenSearchRejectedExecutionException e) {
1035-
// ignore cases where we are shutting down..., there is really nothing interesting
1036-
// to be done here...
1037-
if (!lifecycle.stoppedOrClosed()) {
1038-
throw e;
1033+
} catch (OpenSearchRejectedExecutionException e) {
1034+
// ignore cases where we are shutting down..., there is really nothing interesting
1035+
// to be done here...
1036+
if (!lifecycle.stoppedOrClosed()) {
1037+
throw e;
1038+
}
10391039
}
1040-
} catch (Exception e) {
1041-
throw new RuntimeException(e);
1042-
}
1040+
return null;
1041+
});
10431042
}
10441043

10451044
public ClusterStateStats getClusterStateStats() {

server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java

+14-19
Original file line numberDiff line numberDiff line change
@@ -385,26 +385,21 @@ public void incomingRequest(final HttpRequest httpRequest, final HttpChannel htt
385385
// Visible for testing
386386
void dispatchRequest(final RestRequest restRequest, final RestChannel channel, final Throwable badRequestCause) {
387387
final ThreadContext threadContext = threadPool.getThreadContext();
388-
try {
389-
SystemSubject.getInstance().runAs(() -> {
390-
RestChannel traceableRestChannel = channel;
391-
final Span span = tracer.startSpan(SpanBuilder.from(restRequest));
392-
try (final SpanScope spanScope = tracer.withSpanInScope(span)) {
393-
if (channel != null) {
394-
traceableRestChannel = TraceableRestChannel.create(channel, span, tracer);
395-
}
396-
if (badRequestCause != null) {
397-
dispatcher.dispatchBadRequest(traceableRestChannel, threadContext, badRequestCause);
398-
} else {
399-
dispatcher.dispatchRequest(restRequest, traceableRestChannel, threadContext);
400-
}
388+
SystemSubject.getInstance().runAs(() -> {
389+
RestChannel traceableRestChannel = channel;
390+
final Span span = tracer.startSpan(SpanBuilder.from(restRequest));
391+
try (final SpanScope spanScope = tracer.withSpanInScope(span)) {
392+
if (channel != null) {
393+
traceableRestChannel = TraceableRestChannel.create(channel, span, tracer);
401394
}
402-
return null;
403-
});
404-
} catch (Exception e) {
405-
throw new RuntimeException(e);
406-
}
407-
395+
if (badRequestCause != null) {
396+
dispatcher.dispatchBadRequest(traceableRestChannel, threadContext, badRequestCause);
397+
} else {
398+
dispatcher.dispatchRequest(restRequest, traceableRestChannel, threadContext);
399+
}
400+
}
401+
return null;
402+
});
408403
}
409404

410405
private void handleIncomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel, final Exception exception) {

server/src/main/java/org/opensearch/identity/SystemSubject.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,15 @@ public Principal getPrincipal() {
4545
}
4646

4747
@Override
48-
public <T> T runAs(Callable<T> callable) throws Exception {
48+
public <T> T runAs(Callable<T> callable) throws RuntimeException {
4949
ThreadContext threadContext = threadPool.getThreadContext();
5050
try (ThreadContext.StoredContext ctx = threadPool.getThreadContext().stashContext()) {
5151
ThreadContextAccess.doPrivilegedVoid(threadContext::markAsSystemContext);
52-
return callable.call();
52+
try {
53+
return callable.call();
54+
} catch (Exception e) {
55+
throw new RuntimeException(e);
56+
}
5357
}
5458
}
5559

server/src/main/java/org/opensearch/index/seqno/GlobalCheckpointSyncAction.java

+8-12
Original file line numberDiff line numberDiff line change
@@ -96,18 +96,14 @@ public GlobalCheckpointSyncAction(
9696
}
9797

9898
public void updateGlobalCheckpointForShard(final ShardId shardId) {
99-
try {
100-
SystemSubject.getInstance().runAs(() -> {
101-
execute(new Request(shardId), ActionListener.wrap(r -> {}, e -> {
102-
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
103-
logger.info(new ParameterizedMessage("{} global checkpoint sync failed", shardId), e);
104-
}
105-
}));
106-
return null;
107-
});
108-
} catch (Exception e) {
109-
throw new RuntimeException(e);
110-
}
99+
SystemSubject.getInstance().runAs(() -> {
100+
execute(new Request(shardId), ActionListener.wrap(r -> {}, e -> {
101+
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
102+
logger.info(new ParameterizedMessage("{} global checkpoint sync failed", shardId), e);
103+
}
104+
}));
105+
return null;
106+
});
111107
}
112108

113109
@Override

server/src/main/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncAction.java

+44-52
Original file line numberDiff line numberDiff line change
@@ -119,63 +119,55 @@ protected void doExecute(Task task, Request request, ActionListener<ReplicationR
119119
}
120120

121121
final void backgroundSync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases) {
122-
try {
123-
SystemSubject.getInstance().runAs(() -> {
124-
final Request request = new Request(shardId, retentionLeases);
125-
final ReplicationTask task = (ReplicationTask) taskManager.register(
126-
"transport",
127-
"retention_lease_background_sync",
128-
request
129-
);
130-
transportService.sendChildRequest(
131-
clusterService.localNode(),
132-
transportPrimaryAction,
133-
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
134-
task,
135-
transportOptions,
136-
new TransportResponseHandler<ReplicationResponse>() {
137-
@Override
138-
public ReplicationResponse read(StreamInput in) throws IOException {
139-
return newResponseInstance(in);
140-
}
122+
SystemSubject.getInstance().runAs(() -> {
123+
final Request request = new Request(shardId, retentionLeases);
124+
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_background_sync", request);
125+
transportService.sendChildRequest(
126+
clusterService.localNode(),
127+
transportPrimaryAction,
128+
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
129+
task,
130+
transportOptions,
131+
new TransportResponseHandler<ReplicationResponse>() {
132+
@Override
133+
public ReplicationResponse read(StreamInput in) throws IOException {
134+
return newResponseInstance(in);
135+
}
141136

142-
@Override
143-
public String executor() {
144-
return ThreadPool.Names.SAME;
145-
}
137+
@Override
138+
public String executor() {
139+
return ThreadPool.Names.SAME;
140+
}
146141

147-
@Override
148-
public void handleResponse(ReplicationResponse response) {
149-
task.setPhase("finished");
150-
taskManager.unregister(task);
151-
}
142+
@Override
143+
public void handleResponse(ReplicationResponse response) {
144+
task.setPhase("finished");
145+
taskManager.unregister(task);
146+
}
152147

153-
@Override
154-
public void handleException(TransportException e) {
155-
task.setPhase("finished");
156-
taskManager.unregister(task);
157-
if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) {
158-
// node shutting down
159-
return;
160-
}
161-
if (ExceptionsHelper.unwrap(
162-
e,
163-
IndexNotFoundException.class,
164-
AlreadyClosedException.class,
165-
IndexShardClosedException.class
166-
) != null) {
167-
// the index was deleted or the shard is closed
168-
return;
169-
}
170-
getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), e);
148+
@Override
149+
public void handleException(TransportException e) {
150+
task.setPhase("finished");
151+
taskManager.unregister(task);
152+
if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) {
153+
// node shutting down
154+
return;
155+
}
156+
if (ExceptionsHelper.unwrap(
157+
e,
158+
IndexNotFoundException.class,
159+
AlreadyClosedException.class,
160+
IndexShardClosedException.class
161+
) != null) {
162+
// the index was deleted or the shard is closed
163+
return;
171164
}
165+
getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), e);
172166
}
173-
);
174-
return null;
175-
});
176-
} catch (Exception e) {
177-
throw new RuntimeException(e);
178-
}
167+
}
168+
);
169+
return null;
170+
});
179171
}
180172

181173
@Override

server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java

+40-44
Original file line numberDiff line numberDiff line change
@@ -134,55 +134,51 @@ final void sync(
134134
RetentionLeases retentionLeases,
135135
ActionListener<ReplicationResponse> listener
136136
) {
137-
try {
138-
SystemSubject.getInstance().runAs(() -> {
139-
final Request request = new Request(shardId, retentionLeases);
140-
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_sync", request);
141-
transportService.sendChildRequest(
142-
clusterService.localNode(),
143-
transportPrimaryAction,
144-
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
145-
task,
146-
transportOptions,
147-
new TransportResponseHandler<ReplicationResponse>() {
148-
@Override
149-
public ReplicationResponse read(StreamInput in) throws IOException {
150-
return newResponseInstance(in);
151-
}
137+
SystemSubject.getInstance().runAs(() -> {
138+
final Request request = new Request(shardId, retentionLeases);
139+
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_sync", request);
140+
transportService.sendChildRequest(
141+
clusterService.localNode(),
142+
transportPrimaryAction,
143+
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
144+
task,
145+
transportOptions,
146+
new TransportResponseHandler<ReplicationResponse>() {
147+
@Override
148+
public ReplicationResponse read(StreamInput in) throws IOException {
149+
return newResponseInstance(in);
150+
}
152151

153-
@Override
154-
public String executor() {
155-
return ThreadPool.Names.SAME;
156-
}
152+
@Override
153+
public String executor() {
154+
return ThreadPool.Names.SAME;
155+
}
157156

158-
@Override
159-
public void handleResponse(ReplicationResponse response) {
160-
task.setPhase("finished");
161-
taskManager.unregister(task);
162-
listener.onResponse(response);
163-
}
157+
@Override
158+
public void handleResponse(ReplicationResponse response) {
159+
task.setPhase("finished");
160+
taskManager.unregister(task);
161+
listener.onResponse(response);
162+
}
164163

165-
@Override
166-
public void handleException(TransportException e) {
167-
if (ExceptionsHelper.unwrap(
168-
e,
169-
IndexNotFoundException.class,
170-
AlreadyClosedException.class,
171-
IndexShardClosedException.class
172-
) == null) {
173-
getLogger().warn(new ParameterizedMessage("{} retention lease sync failed", shardId), e);
174-
}
175-
task.setPhase("finished");
176-
taskManager.unregister(task);
177-
listener.onFailure(e);
164+
@Override
165+
public void handleException(TransportException e) {
166+
if (ExceptionsHelper.unwrap(
167+
e,
168+
IndexNotFoundException.class,
169+
AlreadyClosedException.class,
170+
IndexShardClosedException.class
171+
) == null) {
172+
getLogger().warn(new ParameterizedMessage("{} retention lease sync failed", shardId), e);
178173
}
174+
task.setPhase("finished");
175+
taskManager.unregister(task);
176+
listener.onFailure(e);
179177
}
180-
);
181-
return null;
182-
});
183-
} catch (Exception e) {
184-
throw new RuntimeException(e);
185-
}
178+
}
179+
);
180+
return null;
181+
});
186182
}
187183

188184
@Override

0 commit comments

Comments
 (0)