Skip to content

Commit 6fa3a0d

Browse files
authored
Fix bug where replication lag grows post primary relocation (opensearch-project#11238)
* Fix bug where replication lag grows post primary relocation Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * Fix broken UT Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * add unit test for cluster state update Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * PR feedback Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * add changelog entry Signed-off-by: Marc Handalian <marc.handalian@gmail.com> --------- Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
1 parent 21c0597 commit 6fa3a0d

File tree

7 files changed

+209
-16
lines changed

7 files changed

+209
-16
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
175175
- Fix for stuck update action in a bulk with `retry_on_conflict` property ([#11152](https://github.com/opensearch-project/OpenSearch/issues/11152))
176176
- Remove shadowJar from `lang-painless` module publication ([#11369](https://github.com/opensearch-project/OpenSearch/issues/11369))
177177
- Fix remote shards balancer and remove unused variables ([#11167](https://github.com/opensearch-project/OpenSearch/pull/11167))
178+
- Fix bug where replication lag grows post primary relocation ([#11238](https://github.com/opensearch-project/OpenSearch/pull/11238))
178179

179180
### Security
180181

server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java

+77
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,16 @@
88

99
package org.opensearch.remotestore;
1010

11+
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
12+
import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse;
13+
import org.opensearch.cluster.metadata.IndexMetadata;
14+
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
15+
import org.opensearch.common.Priority;
1116
import org.opensearch.common.settings.Settings;
1217
import org.opensearch.common.unit.TimeValue;
1318
import org.opensearch.core.index.Index;
1419
import org.opensearch.index.IndexService;
20+
import org.opensearch.index.ReplicationStats;
1521
import org.opensearch.index.shard.IndexShard;
1622
import org.opensearch.indices.IndicesService;
1723
import org.opensearch.indices.replication.SegmentReplicationState;
@@ -20,10 +26,12 @@
2026
import org.opensearch.indices.replication.common.ReplicationCollection;
2127
import org.opensearch.test.InternalTestCluster;
2228
import org.opensearch.test.OpenSearchIntegTestCase;
29+
import org.opensearch.test.disruption.SlowClusterStateProcessing;
2330

2431
import java.nio.file.Path;
2532
import java.util.Optional;
2633
import java.util.Set;
34+
import java.util.concurrent.TimeUnit;
2735

2836
/**
2937
* This class runs tests with remote store + segRep while blocking file downloads
@@ -111,6 +119,75 @@ public void testCancelReplicationWhileFetchingMetadata() throws Exception {
111119
cleanupRepo();
112120
}
113121

122+
public void testUpdateVisibleCheckpointWithLaggingClusterStateUpdates_primaryRelocation() throws Exception {
123+
Path location = randomRepoPath().toAbsolutePath();
124+
Settings nodeSettings = Settings.builder().put(buildRemoteStoreNodeAttributes(location, 0d, "metadata", Long.MAX_VALUE)).build();
125+
internalCluster().startClusterManagerOnlyNode(nodeSettings);
126+
internalCluster().startDataOnlyNodes(2, nodeSettings);
127+
final Settings indexSettings = Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build();
128+
createIndex(INDEX_NAME, indexSettings);
129+
ensureGreen(INDEX_NAME);
130+
final Set<String> dataNodeNames = internalCluster().getDataNodeNames();
131+
final String replicaNode = getNode(dataNodeNames, false);
132+
final String oldPrimary = getNode(dataNodeNames, true);
133+
134+
// index a doc.
135+
client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", randomInt()).get();
136+
refresh(INDEX_NAME);
137+
138+
logger.info("--> start another node");
139+
final String newPrimary = internalCluster().startDataOnlyNode(nodeSettings);
140+
ClusterHealthResponse clusterHealthResponse = client().admin()
141+
.cluster()
142+
.prepareHealth()
143+
.setWaitForEvents(Priority.LANGUID)
144+
.setWaitForNodes("4")
145+
.get();
146+
assertEquals(clusterHealthResponse.isTimedOut(), false);
147+
148+
SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(replicaNode, random(), 0, 0, 1000, 2000);
149+
internalCluster().setDisruptionScheme(disruption);
150+
disruption.startDisrupting();
151+
152+
// relocate the primary
153+
logger.info("--> relocate the shard");
154+
client().admin()
155+
.cluster()
156+
.prepareReroute()
157+
.add(new MoveAllocationCommand(INDEX_NAME, 0, oldPrimary, newPrimary))
158+
.execute()
159+
.actionGet();
160+
clusterHealthResponse = client().admin()
161+
.cluster()
162+
.prepareHealth()
163+
.setWaitForEvents(Priority.LANGUID)
164+
.setWaitForNoRelocatingShards(true)
165+
.setTimeout(new TimeValue(5, TimeUnit.MINUTES))
166+
.execute()
167+
.actionGet();
168+
assertEquals(clusterHealthResponse.isTimedOut(), false);
169+
170+
IndexShard newPrimary_shard = getIndexShard(newPrimary, INDEX_NAME);
171+
IndexShard replica = getIndexShard(replicaNode, INDEX_NAME);
172+
assertBusy(() -> {
173+
assertEquals(
174+
newPrimary_shard.getLatestReplicationCheckpoint().getSegmentInfosVersion(),
175+
replica.getLatestReplicationCheckpoint().getSegmentInfosVersion()
176+
);
177+
});
178+
179+
assertBusy(() -> {
180+
ClusterStatsResponse clusterStatsResponse = client().admin().cluster().prepareClusterStats().get();
181+
ReplicationStats replicationStats = clusterStatsResponse.getIndicesStats().getSegments().getReplicationStats();
182+
assertEquals(0L, replicationStats.maxBytesBehind);
183+
assertEquals(0L, replicationStats.maxReplicationLag);
184+
assertEquals(0L, replicationStats.totalBytesBehind);
185+
});
186+
disruption.stopDisrupting();
187+
disableRepoConsistencyCheck("Remote Store Creates System Repository");
188+
cleanupRepo();
189+
}
190+
114191
private String getNode(Set<String> dataNodeNames, boolean primary) {
115192
assertEquals(2, dataNodeNames.size());
116193
for (String name : dataNodeNames) {

server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -1322,8 +1322,10 @@ private SegmentReplicationShardStats buildShardStats(final String allocationId,
13221322
allocationId,
13231323
cps.checkpointTimers.size(),
13241324
bytesBehind,
1325-
cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::time).max().orElse(0),
1326-
cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::totalElapsedTime).max().orElse(0),
1325+
bytesBehind > 0L ? cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::time).max().orElse(0) : 0,
1326+
bytesBehind > 0L
1327+
? cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::totalElapsedTime).max().orElse(0)
1328+
: 0,
13271329
cps.lastCompletedReplicationLag
13281330
);
13291331
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

+2-8
Original file line numberDiff line numberDiff line change
@@ -1764,8 +1764,8 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp
17641764
if (isSegmentReplicationAllowed() == false) {
17651765
return false;
17661766
}
1767-
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
1768-
if (localCheckpoint.isAheadOf(requestCheckpoint)) {
1767+
final ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
1768+
if (requestCheckpoint.isAheadOf(localCheckpoint) == false) {
17691769
logger.trace(
17701770
() -> new ParameterizedMessage(
17711771
"Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}",
@@ -1775,12 +1775,6 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp
17751775
);
17761776
return false;
17771777
}
1778-
if (localCheckpoint.equals(requestCheckpoint)) {
1779-
logger.trace(
1780-
() -> new ParameterizedMessage("Ignoring new replication checkpoint - Shard is already on checkpoint {}", requestCheckpoint)
1781-
);
1782-
return false;
1783-
}
17841778
return true;
17851779
}
17861780

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java

+53-2
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,21 @@
1515
import org.opensearch.ExceptionsHelper;
1616
import org.opensearch.OpenSearchCorruptionException;
1717
import org.opensearch.action.support.ChannelActionListener;
18+
import org.opensearch.cluster.ClusterChangedEvent;
19+
import org.opensearch.cluster.ClusterStateListener;
1820
import org.opensearch.cluster.node.DiscoveryNode;
1921
import org.opensearch.cluster.routing.ShardRouting;
2022
import org.opensearch.cluster.service.ClusterService;
2123
import org.opensearch.common.Nullable;
24+
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
2225
import org.opensearch.common.settings.Settings;
2326
import org.opensearch.common.util.CancellableThreads;
2427
import org.opensearch.common.util.concurrent.AbstractRunnable;
2528
import org.opensearch.common.util.concurrent.ConcurrentCollections;
2629
import org.opensearch.core.action.ActionListener;
2730
import org.opensearch.core.index.shard.ShardId;
2831
import org.opensearch.core.transport.TransportResponse;
32+
import org.opensearch.index.IndexService;
2933
import org.opensearch.index.shard.IndexEventListener;
3034
import org.opensearch.index.shard.IndexShard;
3135
import org.opensearch.index.shard.IndexShardState;
@@ -61,7 +65,7 @@
6165
*
6266
* @opensearch.internal
6367
*/
64-
public class SegmentReplicationTargetService implements IndexEventListener {
68+
public class SegmentReplicationTargetService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener {
6569

6670
private static final Logger logger = LogManager.getLogger(SegmentReplicationTargetService.class);
6771

@@ -144,6 +148,53 @@ public SegmentReplicationTargetService(
144148
);
145149
}
146150

151+
@Override
152+
protected void doStart() {
153+
if (DiscoveryNode.isDataNode(clusterService.getSettings())) {
154+
clusterService.addListener(this);
155+
}
156+
}
157+
158+
@Override
159+
protected void doStop() {
160+
if (DiscoveryNode.isDataNode(clusterService.getSettings())) {
161+
clusterService.removeListener(this);
162+
}
163+
}
164+
165+
@Override
166+
protected void doClose() throws IOException {
167+
168+
}
169+
170+
@Override
171+
public void clusterChanged(ClusterChangedEvent event) {
172+
if (event.routingTableChanged()) {
173+
for (IndexService indexService : indicesService) {
174+
if (indexService.getIndexSettings().isSegRepEnabled() && event.indexRoutingTableChanged(indexService.index().getName())) {
175+
for (IndexShard shard : indexService) {
176+
if (shard.routingEntry().primary() == false) {
177+
// for this shard look up its primary routing, if it has completed a relocation trigger replication
178+
final String previousNode = event.previousState()
179+
.routingTable()
180+
.shardRoutingTable(shard.shardId())
181+
.primaryShard()
182+
.currentNodeId();
183+
final String currentNode = event.state()
184+
.routingTable()
185+
.shardRoutingTable(shard.shardId())
186+
.primaryShard()
187+
.currentNodeId();
188+
if (previousNode.equals(currentNode) == false) {
189+
processLatestReceivedCheckpoint(shard, Thread.currentThread());
190+
}
191+
}
192+
}
193+
}
194+
}
195+
}
196+
}
197+
147198
/**
148199
* Cancel any replications on this node for a replica that is about to be closed.
149200
*/
@@ -395,7 +446,7 @@ private DiscoveryNode getPrimaryNode(ShardRouting primaryShard) {
395446
// visible to tests
396447
protected boolean processLatestReceivedCheckpoint(IndexShard replicaShard, Thread thread) {
397448
final ReplicationCheckpoint latestPublishedCheckpoint = latestReceivedCheckpoint.get(replicaShard.shardId());
398-
if (latestPublishedCheckpoint != null && latestPublishedCheckpoint.isAheadOf(replicaShard.getLatestReplicationCheckpoint())) {
449+
if (latestPublishedCheckpoint != null) {
399450
logger.trace(
400451
() -> new ParameterizedMessage(
401452
"Processing latest received checkpoint for shard {} {}",

server/src/main/java/org/opensearch/node/Node.java

+2
Original file line numberDiff line numberDiff line change
@@ -1414,6 +1414,7 @@ public Node start() throws NodeValidationException {
14141414
assert transportService.getLocalNode().equals(localNodeFactory.getNode())
14151415
: "transportService has a different local node than the factory provided";
14161416
injector.getInstance(PeerRecoverySourceService.class).start();
1417+
injector.getInstance(SegmentReplicationTargetService.class).start();
14171418
injector.getInstance(SegmentReplicationSourceService.class).start();
14181419

14191420
final RemoteClusterStateService remoteClusterStateService = injector.getInstance(RemoteClusterStateService.class);
@@ -1602,6 +1603,7 @@ public synchronized void close() throws IOException {
16021603
toClose.add(injector.getInstance(IndicesStore.class));
16031604
toClose.add(injector.getInstance(PeerRecoverySourceService.class));
16041605
toClose.add(injector.getInstance(SegmentReplicationSourceService.class));
1606+
toClose.add(injector.getInstance(SegmentReplicationTargetService.class));
16051607
toClose.add(() -> stopWatch.stop().start("cluster"));
16061608
toClose.add(injector.getInstance(ClusterService.class));
16071609
toClose.add(() -> stopWatch.stop().start("node_connections_service"));

server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java

+70-4
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,26 @@
1212
import org.opensearch.ExceptionsHelper;
1313
import org.opensearch.OpenSearchException;
1414
import org.opensearch.Version;
15+
import org.opensearch.cluster.ClusterChangedEvent;
16+
import org.opensearch.cluster.ClusterName;
1517
import org.opensearch.cluster.ClusterState;
1618
import org.opensearch.cluster.metadata.IndexMetadata;
1719
import org.opensearch.cluster.node.DiscoveryNode;
1820
import org.opensearch.cluster.node.DiscoveryNodes;
21+
import org.opensearch.cluster.routing.IndexRoutingTable;
22+
import org.opensearch.cluster.routing.RecoverySource;
1923
import org.opensearch.cluster.routing.RoutingTable;
24+
import org.opensearch.cluster.routing.ShardRouting;
25+
import org.opensearch.cluster.routing.ShardRoutingState;
26+
import org.opensearch.cluster.routing.UnassignedInfo;
2027
import org.opensearch.cluster.service.ClusterService;
2128
import org.opensearch.common.settings.ClusterSettings;
2229
import org.opensearch.common.settings.Settings;
2330
import org.opensearch.common.util.CancellableThreads;
2431
import org.opensearch.core.action.ActionListener;
2532
import org.opensearch.core.index.shard.ShardId;
2633
import org.opensearch.core.transport.TransportResponse;
34+
import org.opensearch.index.IndexService;
2735
import org.opensearch.index.engine.NRTReplicationEngineFactory;
2836
import org.opensearch.index.replication.TestReplicationSource;
2937
import org.opensearch.index.shard.IndexShard;
@@ -51,6 +59,7 @@
5159
import java.io.IOException;
5260
import java.util.Collections;
5361
import java.util.List;
62+
import java.util.Set;
5463
import java.util.concurrent.CountDownLatch;
5564
import java.util.concurrent.TimeUnit;
5665
import java.util.function.BiConsumer;
@@ -91,6 +100,8 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase {
91100
private SegmentReplicationState state;
92101
private ReplicationCheckpoint initialCheckpoint;
93102

103+
private ClusterState clusterState;
104+
94105
private static final long TRANSPORT_TIMEOUT = 30000;// 30sec
95106

96107
@Override
@@ -129,7 +140,7 @@ public void setUp() throws Exception {
129140

130141
indicesService = mock(IndicesService.class);
131142
ClusterService clusterService = mock(ClusterService.class);
132-
ClusterState clusterState = mock(ClusterState.class);
143+
clusterState = mock(ClusterState.class);
133144
RoutingTable mockRoutingTable = mock(RoutingTable.class);
134145
when(clusterService.state()).thenReturn(clusterState);
135146
when(clusterState.routingTable()).thenReturn(mockRoutingTable);
@@ -465,9 +476,22 @@ public void testStartReplicationListenerFailure() throws InterruptedException {
465476
verify(spy, (never())).updateVisibleCheckpoint(eq(0L), eq(replicaShard));
466477
}
467478

468-
public void testDoNotProcessLatestCheckpointIfItIsbehind() {
469-
sut.updateLatestReceivedCheckpoint(replicaShard.getLatestReplicationCheckpoint(), replicaShard);
470-
assertFalse(sut.processLatestReceivedCheckpoint(replicaShard, null));
479+
public void testDoNotProcessLatestCheckpointIfCheckpointIsBehind() {
480+
SegmentReplicationTargetService service = spy(sut);
481+
doReturn(mock(SegmentReplicationTarget.class)).when(service).startReplication(any(), any(), any());
482+
ReplicationCheckpoint checkpoint = replicaShard.getLatestReplicationCheckpoint();
483+
service.updateLatestReceivedCheckpoint(checkpoint, replicaShard);
484+
service.processLatestReceivedCheckpoint(replicaShard, null);
485+
verify(service, times(0)).startReplication(eq(replicaShard), eq(checkpoint), any());
486+
}
487+
488+
public void testProcessLatestCheckpointIfCheckpointAhead() {
489+
SegmentReplicationTargetService service = spy(sut);
490+
doNothing().when(service).startReplication(any());
491+
doReturn(mock(SegmentReplicationTarget.class)).when(service).startReplication(any(), any(), any());
492+
service.updateLatestReceivedCheckpoint(aheadCheckpoint, replicaShard);
493+
service.processLatestReceivedCheckpoint(replicaShard, null);
494+
verify(service, times(1)).startReplication(eq(replicaShard), eq(aheadCheckpoint), any());
471495
}
472496

473497
public void testOnNewCheckpointInvokedOnClosedShardDoesNothing() throws IOException {
@@ -617,4 +641,46 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile
617641
target.cancel("test");
618642
sut.startReplication(target);
619643
}
644+
645+
public void testProcessCheckpointOnClusterStateUpdate() {
646+
// set up mocks on indicies & index service to return our replica's index & shard.
647+
IndexService indexService = mock(IndexService.class);
648+
when(indexService.iterator()).thenReturn(Set.of(replicaShard).iterator());
649+
when(indexService.getIndexSettings()).thenReturn(replicaShard.indexSettings());
650+
when(indexService.index()).thenReturn(replicaShard.routingEntry().index());
651+
when(indicesService.iterator()).thenReturn(Set.of(indexService).iterator());
652+
653+
// create old & new cluster states
654+
final String targetNodeId = "targetNodeId";
655+
ShardRouting initialRouting = primaryShard.routingEntry().relocate(targetNodeId, 0L);
656+
assertEquals(ShardRoutingState.RELOCATING, initialRouting.state());
657+
658+
ShardRouting targetRouting = ShardRouting.newUnassigned(
659+
primaryShard.shardId(),
660+
true,
661+
RecoverySource.PeerRecoverySource.INSTANCE,
662+
new UnassignedInfo(UnassignedInfo.Reason.REINITIALIZED, "test")
663+
).initialize(targetNodeId, initialRouting.allocationId().getId(), 0L).moveToStarted();
664+
assertEquals(targetNodeId, targetRouting.currentNodeId());
665+
assertEquals(ShardRoutingState.STARTED, targetRouting.state());
666+
ClusterState oldState = ClusterState.builder(ClusterName.DEFAULT)
667+
.routingTable(
668+
RoutingTable.builder()
669+
.add(IndexRoutingTable.builder(primaryShard.shardId().getIndex()).addShard(initialRouting).build())
670+
.build()
671+
)
672+
.build();
673+
ClusterState newState = ClusterState.builder(ClusterName.DEFAULT)
674+
.routingTable(
675+
RoutingTable.builder()
676+
.add(IndexRoutingTable.builder(primaryShard.shardId().getIndex()).addShard(targetRouting).build())
677+
.build()
678+
)
679+
.build();
680+
681+
// spy so we can verify process is invoked
682+
SegmentReplicationTargetService spy = spy(sut);
683+
spy.clusterChanged(new ClusterChangedEvent("ignored", oldState, newState));
684+
verify(spy, times(1)).processLatestReceivedCheckpoint(eq(replicaShard), any());
685+
}
620686
}

0 commit comments

Comments
 (0)