44
44
import org .opensearch .common .settings .Setting .Property ;
45
45
import org .opensearch .common .settings .Settings ;
46
46
47
+ import java .util .Locale ;
48
+ import java .util .function .BiFunction ;
49
+
47
50
import static org .opensearch .cluster .routing .allocation .decider .Decision .THROTTLE ;
48
51
import static org .opensearch .cluster .routing .allocation .decider .Decision .YES ;
49
52
@@ -71,7 +74,7 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
71
74
private static final Logger logger = LogManager .getLogger (ThrottlingAllocationDecider .class );
72
75
73
76
public static final int DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES = 2 ;
74
- public static final int DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES = 4 ;
77
+ public static final int DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_RECOVERIES = 4 ;
75
78
public static final String NAME = "throttling" ;
76
79
public static final Setting <Integer > CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING =
77
80
new Setting <>("cluster.routing.allocation.node_concurrent_recoveries" ,
@@ -80,7 +83,11 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
80
83
Property .Dynamic , Property .NodeScope );
81
84
public static final Setting <Integer > CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING =
82
85
Setting .intSetting ("cluster.routing.allocation.node_initial_primaries_recoveries" ,
83
- DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES , 0 ,
86
+ DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_RECOVERIES , 0 ,
87
+ Property .Dynamic , Property .NodeScope );
88
+ public static final Setting <Integer > CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING =
89
+ Setting .intSetting ("cluster.routing.allocation.node_initial_replicas_recoveries" ,
90
+ DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_RECOVERIES , 0 ,
84
91
Property .Dynamic , Property .NodeScope );
85
92
public static final Setting <Integer > CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING = new Setting <>(
86
93
"cluster.routing.allocation.node_concurrent_incoming_recoveries" ,
@@ -99,9 +106,11 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
99
106
private volatile int primariesInitialRecoveries ;
100
107
private volatile int concurrentIncomingRecoveries ;
101
108
private volatile int concurrentOutgoingRecoveries ;
109
+ private volatile int replicasInitialRecoveries ;
102
110
103
111
public ThrottlingAllocationDecider (Settings settings , ClusterSettings clusterSettings ) {
104
- this .primariesInitialRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING .get (settings );
112
+ primariesInitialRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING .get (settings );
113
+ replicasInitialRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING .get (settings );
105
114
concurrentIncomingRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING .get (settings );
106
115
concurrentOutgoingRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING .get (settings );
107
116
@@ -111,10 +120,12 @@ public ThrottlingAllocationDecider(Settings settings, ClusterSettings clusterSet
111
120
this ::setConcurrentIncomingRecoverries );
112
121
clusterSettings .addSettingsUpdateConsumer (CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING ,
113
122
this ::setConcurrentOutgoingRecoverries );
123
+ clusterSettings .addSettingsUpdateConsumer (CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING ,
124
+ this ::setReplicasInitialRecoveries );
114
125
115
126
logger .debug ("using node_concurrent_outgoing_recoveries [{}], node_concurrent_incoming_recoveries [{}], " +
116
- "node_initial_primaries_recoveries [{}]" ,
117
- concurrentOutgoingRecoveries , concurrentIncomingRecoveries , primariesInitialRecoveries );
127
+ "node_initial_primaries_recoveries [{}], node_initial_replicas_recoveries [{}] " ,
128
+ concurrentOutgoingRecoveries , concurrentIncomingRecoveries , primariesInitialRecoveries , replicasInitialRecoveries );
118
129
}
119
130
120
131
private void setConcurrentIncomingRecoverries (int concurrentIncomingRecoveries ) {
@@ -128,6 +139,10 @@ private void setPrimariesInitialRecoveries(int primariesInitialRecoveries) {
128
139
this .primariesInitialRecoveries = primariesInitialRecoveries ;
129
140
}
130
141
142
+ private void setReplicasInitialRecoveries (int replicasInitialRecoveries ) {
143
+ this .replicasInitialRecoveries = replicasInitialRecoveries ;
144
+ }
145
+
131
146
@ Override
132
147
public Decision canAllocate (ShardRouting shardRouting , RoutingNode node , RoutingAllocation allocation ) {
133
148
if (shardRouting .primary () && shardRouting .unassigned ()) {
@@ -150,36 +165,75 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
150
165
// Peer recovery
151
166
assert initializingShard (shardRouting , node .nodeId ()).recoverySource ().getType () == RecoverySource .Type .PEER ;
152
167
153
- // Allocating a shard to this node will increase the incoming recoveries
154
- int currentInRecoveries = allocation .routingNodes ().getIncomingRecoveries (node .nodeId ());
155
- if (currentInRecoveries >= concurrentIncomingRecoveries ) {
168
+ if (shardRouting .unassignedReasonIndexCreated ()) {
169
+ return allocateInitialShardCopies (shardRouting , node , allocation );
170
+ } else {
171
+ return allocateNonInitialShardCopies (shardRouting , node , allocation );
172
+ }
173
+ }
174
+ }
175
+
176
+ private Decision allocateInitialShardCopies (ShardRouting shardRouting , RoutingNode node , RoutingAllocation allocation ) {
177
+ int currentInRecoveries = allocation .routingNodes ().getInitialIncomingRecoveries (node .nodeId ());
178
+ assert shardRouting .unassignedReasonIndexCreated () && !shardRouting .primary ();
179
+
180
+ return allocateShardCopies (shardRouting , allocation , currentInRecoveries , replicasInitialRecoveries ,
181
+ (x ,y ) -> getInitialPrimaryNodeOutgoingRecoveries (x ,y ), replicasInitialRecoveries ,
182
+ String .format (Locale .ROOT , "[%s=%d]" , CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING .getKey (),
183
+ replicasInitialRecoveries ),
184
+ String .format (Locale .ROOT , "[%s=%d]" , CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING .getKey (),
185
+ replicasInitialRecoveries ));
186
+ }
187
+
188
+ private Decision allocateNonInitialShardCopies (ShardRouting shardRouting , RoutingNode node , RoutingAllocation allocation ) {
189
+
190
+ assert !shardRouting .unassignedReasonIndexCreated ();
191
+ int currentInRecoveries = allocation .routingNodes ().getIncomingRecoveries (node .nodeId ());
192
+
193
+ return allocateShardCopies (shardRouting , allocation , currentInRecoveries , concurrentIncomingRecoveries ,
194
+ (x ,y ) -> getPrimaryNodeOutgoingRecoveries (x ,y ), concurrentOutgoingRecoveries ,
195
+ String .format (Locale .ROOT , "[%s=%d] (can also be set via [%s])" ,
196
+ CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING .getKey (),
197
+ concurrentIncomingRecoveries , CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING .getKey ()),
198
+ String .format (Locale .ROOT , "[%s=%d] (can also be set via [%s])" ,
199
+ CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING .getKey (),
200
+ concurrentOutgoingRecoveries , CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING .getKey ()));
201
+ }
202
+
203
+ private Integer getPrimaryNodeOutgoingRecoveries (ShardRouting shardRouting , RoutingAllocation allocation ) {
204
+ ShardRouting primaryShard = allocation .routingNodes ().activePrimary (shardRouting .shardId ());
205
+ return allocation .routingNodes ().getOutgoingRecoveries (primaryShard .currentNodeId ());
206
+ }
207
+
208
+ private Integer getInitialPrimaryNodeOutgoingRecoveries (ShardRouting shardRouting , RoutingAllocation allocation ) {
209
+ ShardRouting primaryShard = allocation .routingNodes ().activePrimary (shardRouting .shardId ());
210
+ return allocation .routingNodes ().getInitialOutgoingRecoveries (primaryShard .currentNodeId ());
211
+ }
212
+
213
+ private Decision allocateShardCopies (ShardRouting shardRouting , RoutingAllocation allocation , int currentInRecoveries ,
214
+ int inRecoveriesLimit , BiFunction <ShardRouting , RoutingAllocation ,
215
+ Integer > primaryNodeOutRecoveriesFunc , int outRecoveriesLimit ,
216
+ String incomingRecoveriesSettingMsg , String outGoingRecoveriesSettingMsg ) {
217
+ // Allocating a shard to this node will increase the incoming recoveries
218
+ if (currentInRecoveries >= inRecoveriesLimit ) {
219
+ return allocation .decision (THROTTLE , NAME ,
220
+ "reached the limit of incoming shard recoveries [%d], cluster setting %s" ,
221
+ currentInRecoveries , incomingRecoveriesSettingMsg );
222
+ } else {
223
+ // search for corresponding recovery source (= primary shard) and check number of outgoing recoveries on that node
224
+ ShardRouting primaryShard = allocation .routingNodes ().activePrimary (shardRouting .shardId ());
225
+ if (primaryShard == null ) {
226
+ return allocation .decision (Decision .NO , NAME , "primary shard for this replica is not yet active" );
227
+ }
228
+ int primaryNodeOutRecoveries = primaryNodeOutRecoveriesFunc .apply (shardRouting , allocation );
229
+ if (primaryNodeOutRecoveries >= outRecoveriesLimit ) {
156
230
return allocation .decision (THROTTLE , NAME ,
157
- "reached the limit of incoming shard recoveries [%d], cluster setting [%s=%d] (can also be set via [%s])" ,
158
- currentInRecoveries , CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING .getKey (),
159
- concurrentIncomingRecoveries ,
160
- CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING .getKey ());
231
+ "reached the limit of outgoing shard recoveries [%d] on the node [%s] which holds the primary, " +
232
+ "cluster setting %s" , primaryNodeOutRecoveries , primaryShard .currentNodeId (),
233
+ outGoingRecoveriesSettingMsg );
161
234
} else {
162
- // search for corresponding recovery source (= primary shard) and check number of outgoing recoveries on that node
163
- ShardRouting primaryShard = allocation .routingNodes ().activePrimary (shardRouting .shardId ());
164
- if (primaryShard == null ) {
165
- return allocation .decision (Decision .NO , NAME , "primary shard for this replica is not yet active" );
166
- }
167
- int primaryNodeOutRecoveries = allocation .routingNodes ().getOutgoingRecoveries (primaryShard .currentNodeId ());
168
- if (primaryNodeOutRecoveries >= concurrentOutgoingRecoveries ) {
169
- return allocation .decision (THROTTLE , NAME ,
170
- "reached the limit of outgoing shard recoveries [%d] on the node [%s] which holds the primary, " +
171
- "cluster setting [%s=%d] (can also be set via [%s])" ,
172
- primaryNodeOutRecoveries , primaryShard .currentNodeId (),
173
- CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING .getKey (),
174
- concurrentOutgoingRecoveries ,
175
- CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING .getKey ());
176
- } else {
177
- return allocation .decision (YES , NAME , "below shard recovery limit of outgoing: [%d < %d] incoming: [%d < %d]" ,
178
- primaryNodeOutRecoveries ,
179
- concurrentOutgoingRecoveries ,
180
- currentInRecoveries ,
181
- concurrentIncomingRecoveries );
182
- }
235
+ return allocation .decision (YES , NAME , "below shard recovery limit of outgoing: [%d < %d] incoming: [%d < %d]" ,
236
+ primaryNodeOutRecoveries , outRecoveriesLimit , currentInRecoveries , inRecoveriesLimit );
183
237
}
184
238
}
185
239
}
0 commit comments