25
25
26
26
import java .io .FileNotFoundException ;
27
27
import java .io .IOException ;
28
+ import java .time .Duration ;
28
29
import java .util .ArrayList ;
29
30
import java .util .Collection ;
30
31
import java .util .Collections ;
36
37
import java .util .Objects ;
37
38
import java .util .Set ;
38
39
import java .util .SortedMap ;
39
- import java .util .concurrent .ExecutorService ;
40
+ import java .util .concurrent .ThreadPoolExecutor ;
40
41
import java .util .concurrent .TimeUnit ;
42
+ import java .util .concurrent .atomic .AtomicInteger ;
41
43
import java .util .stream .Stream ;
42
44
43
45
import org .apache .accumulo .core .Constants ;
65
67
import org .apache .accumulo .core .metadata .schema .TabletsMetadata ;
66
68
import org .apache .accumulo .core .security .Authorizations ;
67
69
import org .apache .accumulo .core .util .Pair ;
70
+ import org .apache .accumulo .core .util .Timer ;
68
71
import org .apache .accumulo .core .util .UtilWaitThread ;
69
72
import org .apache .accumulo .core .util .threads .ThreadPools ;
70
73
import org .apache .accumulo .core .volume .Volume ;
@@ -95,16 +98,19 @@ public class GCRun implements GarbageCollectionEnvironment {
95
98
private final Ample .DataLevel level ;
96
99
private final ServerContext context ;
97
100
private final AccumuloConfiguration config ;
101
+ private final Duration loggingInterval = Duration .ofMinutes (1 );
98
102
private long candidates = 0 ;
99
103
private long inUse = 0 ;
100
104
private long deleted = 0 ;
101
105
private long errors = 0 ;
106
+ private AtomicInteger batchCount ;
102
107
103
108
public GCRun (Ample .DataLevel level , ServerContext context ) {
104
109
this .log = LoggerFactory .getLogger (GCRun .class .getName () + "." + level .name ());
105
110
this .level = level ;
106
111
this .context = context ;
107
112
this .config = context .getConfiguration ();
113
+ this .batchCount = new AtomicInteger (0 );
108
114
}
109
115
110
116
@ Override
@@ -132,7 +138,8 @@ public void deleteGcCandidates(Collection<GcCandidate> gcCandidates, GcCandidate
132
138
return ;
133
139
}
134
140
135
- log .info ("Attempting to delete gcCandidates of type {} from metadata" , type );
141
+ log .info ("Batch {} attempting to delete {} gcCandidates of type {} from metadata" ,
142
+ batchCount .get (), gcCandidates .size (), type );
136
143
context .getAmple ().deleteGcCandidates (level , gcCandidates , type );
137
144
}
138
145
@@ -143,6 +150,7 @@ public List<GcCandidate> readCandidatesThatFitInMemory(Iterator<GcCandidate> can
143
150
long candidateBatchSize = getCandidateBatchSize () / 2 ;
144
151
145
152
List <GcCandidate > candidatesBatch = new ArrayList <>();
153
+ batchCount .incrementAndGet ();
146
154
147
155
while (candidates .hasNext ()) {
148
156
GcCandidate candidate = candidates .next ();
@@ -288,15 +296,17 @@ public void deleteConfirmedCandidates(SortedMap<String,GcCandidate> confirmedDel
288
296
289
297
List <GcCandidate > processedDeletes = Collections .synchronizedList (new ArrayList <>());
290
298
291
- minimizeDeletes (confirmedDeletes , processedDeletes , fs , log );
299
+ minimizeDeletes (confirmedDeletes , processedDeletes , fs , log , loggingInterval );
292
300
293
- ExecutorService deleteThreadPool = ThreadPools .getServerThreadPools ()
301
+ ThreadPoolExecutor deleteThreadPool = ThreadPools .getServerThreadPools ()
294
302
.createExecutorService (config , Property .GC_DELETE_THREADS );
295
303
296
304
final List <Pair <Path ,Path >> replacements = context .getVolumeReplacements ();
297
305
306
+ log .info ("Batch {} attempting to delete {} gcCandidate files" , batchCount .get (),
307
+ confirmedDeletes .size ());
308
+ Timer timer = Timer .startNew ();
298
309
for (final GcCandidate delete : confirmedDeletes .values ()) {
299
-
300
310
Runnable deleteTask = () -> {
301
311
boolean removeFlag = false ;
302
312
@@ -320,7 +330,7 @@ public void deleteConfirmedCandidates(SortedMap<String,GcCandidate> confirmedDel
320
330
}
321
331
322
332
for (Path pathToDel : GcVolumeUtil .expandAllVolumesUri (fs , fullPath )) {
323
- log .debug ("{} Deleting {}" , fileActionPrefix , pathToDel );
333
+ log .debug ("Batch {} {} Deleting {}" , batchCount . get () , fileActionPrefix , pathToDel );
324
334
325
335
if (moveToTrash (pathToDel ) || fs .deleteRecursively (pathToDel )) {
326
336
// delete succeeded, still want to delete
@@ -373,7 +383,12 @@ public void deleteConfirmedCandidates(SortedMap<String,GcCandidate> confirmedDel
373
383
deleteThreadPool .shutdown ();
374
384
375
385
try {
376
- while (!deleteThreadPool .awaitTermination (1000 , TimeUnit .MILLISECONDS )) { // empty
386
+ while (!deleteThreadPool .awaitTermination (1000 , TimeUnit .MILLISECONDS )) {
387
+ if (timer .hasElapsed (loggingInterval )) {
388
+ log .info ("Batch {} deleting file {} of {}" , batchCount .get (),
389
+ deleteThreadPool .getCompletedTaskCount (), confirmedDeletes .size ());
390
+ timer .restart ();
391
+ }
377
392
}
378
393
} catch (InterruptedException e1 ) {
379
394
log .error ("{}" , e1 .getMessage (), e1 );
@@ -441,7 +456,8 @@ public Iterator<Map.Entry<String,Replication.Status>> getReplicationNeededIterat
441
456
442
457
@ VisibleForTesting
443
458
static void minimizeDeletes (SortedMap <String ,GcCandidate > confirmedDeletes ,
444
- List <GcCandidate > processedDeletes , VolumeManager fs , Logger logger ) {
459
+ List <GcCandidate > processedDeletes , VolumeManager fs , Logger logger ,
460
+ Duration loggingInterval ) {
445
461
Set <Path > seenVolumes = new HashSet <>();
446
462
447
463
// when deleting a dir and all files in that dir, only need to delete the dir.
@@ -451,7 +467,11 @@ static void minimizeDeletes(SortedMap<String,GcCandidate> confirmedDeletes,
451
467
452
468
String lastDirRel = null ;
453
469
Path lastDirAbs = null ;
470
+ Timer progressTimer = Timer .startNew ();
471
+ int progressCount = 0 ;
472
+ int totalDeletes = confirmedDeletes .size ();
454
473
while (cdIter .hasNext ()) {
474
+ progressCount ++;
455
475
Map .Entry <String ,GcCandidate > entry = cdIter .next ();
456
476
String relPath = entry .getKey ();
457
477
Path absPath = new Path (entry .getValue ().getPath ());
@@ -491,6 +511,10 @@ static void minimizeDeletes(SortedMap<String,GcCandidate> confirmedDeletes,
491
511
lastDirAbs = null ;
492
512
}
493
513
}
514
+ if (progressTimer .hasElapsed (loggingInterval )) {
515
+ logger .debug ("Minimizing delete {} of {}" , progressCount , totalDeletes );
516
+ progressTimer .restart ();
517
+ }
494
518
}
495
519
}
496
520
0 commit comments