26
26
27
27
import java .io .FileNotFoundException ;
28
28
import java .io .IOException ;
29
+ import java .util .ArrayList ;
29
30
import java .util .Collection ;
30
31
import java .util .HashMap ;
31
32
import java .util .HashSet ;
34
35
import java .util .Map .Entry ;
35
36
import java .util .Set ;
36
37
import java .util .UUID ;
37
- import java .util .stream . Stream ;
38
+ import java .util .concurrent . atomic . AtomicBoolean ;
38
39
39
40
import org .apache .accumulo .core .gc .thrift .GCStatus ;
40
41
import org .apache .accumulo .core .gc .thrift .GcCycleStats ;
59
60
import org .slf4j .LoggerFactory ;
60
61
61
62
import com .google .common .annotations .VisibleForTesting ;
62
- import com .google .common .collect .Streams ;
63
+ import com .google .common .base .Preconditions ;
64
+ import com .google .common .collect .Iterators ;
63
65
64
66
import io .opentelemetry .api .trace .Span ;
65
67
import io .opentelemetry .context .Scope ;
66
68
67
- public class GarbageCollectWriteAheadLogs {
69
+ public class GarbageCollectWriteAheadLogs implements AutoCloseable {
68
70
private static final Logger log = LoggerFactory .getLogger (GarbageCollectWriteAheadLogs .class );
69
71
70
72
private final ServerContext context ;
71
73
private final VolumeManager fs ;
72
74
private final LiveTServerSet liveServers ;
73
75
private final WalStateManager walMarker ;
74
- private final Stream <TabletMetadata > store ;
76
+ private final Iterable <TabletMetadata > store ;
77
+ private final List <AutoCloseable > closeables = new ArrayList <>();
78
+ private final AtomicBoolean hasCollected ;
75
79
76
80
/**
77
81
* Creates a new GC WAL object.
@@ -85,17 +89,20 @@ public class GarbageCollectWriteAheadLogs {
85
89
this .fs = fs ;
86
90
this .liveServers = liveServers ;
87
91
this .walMarker = new WalStateManager (context );
92
+
88
93
TabletsMetadata root = context .getAmple ().readTablets ().forLevel (DataLevel .ROOT )
89
94
.filter (new HasWalsFilter ()).fetch (LOCATION , LAST , LOGS , PREV_ROW , SUSPEND ).build ();
90
95
TabletsMetadata metadata = context .getAmple ().readTablets ().forLevel (DataLevel .METADATA )
91
96
.filter (new HasWalsFilter ()).fetch (LOCATION , LAST , LOGS , PREV_ROW , SUSPEND ).build ();
92
97
TabletsMetadata user = context .getAmple ().readTablets ().forLevel (DataLevel .USER )
93
98
.filter (new HasWalsFilter ()).fetch (LOCATION , LAST , LOGS , PREV_ROW , SUSPEND ).build ();
94
- this .store = Streams .concat (root .stream (), metadata .stream (), user .stream ()).onClose (() -> {
95
- root .close ();
96
- metadata .close ();
97
- user .close ();
98
- });
99
+
100
+ closeables .add (root );
101
+ closeables .add (metadata );
102
+ closeables .add (user );
103
+
104
+ this .store = () -> Iterators .concat (root .iterator (), metadata .iterator (), user .iterator ());
105
+ this .hasCollected = new AtomicBoolean (false );
99
106
}
100
107
101
108
/**
@@ -107,15 +114,17 @@ public class GarbageCollectWriteAheadLogs {
107
114
*/
108
115
@ VisibleForTesting
109
116
GarbageCollectWriteAheadLogs (ServerContext context , VolumeManager fs ,
110
- LiveTServerSet liveTServerSet , WalStateManager walMarker , Stream <TabletMetadata > store ) {
117
+ LiveTServerSet liveTServerSet , WalStateManager walMarker , Iterable <TabletMetadata > store ) {
111
118
this .context = context ;
112
119
this .fs = fs ;
113
120
this .liveServers = liveTServerSet ;
114
121
this .walMarker = walMarker ;
115
122
this .store = store ;
123
+ this .hasCollected = new AtomicBoolean (false );
116
124
}
117
125
118
126
public void collect (GCStatus status ) {
127
+ Preconditions .checkState (!hasCollected .get (), "Can only call collect once per object" );
119
128
try {
120
129
long count ;
121
130
long fileScanStop ;
@@ -203,7 +212,7 @@ public void collect(GCStatus status) {
203
212
} finally {
204
213
span5 .end ();
205
214
}
206
-
215
+ hasCollected . set ( true );
207
216
} catch (Exception e ) {
208
217
log .error ("exception occurred while garbage collecting write ahead logs" , e );
209
218
} finally {
@@ -285,7 +294,7 @@ private Map<UUID,TServerInstance> removeEntriesInUse(Map<TServerInstance,Set<UUI
285
294
}
286
295
287
296
// remove any entries if there's a log reference (recovery hasn't finished)
288
- store . forEach ( tabletMetadata -> {
297
+ for ( TabletMetadata tabletMetadata : store ) {
289
298
// Tablet is still assigned to a dead server. Manager has moved markers and reassigned it
290
299
// Easiest to just ignore all the WALs for the dead server.
291
300
if (TabletState .compute (tabletMetadata , liveServers ) == TabletState .ASSIGNED_TO_DEAD_SERVER ) {
@@ -307,8 +316,7 @@ private Map<UUID,TServerInstance> removeEntriesInUse(Map<TServerInstance,Set<UUI
307
316
recoveryLogs .keySet ().removeAll (idsToIgnore );
308
317
}
309
318
}
310
- });
311
- store .close ();
319
+ }
312
320
313
321
// Remove OPEN and CLOSED logs for live servers: they are still in use
314
322
for (TServerInstance liveServer : liveServers ) {
@@ -376,4 +384,15 @@ protected Map<UUID,Path> getSortedWALogs() throws IOException {
376
384
}
377
385
return result ;
378
386
}
387
+
388
+ @ Override
389
+ public void close () throws Exception {
390
+ for (AutoCloseable closable : closeables ) {
391
+ try {
392
+ closable .close ();
393
+ } catch (Exception e ) {
394
+ throw new RuntimeException (e );
395
+ }
396
+ }
397
+ }
379
398
}
0 commit comments