21
21
import org .opensearch .indices .IndicesService ;
22
22
import org .opensearch .indices .recovery .FileChunkWriter ;
23
23
import org .opensearch .indices .recovery .RecoverySettings ;
24
- import org .opensearch .indices .replication .checkpoint .ReplicationCheckpoint ;
25
- import org .opensearch .indices .replication .common .CopyState ;
26
24
27
25
import java .io .IOException ;
26
+ import java .io .UncheckedIOException ;
28
27
import java .util .Collections ;
29
- import java .util .HashMap ;
30
28
import java .util .List ;
31
29
import java .util .Map ;
32
30
import java .util .Set ;
36
34
/**
37
35
* Manages references to ongoing segrep events on a node.
38
36
* Each replica will have a new {@link SegmentReplicationSourceHandler} created when starting replication.
39
- * CopyStates will be cached for reuse between replicas and only released when all replicas have finished copying segments.
40
37
*
41
38
* @opensearch.internal
42
39
*/
@@ -45,7 +42,6 @@ class OngoingSegmentReplications {
45
42
private static final Logger logger = LogManager .getLogger (OngoingSegmentReplications .class );
46
43
private final RecoverySettings recoverySettings ;
47
44
private final IndicesService indicesService ;
48
- private final Map <ReplicationCheckpoint , CopyState > copyStateMap ;
49
45
private final Map <String , SegmentReplicationSourceHandler > allocationIdToHandlers ;
50
46
51
47
/**
@@ -57,46 +53,9 @@ class OngoingSegmentReplications {
57
53
OngoingSegmentReplications (IndicesService indicesService , RecoverySettings recoverySettings ) {
58
54
this .indicesService = indicesService ;
59
55
this .recoverySettings = recoverySettings ;
60
- this .copyStateMap = Collections .synchronizedMap (new HashMap <>());
61
56
this .allocationIdToHandlers = ConcurrentCollections .newConcurrentMap ();
62
57
}
63
58
64
- /*
65
- Operations on the {@link #copyStateMap} member.
66
- */
67
-
68
- /**
69
- * A synchronized method that checks {@link #copyStateMap} for the given {@link ReplicationCheckpoint} key
70
- * and returns the cached value if one is present. If the key is not present, a {@link CopyState}
71
- * object is constructed and stored in the map before being returned.
72
- */
73
- synchronized CopyState getCachedCopyState (ReplicationCheckpoint checkpoint ) throws IOException {
74
- if (isInCopyStateMap (checkpoint )) {
75
- final CopyState copyState = fetchFromCopyStateMap (checkpoint );
76
- // we incref the copyState for every replica that is using this checkpoint.
77
- // decref will happen when copy completes.
78
- copyState .incRef ();
79
- return copyState ;
80
- } else {
81
- // From the checkpoint's shard ID, fetch the IndexShard
82
- ShardId shardId = checkpoint .getShardId ();
83
- final IndexService indexService = indicesService .indexServiceSafe (shardId .getIndex ());
84
- final IndexShard indexShard = indexService .getShard (shardId .id ());
85
- // build the CopyState object and cache it before returning
86
- final CopyState copyState = new CopyState (checkpoint , indexShard );
87
-
88
- /*
89
- Use the checkpoint from the request as the key in the map, rather than
90
- the checkpoint from the created CopyState. This maximizes cache hits
91
- if replication targets make a request with an older checkpoint.
92
- Replication targets are expected to fetch the checkpoint in the response
93
- CopyState to bring themselves up to date.
94
- */
95
- addToCopyStateMap (checkpoint , copyState );
96
- return copyState ;
97
- }
98
- }
99
-
100
59
/**
101
60
* Start sending files to the replica.
102
61
*
@@ -114,51 +73,43 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentF
114
73
);
115
74
}
116
75
// update the given listener to release the CopyState before it resolves.
117
- final ActionListener <GetSegmentFilesResponse > wrappedListener = ActionListener .runBefore (listener , () -> {
118
- final SegmentReplicationSourceHandler sourceHandler = allocationIdToHandlers .remove (request .getTargetAllocationId ());
119
- if (sourceHandler != null ) {
120
- removeCopyState (sourceHandler .getCopyState ());
121
- }
122
- });
76
+ final ActionListener <GetSegmentFilesResponse > wrappedListener = ActionListener .runBefore (
77
+ listener ,
78
+ () -> allocationIdToHandlers .remove (request .getTargetAllocationId ())
79
+ );
123
80
handler .sendFiles (request , wrappedListener );
124
81
} else {
125
82
listener .onResponse (new GetSegmentFilesResponse (Collections .emptyList ()));
126
83
}
127
84
}
128
85
129
86
/**
130
- * Prepare for a Replication event. This method constructs a {@link CopyState} holding files to be sent off of the current
131
- * node's store. This state is intended to be sent back to Replicas before copy is initiated so the replica can perform a diff against its
132
- * local store. It will then build a handler to orchestrate the segment copy that will be stored locally and started on a subsequent request from replicas
133
- * with the list of required files.
87
+ * Prepare for a Replication event. This method constructs a {@link SegmentReplicationSourceHandler} that orchestrates segment copy and
88
+ * will internally incref files for copy.
134
89
*
135
90
* @param request {@link CheckpointInfoRequest}
136
91
* @param fileChunkWriter {@link FileChunkWriter} writer to handle sending files over the transport layer.
137
- * @return {@link CopyState} the built CopyState for this replication event.
138
- * @throws IOException - When there is an IO error building CopyState.
92
+ * @return {@link SegmentReplicationSourceHandler} the built CopyState for this replication event.
139
93
*/
140
- CopyState prepareForReplication (CheckpointInfoRequest request , FileChunkWriter fileChunkWriter ) throws IOException {
141
- final CopyState copyState = getCachedCopyState (request .getCheckpoint ());
142
- final SegmentReplicationSourceHandler newHandler = createTargetHandler (
143
- request .getTargetNode (),
144
- copyState ,
145
- request .getTargetAllocationId (),
146
- fileChunkWriter
147
- );
148
- final SegmentReplicationSourceHandler existingHandler = allocationIdToHandlers .putIfAbsent (
149
- request .getTargetAllocationId (),
150
- newHandler
151
- );
152
- // If we are already replicating to this allocation Id, cancel the old and replace with a new execution.
153
- // This will clear the old handler & referenced copy state holding an incref'd indexCommit.
154
- if (existingHandler != null ) {
155
- logger .warn ("Override handler for allocation id {}" , request .getTargetAllocationId ());
156
- cancelHandlers (handler -> handler .getAllocationId ().equals (request .getTargetAllocationId ()), "cancel due to retry" );
157
- assert allocationIdToHandlers .containsKey (request .getTargetAllocationId ()) == false ;
158
- allocationIdToHandlers .put (request .getTargetAllocationId (), newHandler );
159
- }
160
- assert allocationIdToHandlers .containsKey (request .getTargetAllocationId ());
161
- return copyState ;
94
+ SegmentReplicationSourceHandler prepareForReplication (CheckpointInfoRequest request , FileChunkWriter fileChunkWriter ) {
95
+ return allocationIdToHandlers .computeIfAbsent (request .getTargetAllocationId (), aId -> {
96
+ try {
97
+ // From the checkpoint's shard ID, fetch the IndexShard
98
+ final ShardId shardId = request .getCheckpoint ().getShardId ();
99
+ final IndexService indexService = indicesService .indexServiceSafe (shardId .getIndex ());
100
+ final IndexShard indexShard = indexService .getShard (shardId .id ());
101
+ return new SegmentReplicationSourceHandler (
102
+ request .getTargetNode (),
103
+ fileChunkWriter ,
104
+ indexShard ,
105
+ request .getTargetAllocationId (),
106
+ Math .toIntExact (recoverySettings .getChunkSize ().getBytes ()),
107
+ recoverySettings .getMaxConcurrentFileChunks ()
108
+ );
109
+ } catch (IOException e ) {
110
+ throw new UncheckedIOException ("Error creating replication handler" , e );
111
+ }
112
+ });
162
113
}
163
114
164
115
/**
@@ -167,8 +118,8 @@ CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter f
167
118
* @param shard {@link IndexShard}
168
119
* @param reason {@link String} - Reason for the cancel
169
120
*/
170
- synchronized void cancel (IndexShard shard , String reason ) {
171
- cancelHandlers (handler -> handler .getCopyState (). getShard (). shardId ().equals (shard .shardId ()), reason );
121
+ void cancel (IndexShard shard , String reason ) {
122
+ cancelHandlers (handler -> handler .shardId ().equals (shard .shardId ()), reason );
172
123
}
173
124
174
125
/**
@@ -177,11 +128,10 @@ synchronized void cancel(IndexShard shard, String reason) {
177
128
* @param allocationId {@link String} - Allocation ID.
178
129
* @param reason {@link String} - Reason for the cancel
179
130
*/
180
- synchronized void cancel (String allocationId , String reason ) {
131
+ void cancel (String allocationId , String reason ) {
181
132
final SegmentReplicationSourceHandler handler = allocationIdToHandlers .remove (allocationId );
182
133
if (handler != null ) {
183
134
handler .cancel (reason );
184
- removeCopyState (handler .getCopyState ());
185
135
}
186
136
}
187
137
@@ -194,14 +144,6 @@ void cancelReplication(DiscoveryNode node) {
194
144
cancelHandlers (handler -> handler .getTargetNode ().equals (node ), "Node left" );
195
145
}
196
146
197
- /**
198
- * Checks if the {@link #copyStateMap} has the input {@link ReplicationCheckpoint}
199
- * as a key by invoking {@link Map#containsKey(Object)}.
200
- */
201
- boolean isInCopyStateMap (ReplicationCheckpoint replicationCheckpoint ) {
202
- return copyStateMap .containsKey (replicationCheckpoint );
203
- }
204
-
205
147
int size () {
206
148
return allocationIdToHandlers .size ();
207
149
}
@@ -211,58 +153,20 @@ Map<String, SegmentReplicationSourceHandler> getHandlers() {
211
153
return allocationIdToHandlers ;
212
154
}
213
155
214
- int cachedCopyStateSize () {
215
- return copyStateMap .size ();
216
- }
217
-
218
- private SegmentReplicationSourceHandler createTargetHandler (
219
- DiscoveryNode node ,
220
- CopyState copyState ,
221
- String allocationId ,
222
- FileChunkWriter fileChunkWriter
223
- ) {
224
- return new SegmentReplicationSourceHandler (
225
- node ,
226
- fileChunkWriter ,
227
- copyState .getShard ().getThreadPool (),
228
- copyState ,
229
- allocationId ,
230
- Math .toIntExact (recoverySettings .getChunkSize ().getBytes ()),
231
- recoverySettings .getMaxConcurrentFileChunks ()
232
- );
233
- }
234
-
235
156
/**
236
- * Adds the input {@link CopyState} object to {@link #copyStateMap}.
237
- * The key is the CopyState's {@link ReplicationCheckpoint} object.
238
- */
239
- private void addToCopyStateMap (ReplicationCheckpoint checkpoint , CopyState copyState ) {
240
- copyStateMap .putIfAbsent (checkpoint , copyState );
241
- }
242
-
243
- /**
244
- * Given a {@link ReplicationCheckpoint}, return the corresponding
245
- * {@link CopyState} object, if any, from {@link #copyStateMap}.
246
- */
247
- private CopyState fetchFromCopyStateMap (ReplicationCheckpoint replicationCheckpoint ) {
248
- return copyStateMap .get (replicationCheckpoint );
249
- }
250
-
251
- /**
252
- * Remove a CopyState. Intended to be called after a replication event completes.
253
- * This method will remove a copyState from the copyStateMap only if its refCount hits 0.
254
- *
255
- * @param copyState {@link CopyState}
157
+ * Clear handlers for any allocationIds not in sync.
158
+ * @param shardId {@link ShardId}
159
+ * @param inSyncAllocationIds {@link List} of in-sync allocation Ids.
256
160
*/
257
- private synchronized void removeCopyState (CopyState copyState ) {
258
- if (copyState .decRef () == true ) {
259
- copyStateMap .remove (copyState .getRequestedReplicationCheckpoint ());
260
- }
161
+ void clearOutOfSyncIds (ShardId shardId , Set <String > inSyncAllocationIds ) {
162
+ cancelHandlers (
163
+ (handler ) -> handler .shardId ().equals (shardId ) && inSyncAllocationIds .contains (handler .getAllocationId ()) == false ,
164
+ "Shard is no longer in-sync with the primary"
165
+ );
261
166
}
262
167
263
168
/**
264
169
* Remove handlers from allocationIdToHandlers map based on a filter predicate.
265
- * This will also decref the handler's CopyState reference.
266
170
*/
267
171
private void cancelHandlers (Predicate <? super SegmentReplicationSourceHandler > predicate , String reason ) {
268
172
final List <String > allocationIds = allocationIdToHandlers .values ()
@@ -278,17 +182,4 @@ private void cancelHandlers(Predicate<? super SegmentReplicationSourceHandler> p
278
182
cancel (allocationId , reason );
279
183
}
280
184
}
281
-
282
- /**
283
- * Clear copystate and target handlers for any non insync allocationIds.
284
- * @param shardId {@link ShardId}
285
- * @param inSyncAllocationIds {@link List} of in-sync allocation Ids.
286
- */
287
- public void clearOutOfSyncIds (ShardId shardId , Set <String > inSyncAllocationIds ) {
288
- cancelHandlers (
289
- (handler ) -> handler .getCopyState ().getShard ().shardId ().equals (shardId )
290
- && inSyncAllocationIds .contains (handler .getAllocationId ()) == false ,
291
- "Shard is no longer in-sync with the primary"
292
- );
293
- }
294
185
}
0 commit comments