25
25
import static org .apache .accumulo .core .rpc .ThriftUtil .getClient ;
26
26
import static org .apache .accumulo .core .rpc .ThriftUtil .returnClient ;
27
27
import static org .apache .accumulo .core .util .threads .ThreadPoolNames .INSTANCE_OPS_COMPACTIONS_FINDER_POOL ;
28
+ import static org .apache .accumulo .core .util .threads .ThreadPoolNames .INSTANCE_OPS_SCANS_FINDER_POOL ;
28
29
29
30
import java .time .Duration ;
30
31
import java .util .ArrayList ;
38
39
import java .util .Optional ;
39
40
import java .util .Set ;
40
41
import java .util .concurrent .ExecutionException ;
42
+ import java .util .concurrent .ExecutorService ;
41
43
import java .util .concurrent .Future ;
42
44
import java .util .function .BiPredicate ;
43
45
import java .util .function .Consumer ;
51
53
import org .apache .accumulo .core .client .admin .ActiveScan ;
52
54
import org .apache .accumulo .core .client .admin .InstanceOperations ;
53
55
import org .apache .accumulo .core .client .admin .servers .ServerId ;
56
+ import org .apache .accumulo .core .client .admin .servers .ServerId .Type ;
54
57
import org .apache .accumulo .core .clientImpl .thrift .ClientService ;
55
58
import org .apache .accumulo .core .clientImpl .thrift .ConfigurationType ;
56
59
import org .apache .accumulo .core .clientImpl .thrift .TVersionedProperties ;
72
75
import org .apache .accumulo .core .util .LocalityGroupUtil .LocalityGroupConfigurationError ;
73
76
import org .apache .accumulo .core .util .Retry ;
74
77
import org .apache .accumulo .core .util .compaction .ExternalCompactionUtil ;
78
+ import org .apache .accumulo .core .util .threads .ThreadPoolNames ;
75
79
import org .apache .thrift .TException ;
76
80
import org .apache .thrift .transport .TTransport ;
77
81
import org .slf4j .LoggerFactory ;
78
82
79
83
import com .google .common .base .Preconditions ;
80
84
import com .google .common .net .HostAndPort ;
85
+ import com .google .common .util .concurrent .MoreExecutors ;
81
86
82
87
/**
83
88
* Provides a class for administering the accumulo instance
@@ -265,33 +270,18 @@ public List<String> getTabletServers() {
265
270
@ Deprecated (since = "4.0.0" )
266
271
public List <ActiveScan > getActiveScans (String tserver )
267
272
throws AccumuloException , AccumuloSecurityException {
268
- final var parsedTserver = HostAndPort .fromString (tserver );
269
- TabletScanClientService .Client client = null ;
270
- try {
271
- client = getClient (ThriftClientTypes .TABLET_SCAN , parsedTserver , context );
272
-
273
- List <ActiveScan > as = new ArrayList <>();
274
- for (var activeScan : client .getActiveScans (TraceUtil .traceInfo (), context .rpcCreds ())) {
275
- try {
276
- as .add (new ActiveScanImpl (context , activeScan ));
277
- } catch (TableNotFoundException e ) {
278
- throw new AccumuloException (e );
279
- }
280
- }
281
- return as ;
282
- } catch (ThriftSecurityException e ) {
283
- throw new AccumuloSecurityException (e .user , e .code , e );
284
- } catch (TException e ) {
285
- throw new AccumuloException (e );
286
- } finally {
287
- if (client != null ) {
288
- returnClient (client , context );
289
- }
290
- }
273
+ var si = getServerId (tserver , List .of (Type .TABLET_SERVER , Type .SCAN_SERVER ));
274
+ // getActiveScans throws exceptions so we can't use Optional.map() here
275
+ return si .isPresent () ? getActiveScans (si .orElseThrow ()) : List .of ();
291
276
}
292
277
293
278
@ Override
294
- public List <ActiveScan > getActiveScans (ServerId server )
279
+ public List <ActiveScan > getActiveScans (Collection <ServerId > servers )
280
+ throws AccumuloException , AccumuloSecurityException {
281
+ return queryServers (servers , this ::getActiveScans , INSTANCE_OPS_SCANS_FINDER_POOL );
282
+ }
283
+
284
+ private List <ActiveScan > getActiveScans (ServerId server )
295
285
throws AccumuloException , AccumuloSecurityException {
296
286
297
287
Objects .requireNonNull (server );
@@ -309,7 +299,7 @@ public List<ActiveScan> getActiveScans(ServerId server)
309
299
List <ActiveScan > as = new ArrayList <>();
310
300
for (var activeScan : rpcClient .getActiveScans (TraceUtil .traceInfo (), context .rpcCreds ())) {
311
301
try {
312
- as .add (new ActiveScanImpl (context , activeScan ));
302
+ as .add (new ActiveScanImpl (context , activeScan , server ));
313
303
} catch (TableNotFoundException e ) {
314
304
throw new AccumuloException (e );
315
305
}
@@ -337,21 +327,12 @@ public boolean testClassLoad(final String className, final String asTypeName)
337
327
@ Deprecated
338
328
public List <ActiveCompaction > getActiveCompactions (String server )
339
329
throws AccumuloException , AccumuloSecurityException {
340
-
341
- HostAndPort hp = HostAndPort .fromString (server );
342
-
343
- ServerId si = getServer (ServerId .Type .COMPACTOR , null , hp .getHost (), hp .getPort ());
344
- if (si == null ) {
345
- si = getServer (ServerId .Type .TABLET_SERVER , null , hp .getHost (), hp .getPort ());
346
- }
347
- if (si == null ) {
348
- return List .of ();
349
- }
350
- return getActiveCompactions (si );
330
+ var si = getServerId (server , List .of (Type .COMPACTOR , Type .TABLET_SERVER ));
331
+ // getActiveCompactions throws exceptions so we can't use Optional.map() here
332
+ return si .isPresent () ? getActiveCompactions (si .orElseThrow ()) : List .of ();
351
333
}
352
334
353
- @ Override
354
- public List <ActiveCompaction > getActiveCompactions (ServerId server )
335
+ private List <ActiveCompaction > getActiveCompactions (ServerId server )
355
336
throws AccumuloException , AccumuloSecurityException {
356
337
357
338
Objects .requireNonNull (server );
@@ -391,6 +372,7 @@ public List<ActiveCompaction> getActiveCompactions(ServerId server)
391
372
}
392
373
393
374
@ Override
375
+ @ Deprecated
394
376
public List <ActiveCompaction > getActiveCompactions ()
395
377
throws AccumuloException , AccumuloSecurityException {
396
378
@@ -404,19 +386,34 @@ public List<ActiveCompaction> getActiveCompactions()
404
386
@ Override
405
387
public List <ActiveCompaction > getActiveCompactions (Collection <ServerId > compactionServers )
406
388
throws AccumuloException , AccumuloSecurityException {
389
+ return queryServers (compactionServers , this ::getActiveCompactions ,
390
+ INSTANCE_OPS_COMPACTIONS_FINDER_POOL );
391
+ }
392
+
393
+ private <T > List <T > queryServers (Collection <ServerId > servers , ServerQuery <List <T >> serverQuery ,
394
+ ThreadPoolNames pool ) throws AccumuloException , AccumuloSecurityException {
395
+
396
+ final ExecutorService executorService ;
397
+ // If size 0 or 1 there's no need to create a thread pool
398
+ if (servers .isEmpty ()) {
399
+ return List .of ();
400
+ } else if (servers .size () == 1 ) {
401
+ executorService = MoreExecutors .newDirectExecutorService ();
402
+ } else {
403
+ int numThreads = Math .max (4 , Math .min ((servers .size ()) / 10 , 256 ));
404
+ executorService =
405
+ context .threadPools ().getPoolBuilder (pool ).numCoreThreads (numThreads ).build ();
406
+ }
407
407
408
- int numThreads = Math .max (4 , Math .min ((compactionServers .size ()) / 10 , 256 ));
409
- var executorService = context .threadPools ().getPoolBuilder (INSTANCE_OPS_COMPACTIONS_FINDER_POOL )
410
- .numCoreThreads (numThreads ).build ();
411
408
try {
412
- List <Future <List <ActiveCompaction >>> futures = new ArrayList <>();
409
+ List <Future <List <T >>> futures = new ArrayList <>();
413
410
414
- for (ServerId server : compactionServers ) {
415
- futures .add (executorService .submit (() -> getActiveCompactions (server )));
411
+ for (ServerId server : servers ) {
412
+ futures .add (executorService .submit (() -> serverQuery . execute (server )));
416
413
}
417
414
418
- List <ActiveCompaction > ret = new ArrayList <>();
419
- for (Future <List <ActiveCompaction >> future : futures ) {
415
+ List <T > ret = new ArrayList <>();
416
+ for (Future <List <T >> future : futures ) {
420
417
try {
421
418
ret .addAll (future .get ());
422
419
} catch (InterruptedException | ExecutionException e ) {
@@ -635,4 +632,13 @@ private ServerId createServerId(ServerId.Type type, ServiceLockPath slp) {
635
632
return new ServerId (type , resourceGroup , host , port );
636
633
}
637
634
635
+ private Optional <ServerId > getServerId (String server , List <Type > types ) {
636
+ HostAndPort hp = HostAndPort .fromString (server );
637
+ return types .stream ().map (type -> getServer (type , null , hp .getHost (), hp .getPort ()))
638
+ .findFirst ();
639
+ }
640
+
641
+ interface ServerQuery <T > {
642
+ T execute (ServerId server ) throws AccumuloException , AccumuloSecurityException ;
643
+ }
638
644
}
0 commit comments