Skip to content

Commit 6f1b59e

Browse files
Add logic in master service to optimize performance and retain detailed logging for critical cluster operations. (opensearch-project#16421)
Signed-off-by: Sumit Bansal <sumitsb@amazon.com> Signed-off-by: shwetathareja <shwetathareja@live.com> Co-authored-by: shwetathareja <shwetathareja@live.com>
1 parent b2d537a commit 6f1b59e

File tree

4 files changed

+64
-47
lines changed

4 files changed

+64
-47
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3131
- URI path filtering support in cluster stats API ([#15938](https://github.com/opensearch-project/OpenSearch/pull/15938))
3232
- [Star Tree - Search] Add support for metric aggregations with/without term query ([15289](https://github.com/opensearch-project/OpenSearch/pull/15289))
3333
- Add support for restoring from snapshot with search replicas ([#16111](https://github.com/opensearch-project/OpenSearch/pull/16111))
34+
- Add logic in master service to optimize performance and retain detailed logging for critical cluster operations. ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
3435
- Add Setting to adjust the primary constraint weights ([#16471](https://github.com/opensearch-project/OpenSearch/pull/16471))
3536

3637
### Dependencies

server/src/main/java/org/opensearch/cluster/service/MasterService.java

+18-14
Original file line numberDiff line numberDiff line change
@@ -299,33 +299,37 @@ public static boolean assertNotMasterUpdateThread(String reason) {
299299
}
300300

301301
private void runTasks(TaskInputs taskInputs) {
302-
final String longSummary = logger.isTraceEnabled() ? taskInputs.taskSummaryGenerator.apply(true) : "";
303-
final String shortSummary = taskInputs.taskSummaryGenerator.apply(false);
302+
final String summary;
303+
if (logger.isTraceEnabled()) {
304+
summary = taskInputs.taskSummaryGenerator.apply(true);
305+
} else {
306+
summary = taskInputs.taskSummaryGenerator.apply(false);
307+
}
304308

305309
if (!lifecycle.started()) {
306-
logger.debug("processing [{}]: ignoring, cluster-manager service not started", shortSummary);
310+
logger.debug("processing [{}]: ignoring, cluster-manager service not started", summary);
307311
return;
308312
}
309313

310314
if (logger.isTraceEnabled()) {
311-
logger.trace("executing cluster state update for [{}]", longSummary);
315+
logger.trace("executing cluster state update for [{}]", summary);
312316
} else {
313-
logger.debug("executing cluster state update for [{}]", shortSummary);
317+
logger.debug("executing cluster state update for [{}]", summary);
314318
}
315319

316320
final ClusterState previousClusterState = state();
317321

318322
if (!previousClusterState.nodes().isLocalNodeElectedClusterManager() && taskInputs.runOnlyWhenClusterManager()) {
319-
logger.debug("failing [{}]: local node is no longer cluster-manager", shortSummary);
323+
logger.debug("failing [{}]: local node is no longer cluster-manager", summary);
320324
taskInputs.onNoLongerClusterManager();
321325
return;
322326
}
323327

324328
final long computationStartTime = threadPool.preciseRelativeTimeInNanos();
325-
final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, shortSummary);
329+
final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, summary);
326330
taskOutputs.notifyFailedTasks();
327331
final TimeValue computationTime = getTimeSince(computationStartTime);
328-
logExecutionTime(computationTime, "compute cluster state update", shortSummary);
332+
logExecutionTime(computationTime, "compute cluster state update", summary);
329333

330334
clusterManagerMetrics.recordLatency(
331335
clusterManagerMetrics.clusterStateComputeHistogram,
@@ -337,25 +341,25 @@ private void runTasks(TaskInputs taskInputs) {
337341
final long notificationStartTime = threadPool.preciseRelativeTimeInNanos();
338342
taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
339343
final TimeValue executionTime = getTimeSince(notificationStartTime);
340-
logExecutionTime(executionTime, "notify listeners on unchanged cluster state", shortSummary);
344+
logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary);
341345
} else {
342346
final ClusterState newClusterState = taskOutputs.newClusterState;
343347
if (logger.isTraceEnabled()) {
344-
logger.trace("cluster state updated, source [{}]\n{}", longSummary, newClusterState);
348+
logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState);
345349
} else {
346-
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), shortSummary);
350+
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary);
347351
}
348352
final long publicationStartTime = threadPool.preciseRelativeTimeInNanos();
349353
try {
350-
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(shortSummary, newClusterState, previousClusterState);
354+
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
351355
// new cluster state, notify all listeners
352356
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
353357
if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
354358
String nodesDeltaSummary = nodesDelta.shortSummary();
355359
if (nodesDeltaSummary.length() > 0) {
356360
logger.info(
357361
"{}, term: {}, version: {}, delta: {}",
358-
shortSummary,
362+
summary,
359363
newClusterState.term(),
360364
newClusterState.version(),
361365
nodesDeltaSummary
@@ -366,7 +370,7 @@ private void runTasks(TaskInputs taskInputs) {
366370
logger.debug("publishing cluster state version [{}]", newClusterState.version());
367371
publish(clusterChangedEvent, taskOutputs, publicationStartTime);
368372
} catch (Exception e) {
369-
handleException(shortSummary, publicationStartTime, newClusterState, e);
373+
handleException(summary, publicationStartTime, newClusterState, e);
370374
}
371375
}
372376
}

server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java

+23-11
Original file line numberDiff line numberDiff line change
@@ -195,25 +195,37 @@ void runIfNotProcessed(BatchedTask updateTask) {
195195
if (toExecute.isEmpty() == false) {
196196
Function<Boolean, String> taskSummaryGenerator = (longSummaryRequired) -> {
197197
if (longSummaryRequired == null || !longSummaryRequired) {
198-
return buildShortSummary(updateTask.batchingKey, toExecute.size());
198+
final List<BatchedTask> sampleTasks = toExecute.stream()
199+
.limit(Math.min(1000, toExecute.size()))
200+
.collect(Collectors.toList());
201+
return buildShortSummary(updateTask.batchingKey, toExecute.size(), getSummary(updateTask, sampleTasks));
199202
}
200-
final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
201-
for (final BatchedTask task : toExecute) {
202-
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
203-
}
204-
return processTasksBySource.entrySet().stream().map(entry -> {
205-
String tasks = updateTask.describeTasks(entry.getValue());
206-
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
207-
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
203+
return getSummary(updateTask, toExecute);
208204
};
209205
taskBatcherListener.onBeginProcessing(toExecute);
210206
run(updateTask.batchingKey, toExecute, taskSummaryGenerator);
211207
}
212208
}
213209
}
214210

215-
private String buildShortSummary(final Object batchingKey, final int taskCount) {
216-
return "Tasks batched with key: " + batchingKey.toString().split("\\$")[0] + " and count: " + taskCount;
211+
private String getSummary(final BatchedTask updateTask, final List<BatchedTask> toExecute) {
212+
final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
213+
for (final BatchedTask task : toExecute) {
214+
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
215+
}
216+
return processTasksBySource.entrySet().stream().map(entry -> {
217+
String tasks = updateTask.describeTasks(entry.getValue());
218+
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
219+
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
220+
}
221+
222+
private String buildShortSummary(final Object batchingKey, final int taskCount, final String sampleTasks) {
223+
return "Tasks batched with key: "
224+
+ batchingKey.toString().split("\\$")[0]
225+
+ ", count:"
226+
+ taskCount
227+
+ " and sample tasks: "
228+
+ sampleTasks;
217229
}
218230

219231
/**

0 commit comments

Comments
 (0)