26
26
27
27
import java .io .FileNotFoundException ;
28
28
import java .io .IOException ;
29
+ import java .time .Duration ;
29
30
import java .util .ArrayList ;
30
31
import java .util .Collection ;
31
32
import java .util .Collections ;
37
38
import java .util .Objects ;
38
39
import java .util .Set ;
39
40
import java .util .SortedMap ;
40
- import java .util .concurrent .ExecutorService ;
41
+ import java .util .concurrent .ThreadPoolExecutor ;
41
42
import java .util .concurrent .TimeUnit ;
43
+ import java .util .concurrent .atomic .AtomicInteger ;
42
44
import java .util .stream .Stream ;
43
45
44
46
import org .apache .accumulo .core .Constants ;
63
65
import org .apache .accumulo .core .metadata .schema .TabletMetadata ;
64
66
import org .apache .accumulo .core .metadata .schema .TabletsMetadata ;
65
67
import org .apache .accumulo .core .security .Authorizations ;
68
+ import org .apache .accumulo .core .util .Timer ;
66
69
import org .apache .accumulo .core .util .threads .ThreadPools ;
67
70
import org .apache .accumulo .core .volume .Volume ;
68
71
import org .apache .accumulo .server .ServerContext ;
@@ -88,16 +91,19 @@ public class GCRun implements GarbageCollectionEnvironment {
88
91
private final Ample .DataLevel level ;
89
92
private final ServerContext context ;
90
93
private final AccumuloConfiguration config ;
94
+ private final Duration loggingInterval = Duration .ofMinutes (1 );
91
95
private long candidates = 0 ;
92
96
private long inUse = 0 ;
93
97
private long deleted = 0 ;
94
98
private long errors = 0 ;
99
+ private AtomicInteger batchCount ;
95
100
96
101
public GCRun (Ample .DataLevel level , ServerContext context ) {
97
102
this .log = LoggerFactory .getLogger (GCRun .class .getName () + "." + level .name ());
98
103
this .level = level ;
99
104
this .context = context ;
100
105
this .config = context .getConfiguration ();
106
+ this .batchCount = new AtomicInteger (0 );
101
107
}
102
108
103
109
@ Override
@@ -125,7 +131,8 @@ public void deleteGcCandidates(Collection<GcCandidate> gcCandidates, GcCandidate
125
131
return ;
126
132
}
127
133
128
- log .info ("Attempting to delete gcCandidates of type {} from metadata" , type );
134
+ log .info ("Batch {} attempting to delete {} gcCandidates of type {} from metadata" ,
135
+ batchCount .get (), gcCandidates .size (), type );
129
136
context .getAmple ().deleteGcCandidates (level , gcCandidates , type );
130
137
}
131
138
@@ -136,6 +143,7 @@ public List<GcCandidate> readCandidatesThatFitInMemory(Iterator<GcCandidate> can
136
143
long candidateBatchSize = getCandidateBatchSize () / 2 ;
137
144
138
145
List <GcCandidate > candidatesBatch = new ArrayList <>();
146
+ batchCount .incrementAndGet ();
139
147
140
148
while (candidates .hasNext ()) {
141
149
GcCandidate candidate = candidates .next ();
@@ -279,15 +287,17 @@ public void deleteConfirmedCandidates(SortedMap<String,GcCandidate> confirmedDel
279
287
280
288
List <GcCandidate > processedDeletes = Collections .synchronizedList (new ArrayList <>());
281
289
282
- minimizeDeletes (confirmedDeletes , processedDeletes , fs , log );
290
+ minimizeDeletes (confirmedDeletes , processedDeletes , fs , log , loggingInterval );
283
291
284
- ExecutorService deleteThreadPool = ThreadPools .getServerThreadPools ()
292
+ ThreadPoolExecutor deleteThreadPool = ThreadPools .getServerThreadPools ()
285
293
.createExecutorService (config , Property .GC_DELETE_THREADS );
286
294
287
295
final Map <Path ,Path > replacements = context .getVolumeReplacements ();
288
296
297
+ log .info ("Batch {} attempting to delete {} gcCandidate files" , batchCount .get (),
298
+ confirmedDeletes .size ());
299
+ Timer timer = Timer .startNew ();
289
300
for (final GcCandidate delete : confirmedDeletes .values ()) {
290
-
291
301
Runnable deleteTask = () -> {
292
302
boolean removeFlag = false ;
293
303
@@ -311,7 +321,7 @@ public void deleteConfirmedCandidates(SortedMap<String,GcCandidate> confirmedDel
311
321
}
312
322
313
323
for (Path pathToDel : GcVolumeUtil .expandAllVolumesUri (fs , fullPath )) {
314
- log .debug ("{} Deleting {}" , fileActionPrefix , pathToDel );
324
+ log .debug ("Batch {} {} Deleting {}" , batchCount . get () , fileActionPrefix , pathToDel );
315
325
316
326
if (moveToTrash (pathToDel ) || fs .deleteRecursively (pathToDel )) {
317
327
// delete succeeded, still want to delete
@@ -364,7 +374,12 @@ public void deleteConfirmedCandidates(SortedMap<String,GcCandidate> confirmedDel
364
374
deleteThreadPool .shutdown ();
365
375
366
376
try {
367
- while (!deleteThreadPool .awaitTermination (1000 , TimeUnit .MILLISECONDS )) { // empty
377
+ while (!deleteThreadPool .awaitTermination (1000 , TimeUnit .MILLISECONDS )) {
378
+ if (timer .hasElapsed (loggingInterval )) {
379
+ log .info ("Batch {} deleting file {} of {}" , batchCount .get (),
380
+ deleteThreadPool .getCompletedTaskCount (), confirmedDeletes .size ());
381
+ timer .restart ();
382
+ }
368
383
}
369
384
} catch (InterruptedException e1 ) {
370
385
log .error ("{}" , e1 .getMessage (), e1 );
@@ -408,7 +423,8 @@ public void incrementInUseStat(long i) {
408
423
409
424
@ VisibleForTesting
410
425
static void minimizeDeletes (SortedMap <String ,GcCandidate > confirmedDeletes ,
411
- List <GcCandidate > processedDeletes , VolumeManager fs , Logger logger ) {
426
+ List <GcCandidate > processedDeletes , VolumeManager fs , Logger logger ,
427
+ Duration loggingInterval ) {
412
428
Set <Path > seenVolumes = new HashSet <>();
413
429
414
430
// when deleting a dir and all files in that dir, only need to delete the dir.
@@ -418,7 +434,11 @@ static void minimizeDeletes(SortedMap<String,GcCandidate> confirmedDeletes,
418
434
419
435
String lastDirRel = null ;
420
436
Path lastDirAbs = null ;
437
+ Timer progressTimer = Timer .startNew ();
438
+ int progressCount = 0 ;
439
+ int totalDeletes = confirmedDeletes .size ();
421
440
while (cdIter .hasNext ()) {
441
+ progressCount ++;
422
442
Map .Entry <String ,GcCandidate > entry = cdIter .next ();
423
443
String relPath = entry .getKey ();
424
444
Path absPath = new Path (entry .getValue ().getPath ());
@@ -458,6 +478,10 @@ static void minimizeDeletes(SortedMap<String,GcCandidate> confirmedDeletes,
458
478
lastDirAbs = null ;
459
479
}
460
480
}
481
+ if (progressTimer .hasElapsed (loggingInterval )) {
482
+ logger .debug ("Minimizing delete {} of {}" , progressCount , totalDeletes );
483
+ progressTimer .restart ();
484
+ }
461
485
}
462
486
}
463
487
0 commit comments