21
21
import org .opensearch .core .action .support .DefaultShardOperationFailedException ;
22
22
import org .opensearch .core .common .io .stream .StreamInput ;
23
23
import org .opensearch .core .index .shard .ShardId ;
24
- import org .opensearch .index .IndexService ;
25
24
import org .opensearch .index .SegmentReplicationPerGroupStats ;
26
25
import org .opensearch .index .SegmentReplicationPressureService ;
27
26
import org .opensearch .index .SegmentReplicationShardStats ;
38
37
import java .util .HashMap ;
39
38
import java .util .List ;
40
39
import java .util .Map ;
40
+ import java .util .Set ;
41
41
import java .util .stream .Collectors ;
42
+ import java .util .stream .Stream ;
42
43
43
44
/**
44
45
* Transport action for shard segment replication operation. This transport action does not actually
@@ -96,11 +97,11 @@ protected SegmentReplicationStatsResponse newResponse(
96
97
) {
97
98
String [] shards = request .shards ();
98
99
final List <Integer > shardsToFetch = Arrays .stream (shards ).map (Integer ::valueOf ).collect (Collectors .toList ());
99
-
100
100
// organize replica responses by allocationId.
101
101
final Map <String , SegmentReplicationState > replicaStats = new HashMap <>();
102
102
// map of index name to list of replication group stats.
103
103
final Map <String , List <SegmentReplicationPerGroupStats >> primaryStats = new HashMap <>();
104
+
104
105
for (SegmentReplicationShardStatsResponse response : responses ) {
105
106
if (response != null ) {
106
107
if (response .getReplicaStats () != null ) {
@@ -109,6 +110,7 @@ protected SegmentReplicationStatsResponse newResponse(
109
110
replicaStats .putIfAbsent (shardRouting .allocationId ().getId (), response .getReplicaStats ());
110
111
}
111
112
}
113
+
112
114
if (response .getPrimaryStats () != null ) {
113
115
final ShardId shardId = response .getPrimaryStats ().getShardId ();
114
116
if (shardsToFetch .isEmpty () || shardsToFetch .contains (shardId .getId ())) {
@@ -126,15 +128,20 @@ protected SegmentReplicationStatsResponse newResponse(
126
128
}
127
129
}
128
130
}
129
- // combine the replica stats to the shard stat entry in each group.
130
- for (Map .Entry <String , List <SegmentReplicationPerGroupStats >> entry : primaryStats .entrySet ()) {
131
- for (SegmentReplicationPerGroupStats group : entry .getValue ()) {
132
- for (SegmentReplicationShardStats replicaStat : group .getReplicaStats ()) {
133
- replicaStat .setCurrentReplicationState (replicaStats .getOrDefault (replicaStat .getAllocationId (), null ));
134
- }
135
- }
136
- }
137
- return new SegmentReplicationStatsResponse (totalShards , successfulShards , failedShards , primaryStats , shardFailures );
131
+
132
+ Map <String , List <SegmentReplicationPerGroupStats >> replicationStats = primaryStats .entrySet ()
133
+ .stream ()
134
+ .collect (
135
+ Collectors .toMap (
136
+ Map .Entry ::getKey ,
137
+ entry -> entry .getValue ()
138
+ .stream ()
139
+ .map (groupStats -> updateGroupStats (groupStats , replicaStats ))
140
+ .collect (Collectors .toList ())
141
+ )
142
+ );
143
+
144
+ return new SegmentReplicationStatsResponse (totalShards , successfulShards , failedShards , replicationStats , shardFailures );
138
145
}
139
146
140
147
@ Override
@@ -144,9 +151,8 @@ protected SegmentReplicationStatsRequest readRequestFrom(StreamInput in) throws
144
151
145
152
@ Override
146
153
protected SegmentReplicationShardStatsResponse shardOperation (SegmentReplicationStatsRequest request , ShardRouting shardRouting ) {
147
- IndexService indexService = indicesService .indexServiceSafe (shardRouting .shardId ().getIndex ());
148
- IndexShard indexShard = indexService .getShard (shardRouting .shardId ().id ());
149
154
ShardId shardId = shardRouting .shardId ();
155
+ IndexShard indexShard = indicesService .indexServiceSafe (shardId .getIndex ()).getShard (shardId .id ());
150
156
151
157
if (indexShard .indexSettings ().isSegRepEnabledOrRemoteNode () == false ) {
152
158
return null ;
@@ -156,11 +162,7 @@ protected SegmentReplicationShardStatsResponse shardOperation(SegmentReplication
156
162
return new SegmentReplicationShardStatsResponse (pressureService .getStatsForShard (indexShard ));
157
163
}
158
164
159
- // return information about only on-going segment replication events.
160
- if (request .activeOnly ()) {
161
- return new SegmentReplicationShardStatsResponse (targetService .getOngoingEventSegmentReplicationState (shardId ));
162
- }
163
- return new SegmentReplicationShardStatsResponse (targetService .getSegmentReplicationState (shardId ));
165
+ return new SegmentReplicationShardStatsResponse (getSegmentReplicationState (shardId , request .activeOnly ()));
164
166
}
165
167
166
168
@ Override
@@ -181,4 +183,83 @@ protected ClusterBlockException checkRequestBlock(
181
183
) {
182
184
return state .blocks ().indicesBlockedException (ClusterBlockLevel .METADATA_READ , concreteIndices );
183
185
}
186
+
187
+ private SegmentReplicationPerGroupStats updateGroupStats (
188
+ SegmentReplicationPerGroupStats groupStats ,
189
+ Map <String , SegmentReplicationState > replicaStats
190
+ ) {
191
+ // Update the SegmentReplicationState for each of the replicas
192
+ Set <SegmentReplicationShardStats > updatedReplicaStats = groupStats .getReplicaStats ()
193
+ .stream ()
194
+ .peek (replicaStat -> replicaStat .setCurrentReplicationState (replicaStats .getOrDefault (replicaStat .getAllocationId (), null )))
195
+ .collect (Collectors .toSet ());
196
+
197
+ // Compute search replica stats
198
+ Set <SegmentReplicationShardStats > searchReplicaStats = computeSearchReplicaStats (groupStats .getShardId (), replicaStats );
199
+
200
+ // Combine ReplicaStats and SearchReplicaStats
201
+ Set <SegmentReplicationShardStats > combinedStats = Stream .concat (updatedReplicaStats .stream (), searchReplicaStats .stream ())
202
+ .collect (Collectors .toSet ());
203
+
204
+ return new SegmentReplicationPerGroupStats (groupStats .getShardId (), combinedStats , groupStats .getRejectedRequestCount ());
205
+ }
206
+
207
+ private Set <SegmentReplicationShardStats > computeSearchReplicaStats (
208
+ ShardId shardId ,
209
+ Map <String , SegmentReplicationState > replicaStats
210
+ ) {
211
+ return replicaStats .values ()
212
+ .stream ()
213
+ .filter (segmentReplicationState -> segmentReplicationState .getShardRouting ().shardId ().equals (shardId ))
214
+ .filter (segmentReplicationState -> segmentReplicationState .getShardRouting ().isSearchOnly ())
215
+ .map (segmentReplicationState -> {
216
+ ShardRouting shardRouting = segmentReplicationState .getShardRouting ();
217
+ SegmentReplicationShardStats segmentReplicationStats = computeSegmentReplicationShardStats (shardRouting );
218
+ segmentReplicationStats .setCurrentReplicationState (segmentReplicationState );
219
+ return segmentReplicationStats ;
220
+ })
221
+ .collect (Collectors .toSet ());
222
+ }
223
+
224
+ SegmentReplicationShardStats computeSegmentReplicationShardStats (ShardRouting shardRouting ) {
225
+ ShardId shardId = shardRouting .shardId ();
226
+ SegmentReplicationState completedSegmentReplicationState = targetService .getlatestCompletedEventSegmentReplicationState (shardId );
227
+ SegmentReplicationState ongoingSegmentReplicationState = targetService .getOngoingEventSegmentReplicationState (shardId );
228
+
229
+ return new SegmentReplicationShardStats (
230
+ shardRouting .allocationId ().getId (),
231
+ 0 ,
232
+ calculateBytesRemainingToReplicate (ongoingSegmentReplicationState ),
233
+ 0 ,
234
+ getCurrentReplicationLag (ongoingSegmentReplicationState ),
235
+ getLastCompletedReplicationLag (completedSegmentReplicationState )
236
+ );
237
+ }
238
+
239
+ private SegmentReplicationState getSegmentReplicationState (ShardId shardId , boolean isActiveOnly ) {
240
+ if (isActiveOnly ) {
241
+ return targetService .getOngoingEventSegmentReplicationState (shardId );
242
+ } else {
243
+ return targetService .getSegmentReplicationState (shardId );
244
+ }
245
+ }
246
+
247
+ private long calculateBytesRemainingToReplicate (SegmentReplicationState ongoingSegmentReplicationState ) {
248
+ if (ongoingSegmentReplicationState == null ) {
249
+ return 0 ;
250
+ }
251
+ return ongoingSegmentReplicationState .getIndex ()
252
+ .fileDetails ()
253
+ .stream ()
254
+ .mapToLong (index -> index .length () - index .recovered ())
255
+ .sum ();
256
+ }
257
+
258
+ private long getCurrentReplicationLag (SegmentReplicationState ongoingSegmentReplicationState ) {
259
+ return ongoingSegmentReplicationState != null ? ongoingSegmentReplicationState .getTimer ().time () : 0 ;
260
+ }
261
+
262
+ private long getLastCompletedReplicationLag (SegmentReplicationState completedSegmentReplicationState ) {
263
+ return completedSegmentReplicationState != null ? completedSegmentReplicationState .getTimer ().time () : 0 ;
264
+ }
184
265
}
0 commit comments