Skip to content

Commit 0edeb40

Browse files
committed
using publish cluster-state as fallback
1 parent 92463f4 commit 0edeb40

File tree

6 files changed

+35
-1
lines changed

6 files changed

+35
-1
lines changed

server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,12 @@ public void handleResponse(GetTermVersionResponse response) {
381381
if (isLatestClusterStatePresentOnLocalNode) {
382382
onLatestLocalState.accept(clusterState);
383383
} else {
384-
onStaleLocalState.accept(clusterManagerNode, clusterState);
384+
ClusterState publishState = clusterService.publishState();
385+
if (publishState != null && response.matches(publishState)) {
386+
onLatestLocalState.accept(publishState);
387+
} else {
388+
onStaleLocalState.accept(clusterManagerNode, clusterState);
389+
}
385390
}
386391
}
387392

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

+4
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,10 @@ && getCurrentTerm() == ZEN1_BWC_TERM
464464
ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term());
465465
final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest);
466466

467+
final ClusterState publishState = hideStateIfNotRecovered(coordinationState.get().getLastAcceptedState());
468+
applierState = mode == Mode.CANDIDATE ? clusterStateWithNoClusterManagerBlock(publishState) : publishState;
469+
clusterApplier.onPublishClusterState(publishRequest.toString(), () -> publishState);
470+
467471
if (sourceNode.equals(getLocalNode())) {
468472
preVoteCollector.update(getPreVoteResponse(), getLocalNode());
469473
} else {

server/src/main/java/org/opensearch/cluster/service/ClusterApplier.java

+2
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public interface ClusterApplier {
4949
*/
5050
void setInitialState(ClusterState initialState);
5151

52+
void onPublishClusterState(String source, Supplier<ClusterState> clusterStateSupplier);
53+
5254
/**
5355
* Method to invoke when a new cluster state is available to be applied
5456
*

server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java

+14
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
118118

119119
private final Collection<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>();
120120
private final Map<TimeoutClusterStateListener, NotifyTimeout> timeoutClusterStateListeners = new ConcurrentHashMap<>();
121+
private final AtomicReference<ClusterState> publishState; // last published state
121122

122123
private final AtomicReference<ClusterState> state; // last applied state
123124

@@ -139,6 +140,7 @@ public ClusterApplierService(
139140
) {
140141
this.clusterSettings = clusterSettings;
141142
this.threadPool = threadPool;
143+
this.publishState = new AtomicReference<>();
142144
this.state = new AtomicReference<>();
143145
this.nodeName = nodeName;
144146

@@ -232,6 +234,10 @@ public ClusterState state() {
232234
return clusterState;
233235
}
234236

237+
public ClusterState publishState() {
238+
return publishState.get();
239+
}
240+
235241
/**
236242
* Returns true if the appliedClusterState is not null
237243
*/
@@ -367,6 +373,14 @@ public ThreadPool threadPool() {
367373
return threadPool;
368374
}
369375

376+
@Override
377+
public void onPublishClusterState(final String source, final Supplier<ClusterState> clusterStateSupplier) {
378+
ClusterState nextState = clusterStateSupplier.get();
379+
if (nextState != null) {
380+
publishState.set(nextState);
381+
}
382+
}
383+
370384
@Override
371385
public void onNewClusterState(
372386
final String source,

server/src/main/java/org/opensearch/cluster/service/ClusterService.java

+4
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,10 @@ public ClusterState state() {
183183
return clusterApplierService.state();
184184
}
185185

186+
public ClusterState publishState() {
187+
return clusterApplierService.publishState();
188+
}
189+
186190
/**
187191
* Adds a high priority applier of updated cluster states.
188192
*/

server/src/test/java/org/opensearch/cluster/coordination/NoOpClusterApplier.java

+5
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ public void setInitialState(ClusterState initialState) {
4242

4343
}
4444

45+
@Override
46+
public void onPublishClusterState(String source, Supplier<ClusterState> clusterStateSupplier) {
47+
48+
}
49+
4550
@Override
4651
public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener) {
4752
listener.onSuccess(source);

0 commit comments

Comments
 (0)