Skip to content

Commit 4dbd6fa

Browse files
authored
Created new ReplicaShardBatchAllocator (opensearch-project#8992)
* Created new ReplicaShardBatchAllocator to be used instead of ReplicaShardAllocator for batch calls

Signed-off-by: Shivansh Arora <shivansh.arora@protonmail.com>
1 parent e6975e4 commit 4dbd6fa

File tree

2 files changed

+1037
-0
lines changed

2 files changed

+1037
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.gateway;
10+
11+
import org.apache.logging.log4j.Logger;
12+
import org.opensearch.cluster.node.DiscoveryNode;
13+
import org.opensearch.cluster.routing.ShardRouting;
14+
import org.opensearch.cluster.routing.UnassignedInfo;
15+
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
16+
import org.opensearch.cluster.routing.allocation.NodeAllocationResult;
17+
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
18+
import org.opensearch.cluster.routing.allocation.decider.Decision;
19+
import org.opensearch.common.collect.Tuple;
20+
import org.opensearch.core.index.shard.ShardId;
21+
import org.opensearch.gateway.AsyncShardFetch.FetchResult;
22+
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
23+
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata;
24+
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch;
25+
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata;
26+
27+
import java.util.ArrayList;
28+
import java.util.Collections;
29+
import java.util.HashMap;
30+
import java.util.List;
31+
import java.util.Map;
32+
33+
/**
34+
* Allocates replica shards in a batch mode
35+
*
36+
* @opensearch.internal
37+
*/
38+
public abstract class ReplicaShardBatchAllocator extends ReplicaShardAllocator {
39+
40+
/**
41+
* Process existing recoveries of replicas and see if we need to cancel them if we find a better
42+
* match. Today, a better match is one that can perform a no-op recovery while the previous recovery
43+
* has to copy segment files.
44+
*
45+
* @param allocation the overall routing allocation
46+
* @param shardBatches a list of shard batches to check for existing recoveries
47+
*/
48+
public void processExistingRecoveries(RoutingAllocation allocation, List<List<ShardRouting>> shardBatches) {
49+
List<Runnable> shardCancellationActions = new ArrayList<>();
50+
// iterate through the batches, each batch needs to be processed together as fetch call should be made for shards from same batch
51+
for (List<ShardRouting> shardBatch : shardBatches) {
52+
List<ShardRouting> eligibleShards = new ArrayList<>();
53+
List<ShardRouting> ineligibleShards = new ArrayList<>();
54+
// iterate over shards to check for match for each of those
55+
for (ShardRouting shard : shardBatch) {
56+
if (shard != null && !shard.primary()) {
57+
// need to iterate over all the nodes to find matching shard
58+
if (shouldSkipFetchForRecovery(shard)) {
59+
ineligibleShards.add(shard);
60+
continue;
61+
}
62+
eligibleShards.add(shard);
63+
}
64+
}
65+
AsyncShardFetch.FetchResult<NodeStoreFilesMetadataBatch> shardState = fetchData(eligibleShards, ineligibleShards, allocation);
66+
if (!shardState.hasData()) {
67+
logger.trace("{}: fetching new stores for initializing shard batch", eligibleShards);
68+
continue; // still fetching
69+
}
70+
for (ShardRouting shard : eligibleShards) {
71+
Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores = convertToNodeStoreFilesMetadataMap(shard, shardState);
72+
73+
Runnable cancellationAction = cancelExistingRecoveryForBetterMatch(shard, allocation, nodeShardStores);
74+
if (cancellationAction != null) {
75+
shardCancellationActions.add(cancellationAction);
76+
}
77+
}
78+
}
79+
for (Runnable action : shardCancellationActions) {
80+
action.run();
81+
}
82+
}
83+
84+
abstract protected FetchResult<NodeStoreFilesMetadataBatch> fetchData(
85+
List<ShardRouting> eligibleShards,
86+
List<ShardRouting> ineligibleShards,
87+
RoutingAllocation allocation
88+
);
89+
90+
@Override
91+
protected FetchResult<TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata> fetchData(
92+
ShardRouting shard,
93+
RoutingAllocation allocation
94+
) {
95+
logger.error("fetchData for single shard called via batch allocator");
96+
throw new IllegalStateException("ReplicaShardBatchAllocator should only be used for a batch of shards");
97+
}
98+
99+
@Override
100+
public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, Logger logger) {
101+
return makeAllocationDecision(Collections.singletonList(unassignedShard), allocation, logger).get(unassignedShard);
102+
}
103+
104+
@Override
105+
public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
106+
List<ShardRouting> shards,
107+
RoutingAllocation allocation,
108+
Logger logger
109+
) {
110+
HashMap<ShardRouting, AllocateUnassignedDecision> shardAllocationDecisions = new HashMap<>();
111+
final boolean explain = allocation.debugDecision();
112+
List<ShardRouting> eligibleShards = new ArrayList<>();
113+
List<ShardRouting> ineligibleShards = new ArrayList<>();
114+
HashMap<ShardRouting, Tuple<Decision, Map<String, NodeAllocationResult>>> nodeAllocationDecisions = new HashMap<>();
115+
for (ShardRouting shard : shards) {
116+
if (!isResponsibleFor(shard)) {
117+
// this allocator n is not responsible for allocating this shard
118+
ineligibleShards.add(shard);
119+
shardAllocationDecisions.put(shard, AllocateUnassignedDecision.NOT_TAKEN);
120+
continue;
121+
}
122+
123+
Tuple<Decision, Map<String, NodeAllocationResult>> result = canBeAllocatedToAtLeastOneNode(shard, allocation);
124+
Decision allocationDecision = result.v1();
125+
if (allocationDecision.type() != Decision.Type.YES && (!explain || !hasInitiatedFetching(shard))) {
126+
// only return early if we are not in explain mode, or we are in explain mode but we have not
127+
// yet attempted to fetch any shard data
128+
logger.trace("{}: ignoring allocation, can't be allocated on any node", shard);
129+
shardAllocationDecisions.put(
130+
shard,
131+
AllocateUnassignedDecision.no(
132+
UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.type()),
133+
result.v2() != null ? new ArrayList<>(result.v2().values()) : null
134+
)
135+
);
136+
continue;
137+
}
138+
// storing the nodeDecisions in nodeAllocationDecisions if the decision is not YES
139+
// so that we don't have to compute the decisions again
140+
nodeAllocationDecisions.put(shard, result);
141+
142+
eligibleShards.add(shard);
143+
}
144+
145+
// Do not call fetchData if there are no eligible shards
146+
if (eligibleShards.isEmpty()) {
147+
return shardAllocationDecisions;
148+
}
149+
// only fetch data for eligible shards
150+
final FetchResult<NodeStoreFilesMetadataBatch> shardsState = fetchData(eligibleShards, ineligibleShards, allocation);
151+
152+
for (ShardRouting unassignedShard : eligibleShards) {
153+
Tuple<Decision, Map<String, NodeAllocationResult>> result = nodeAllocationDecisions.get(unassignedShard);
154+
shardAllocationDecisions.put(
155+
unassignedShard,
156+
getAllocationDecision(
157+
unassignedShard,
158+
allocation,
159+
convertToNodeStoreFilesMetadataMap(unassignedShard, shardsState),
160+
result,
161+
logger
162+
)
163+
);
164+
}
165+
return shardAllocationDecisions;
166+
}
167+
168+
private Map<DiscoveryNode, StoreFilesMetadata> convertToNodeStoreFilesMetadataMap(
169+
ShardRouting unassignedShard,
170+
FetchResult<NodeStoreFilesMetadataBatch> data
171+
) {
172+
if (!data.hasData()) {
173+
return null;
174+
}
175+
176+
Map<DiscoveryNode, StoreFilesMetadata> map = new HashMap<>();
177+
178+
data.getData().forEach((discoveryNode, value) -> {
179+
Map<ShardId, NodeStoreFilesMetadata> batch = value.getNodeStoreFilesMetadataBatch();
180+
NodeStoreFilesMetadata metadata = batch.get(unassignedShard.shardId());
181+
if (metadata != null) {
182+
map.put(discoveryNode, metadata.storeFilesMetadata());
183+
}
184+
});
185+
186+
return map;
187+
}
188+
}

0 commit comments

Comments
 (0)