Skip to content

Commit 70b9089

Browse files
mch2 authored and austintlee committed
Segment Replication - Remove redundant replica doc parsing on writes. (opensearch-project#7279)
This change removes unnecessary doc parsing currently performed on replicas by updating applyIndexOperationOnReplica to pass a doc id from the primary.

Signed-off-by: Marc Handalian <handalm@amazon.com>
1 parent cede2c9 commit 70b9089

File tree

8 files changed

+78
-1
lines changed

8 files changed

+78
-1
lines changed

server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java

+1
Original file line numberDiff line numberDiff line change
@@ -857,6 +857,7 @@ private static Engine.Result performOpOnReplica(
857857
indexRequest.routing()
858858
);
859859
result = replica.applyIndexOperationOnReplica(
860+
primaryResponse.getId(),
860861
primaryResponse.getSeqNo(),
861862
primaryResponse.getPrimaryTerm(),
862863
primaryResponse.getVersion(),

server/src/main/java/org/opensearch/index/shard/IndexShard.java

+33
Original file line numberDiff line numberDiff line change
@@ -911,13 +911,31 @@ public Engine.IndexResult applyIndexOperationOnPrimary(
911911
}
912912

913913
public Engine.IndexResult applyIndexOperationOnReplica(
914+
String id,
914915
long seqNo,
915916
long opPrimaryTerm,
916917
long version,
917918
long autoGeneratedTimeStamp,
918919
boolean isRetry,
919920
SourceToParse sourceToParse
920921
) throws IOException {
922+
if (indexSettings.isSegRepEnabled()) {
923+
Engine.Index index = new Engine.Index(
924+
new Term(IdFieldMapper.NAME, Uid.encodeId(id)),
925+
new ParsedDocument(null, null, id, null, null, sourceToParse.source(), sourceToParse.getXContentType(), null),
926+
seqNo,
927+
opPrimaryTerm,
928+
version,
929+
null,
930+
Engine.Operation.Origin.REPLICA,
931+
System.nanoTime(),
932+
autoGeneratedTimeStamp,
933+
isRetry,
934+
UNASSIGNED_SEQ_NO,
935+
0
936+
);
937+
return getEngine().index(index);
938+
}
921939
return applyIndexOperation(
922940
getEngine(),
923941
seqNo,
@@ -1128,6 +1146,21 @@ public Engine.DeleteResult applyDeleteOperationOnPrimary(
11281146
}
11291147

11301148
public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long opPrimaryTerm, long version, String id) throws IOException {
1149+
if (indexSettings.isSegRepEnabled()) {
1150+
final Engine.Delete delete = new Engine.Delete(
1151+
id,
1152+
new Term(IdFieldMapper.NAME, Uid.encodeId(id)),
1153+
seqNo,
1154+
opPrimaryTerm,
1155+
version,
1156+
null,
1157+
Engine.Operation.Origin.REPLICA,
1158+
System.nanoTime(),
1159+
UNASSIGNED_SEQ_NO,
1160+
0
1161+
);
1162+
return getEngine().delete(delete);
1163+
}
11311164
return applyDeleteOperation(
11321165
getEngine(),
11331166
seqNo,

server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ public void testRecoveryToReplicaThatReceivedExtraDocument() throws Exception {
152152
final IndexShard remainingReplica = shards.getReplicas().get(1);
153153
// slip the extra document into the replica
154154
remainingReplica.applyIndexOperationOnReplica(
155+
"id",
155156
remainingReplica.getLocalCheckpoint() + 1,
156157
remainingReplica.getOperationPrimaryTerm(),
157158
1,

server/src/test/java/org/opensearch/index/shard/IndexShardTests.java

+11
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@
170170
import java.util.Locale;
171171
import java.util.Map;
172172
import java.util.Set;
173+
import java.util.UUID;
173174
import java.util.concurrent.BrokenBarrierException;
174175
import java.util.concurrent.CountDownLatch;
175176
import java.util.concurrent.CyclicBarrier;
@@ -2297,6 +2298,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
22972298
shard.applyDeleteOperationOnReplica(1, primaryTerm, 2, "id");
22982299
shard.getEngine().translogManager().rollTranslogGeneration(); // isolate the delete in it's own generation
22992300
shard.applyIndexOperationOnReplica(
2301+
UUID.randomUUID().toString(),
23002302
0,
23012303
primaryTerm,
23022304
1,
@@ -2305,6 +2307,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
23052307
new SourceToParse(shard.shardId().getIndexName(), "id", new BytesArray("{}"), XContentType.JSON)
23062308
);
23072309
shard.applyIndexOperationOnReplica(
2310+
UUID.randomUUID().toString(),
23082311
3,
23092312
primaryTerm,
23102313
3,
@@ -2315,6 +2318,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
23152318
// Flushing a new commit with local checkpoint=1 allows to skip the translog gen #1 in recovery.
23162319
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
23172320
shard.applyIndexOperationOnReplica(
2321+
UUID.randomUUID().toString(),
23182322
2,
23192323
primaryTerm,
23202324
3,
@@ -2323,6 +2327,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
23232327
new SourceToParse(shard.shardId().getIndexName(), "id-2", new BytesArray("{}"), XContentType.JSON)
23242328
);
23252329
shard.applyIndexOperationOnReplica(
2330+
UUID.randomUUID().toString(),
23262331
5,
23272332
primaryTerm,
23282333
1,
@@ -2470,6 +2475,7 @@ public void testRecoverFromStoreWithNoOps() throws IOException {
24702475
updateMappings(otherShard, shard.indexSettings().getIndexMetadata());
24712476
SourceToParse sourceToParse = new SourceToParse(shard.shardId().getIndexName(), "1", new BytesArray("{}"), XContentType.JSON);
24722477
otherShard.applyIndexOperationOnReplica(
2478+
UUID.randomUUID().toString(),
24732479
1,
24742480
otherShard.getOperationPrimaryTerm(),
24752481
1,
@@ -2597,6 +2603,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
25972603
final String indexName = shard.shardId().getIndexName();
25982604
// Index #0, index #1
25992605
shard.applyIndexOperationOnReplica(
2606+
UUID.randomUUID().toString(),
26002607
0,
26012608
primaryTerm,
26022609
1,
@@ -2607,6 +2614,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
26072614
flushShard(shard);
26082615
shard.updateGlobalCheckpointOnReplica(0, "test"); // stick the global checkpoint here.
26092616
shard.applyIndexOperationOnReplica(
2617+
UUID.randomUUID().toString(),
26102618
1,
26112619
primaryTerm,
26122620
1,
@@ -2619,6 +2627,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
26192627
shard.getEngine().translogManager().rollTranslogGeneration();
26202628
shard.markSeqNoAsNoop(1, primaryTerm, "test");
26212629
shard.applyIndexOperationOnReplica(
2630+
UUID.randomUUID().toString(),
26222631
2,
26232632
primaryTerm,
26242633
1,
@@ -4009,6 +4018,7 @@ private Result indexOnReplicaWithGaps(final IndexShard indexShard, final int ope
40094018
XContentType.JSON
40104019
);
40114020
indexShard.applyIndexOperationOnReplica(
4021+
UUID.randomUUID().toString(),
40124022
i,
40134023
indexShard.getOperationPrimaryTerm(),
40144024
1,
@@ -4633,6 +4643,7 @@ public void testDoNotTrimCommitsWhenOpenReadOnlyEngine() throws Exception {
46334643
seqNo++; // create gaps in sequence numbers
46344644
}
46354645
shard.applyIndexOperationOnReplica(
4646+
UUID.randomUUID().toString(),
46364647
seqNo,
46374648
shard.getOperationPrimaryTerm(),
46384649
1,

server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java

+23-1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import org.opensearch.index.replication.TestReplicationSource;
3939
import org.opensearch.index.store.Store;
4040
import org.opensearch.index.store.StoreFileMetadata;
41+
import org.opensearch.index.translog.SnapshotMatchers;
42+
import org.opensearch.index.translog.Translog;
4143
import org.opensearch.indices.recovery.RecoverySettings;
4244
import org.opensearch.indices.recovery.RecoveryTarget;
4345
import org.opensearch.indices.replication.CheckpointInfoResponse;
@@ -176,6 +178,7 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception {
176178
shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON));
177179
}
178180

181+
assertEqualTranslogOperations(shards, primaryShard);
179182
primaryShard.refresh("Test");
180183
replicateSegments(primaryShard, shards.getReplicas());
181184

@@ -189,7 +192,7 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception {
189192
);
190193
}
191194
}
192-
195+
assertEqualTranslogOperations(shards, primaryShard);
193196
primaryShard.refresh("Test");
194197
replicateSegments(primaryShard, shards.getReplicas());
195198
shards.assertAllEqual(numDocs);
@@ -204,6 +207,7 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception {
204207
shards.delete(new DeleteRequest(index.getName()).id(String.valueOf(i)));
205208
}
206209
}
210+
assertEqualTranslogOperations(shards, primaryShard);
207211
primaryShard.refresh("Test");
208212
replicateSegments(primaryShard, shards.getReplicas());
209213
final List<DocIdSeqNoAndSource> docsAfterDelete = getDocIdAndSeqNos(primaryShard);
@@ -753,6 +757,7 @@ public void testNRTReplicaPromotedAsPrimary() throws Exception {
753757
for (IndexShard shard : shards.getReplicas()) {
754758
assertDocCounts(shard, numDocs, numDocs);
755759
}
760+
assertEqualTranslogOperations(shards, oldPrimary);
756761

757762
// 2. Create ops that are in the replica's xlog, not in the index.
758763
// index some more into both but don't replicate. replica will have only numDocs searchable, but should have totalDocs
@@ -761,6 +766,7 @@ public void testNRTReplicaPromotedAsPrimary() throws Exception {
761766
final int totalDocs = numDocs + additonalDocs;
762767

763768
assertDocCounts(oldPrimary, totalDocs, totalDocs);
769+
assertEqualTranslogOperations(shards, oldPrimary);
764770
for (IndexShard shard : shards.getReplicas()) {
765771
assertDocCounts(shard, totalDocs, numDocs);
766772
}
@@ -1083,4 +1089,20 @@ private void assertEqualCommittedSegments(IndexShard primary, IndexShard... repl
10831089
assertTrue(diff.missing.isEmpty());
10841090
}
10851091
}
1092+
1093+
private void assertEqualTranslogOperations(ReplicationGroup shards, IndexShard primaryShard) throws IOException {
1094+
try (final Translog.Snapshot snapshot = getTranslog(primaryShard).newSnapshot()) {
1095+
List<Translog.Operation> operations = new ArrayList<>();
1096+
Translog.Operation op;
1097+
while ((op = snapshot.next()) != null) {
1098+
final Translog.Operation newOp = op;
1099+
operations.add(newOp);
1100+
}
1101+
for (IndexShard replica : shards.getReplicas()) {
1102+
try (final Translog.Snapshot replicaSnapshot = getTranslog(replica).newSnapshot()) {
1103+
assertThat(replicaSnapshot, SnapshotMatchers.containsOperationsInAnyOrder(operations));
1104+
}
1105+
}
1106+
}
1107+
}
10861108
}

server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java

+2
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import java.util.Collections;
6666
import java.util.List;
6767
import java.util.Optional;
68+
import java.util.UUID;
6869
import java.util.concurrent.ArrayBlockingQueue;
6970
import java.util.concurrent.BlockingQueue;
7071
import java.util.concurrent.CyclicBarrier;
@@ -182,6 +183,7 @@ private SeqNoStats populateRandomData(IndexShard shard) throws IOException {
182183
Randomness.shuffle(seqNos);
183184
for (long seqNo : seqNos) {
184185
shard.applyIndexOperationOnReplica(
186+
UUID.randomUUID().toString(),
185187
seqNo,
186188
1,
187189
shard.getOperationPrimaryTerm(),

server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java

+5
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import java.util.HashMap;
7979
import java.util.List;
8080
import java.util.Map;
81+
import java.util.UUID;
8182
import java.util.concurrent.CountDownLatch;
8283
import java.util.concurrent.Future;
8384
import java.util.concurrent.atomic.AtomicReference;
@@ -181,6 +182,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
181182
orgReplica.flush(new FlushRequest().force(true)); // isolate delete#1 in its own translog generation and lucene segment
182183
// index #0
183184
orgReplica.applyIndexOperationOnReplica(
185+
UUID.randomUUID().toString(),
184186
0,
185187
primaryTerm,
186188
1,
@@ -190,6 +192,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
190192
);
191193
// index #3
192194
orgReplica.applyIndexOperationOnReplica(
195+
UUID.randomUUID().toString(),
193196
3,
194197
primaryTerm,
195198
1,
@@ -201,6 +204,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
201204
orgReplica.flush(new FlushRequest().force(true).waitIfOngoing(true));
202205
// index #2
203206
orgReplica.applyIndexOperationOnReplica(
207+
UUID.randomUUID().toString(),
204208
2,
205209
primaryTerm,
206210
1,
@@ -212,6 +216,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
212216
orgReplica.updateGlobalCheckpointOnReplica(3L, "test");
213217
// index #5 -> force NoOp #4.
214218
orgReplica.applyIndexOperationOnReplica(
219+
UUID.randomUUID().toString(),
215220
5,
216221
primaryTerm,
217222
1,

test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java

+2
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@
151151
import java.util.List;
152152
import java.util.Map;
153153
import java.util.Set;
154+
import java.util.UUID;
154155
import java.util.concurrent.CountDownLatch;
155156
import java.util.concurrent.ExecutionException;
156157
import java.util.concurrent.TimeUnit;
@@ -1122,6 +1123,7 @@ protected Engine.IndexResult indexDoc(IndexShard shard, String id, String source
11221123
final long seqNo = shard.seqNoStats().getMaxSeqNo() + 1;
11231124
shard.advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); // manually replicate max_seq_no_of_updates
11241125
result = shard.applyIndexOperationOnReplica(
1126+
UUID.randomUUID().toString(),
11251127
seqNo,
11261128
shard.getOperationPrimaryTerm(),
11271129
0,

0 commit comments

Comments (0)