26
26
27
27
import java .io .FileNotFoundException ;
28
28
import java .io .IOException ;
29
- import java .util .ArrayList ;
30
29
import java .util .Collection ;
31
30
import java .util .HashMap ;
32
31
import java .util .HashSet ;
36
35
import java .util .Set ;
37
36
import java .util .UUID ;
38
37
import java .util .concurrent .atomic .AtomicBoolean ;
38
+ import java .util .stream .Stream ;
39
39
40
40
import org .apache .accumulo .core .gc .thrift .GCStatus ;
41
41
import org .apache .accumulo .core .gc .thrift .GcCycleStats ;
61
61
62
62
import com .google .common .annotations .VisibleForTesting ;
63
63
import com .google .common .base .Preconditions ;
64
- import com .google .common .collect .Iterators ;
64
+ import com .google .common .collect .Streams ;
65
65
66
66
import io .opentelemetry .api .trace .Span ;
67
67
import io .opentelemetry .context .Scope ;
68
68
69
- public class GarbageCollectWriteAheadLogs implements AutoCloseable {
69
+ public class GarbageCollectWriteAheadLogs {
70
70
private static final Logger log = LoggerFactory .getLogger (GarbageCollectWriteAheadLogs .class );
71
71
72
72
private final ServerContext context ;
73
73
private final VolumeManager fs ;
74
74
private final LiveTServerSet liveServers ;
75
75
private final WalStateManager walMarker ;
76
- private final Iterable <TabletMetadata > store ;
77
- private final List <AutoCloseable > closeables = new ArrayList <>();
78
76
private final AtomicBoolean hasCollected ;
79
77
80
78
/**
@@ -88,39 +86,28 @@ public class GarbageCollectWriteAheadLogs implements AutoCloseable {
88
86
this .context = context ;
89
87
this .fs = fs ;
90
88
this .liveServers = liveServers ;
91
- this .walMarker = new WalStateManager (context );
89
+ this .walMarker = getWalStateManager ();
90
+ this .hasCollected = new AtomicBoolean (false );
91
+ }
92
92
93
+ @ VisibleForTesting
94
+ WalStateManager getWalStateManager () {
95
+ return new WalStateManager (context );
96
+ }
97
+
98
+ @ VisibleForTesting
99
+ Stream <TabletMetadata > getStore () {
93
100
TabletsMetadata root = context .getAmple ().readTablets ().forLevel (DataLevel .ROOT )
94
101
.filter (new HasWalsFilter ()).fetch (LOCATION , LAST , LOGS , PREV_ROW , SUSPEND ).build ();
95
102
TabletsMetadata metadata = context .getAmple ().readTablets ().forLevel (DataLevel .METADATA )
96
103
.filter (new HasWalsFilter ()).fetch (LOCATION , LAST , LOGS , PREV_ROW , SUSPEND ).build ();
97
104
TabletsMetadata user = context .getAmple ().readTablets ().forLevel (DataLevel .USER )
98
105
.filter (new HasWalsFilter ()).fetch (LOCATION , LAST , LOGS , PREV_ROW , SUSPEND ).build ();
99
-
100
- closeables .add (root );
101
- closeables .add (metadata );
102
- closeables .add (user );
103
-
104
- this .store = () -> Iterators .concat (root .iterator (), metadata .iterator (), user .iterator ());
105
- this .hasCollected = new AtomicBoolean (false );
106
- }
107
-
108
- /**
109
- * Creates a new GC WAL object. Meant for testing -- allows mocked objects.
110
- *
111
- * @param context the collection server's context
112
- * @param fs volume manager to use
113
- * @param liveTServerSet a started LiveTServerSet instance
114
- */
115
- @ VisibleForTesting
116
- GarbageCollectWriteAheadLogs (ServerContext context , VolumeManager fs ,
117
- LiveTServerSet liveTServerSet , WalStateManager walMarker , Iterable <TabletMetadata > store ) {
118
- this .context = context ;
119
- this .fs = fs ;
120
- this .liveServers = liveTServerSet ;
121
- this .walMarker = walMarker ;
122
- this .store = store ;
123
- this .hasCollected = new AtomicBoolean (false );
106
+ return Streams .concat (root .stream (), metadata .stream (), user .stream ()).onClose (() -> {
107
+ root .close ();
108
+ metadata .close ();
109
+ user .close ();
110
+ });
124
111
}
125
112
126
113
public void collect (GCStatus status ) {
@@ -294,28 +281,32 @@ private Map<UUID,TServerInstance> removeEntriesInUse(Map<TServerInstance,Set<UUI
294
281
}
295
282
296
283
// remove any entries if there's a log reference (recovery hasn't finished)
297
- for (TabletMetadata tabletMetadata : store ) {
298
- // Tablet is still assigned to a dead server. Manager has moved markers and reassigned it
299
- // Easiest to just ignore all the WALs for the dead server.
300
- if (TabletState .compute (tabletMetadata , liveServers ) == TabletState .ASSIGNED_TO_DEAD_SERVER ) {
301
- Set <UUID > idsToIgnore = candidates .remove (tabletMetadata .getLocation ().getServerInstance ());
302
- if (idsToIgnore != null ) {
303
- result .keySet ().removeAll (idsToIgnore );
304
- recoveryLogs .keySet ().removeAll (idsToIgnore );
284
+ try (Stream <TabletMetadata > store = getStore ()) {
285
+ store .forEach (tabletMetadata -> {
286
+ // Tablet is still assigned to a dead server. Manager has moved markers and reassigned it
287
+ // Easiest to just ignore all the WALs for the dead server.
288
+ if (TabletState .compute (tabletMetadata , liveServers )
289
+ == TabletState .ASSIGNED_TO_DEAD_SERVER ) {
290
+ Set <UUID > idsToIgnore =
291
+ candidates .remove (tabletMetadata .getLocation ().getServerInstance ());
292
+ if (idsToIgnore != null ) {
293
+ result .keySet ().removeAll (idsToIgnore );
294
+ recoveryLogs .keySet ().removeAll (idsToIgnore );
295
+ }
305
296
}
306
- }
307
- // Tablet is being recovered and has WAL references, remove all the WALs for the dead server
308
- // that made the WALs.
309
- for ( LogEntry wal : tabletMetadata . getLogs ()) {
310
- UUID walUUID = wal . getUniqueID ( );
311
- TServerInstance dead = result . get ( walUUID );
312
- // There's a reference to a log file, so skip that server's logs
313
- Set < UUID > idsToIgnore = candidates . remove ( dead );
314
- if ( idsToIgnore != null ) {
315
- result .keySet ().removeAll (idsToIgnore );
316
- recoveryLogs . keySet (). removeAll ( idsToIgnore );
297
+ // Tablet is being recovered and has WAL references, remove all the WALs for the dead server
298
+ // that made the WALs.
299
+ for ( LogEntry wal : tabletMetadata . getLogs ()) {
300
+ UUID walUUID = wal . getUniqueID ();
301
+ TServerInstance dead = result . get ( walUUID );
302
+ // There's a reference to a log file, so skip that server's logs
303
+ Set < UUID > idsToIgnore = candidates . remove ( dead );
304
+ if ( idsToIgnore != null ) {
305
+ result . keySet (). removeAll ( idsToIgnore );
306
+ recoveryLogs .keySet ().removeAll (idsToIgnore );
307
+ }
317
308
}
318
- }
309
+ });
319
310
}
320
311
321
312
// Remove OPEN and CLOSED logs for live servers: they are still in use
@@ -385,14 +376,4 @@ protected Map<UUID,Path> getSortedWALogs() throws IOException {
385
376
return result ;
386
377
}
387
378
388
- @ Override
389
- public void close () throws Exception {
390
- for (AutoCloseable closable : closeables ) {
391
- try {
392
- closable .close ();
393
- } catch (Exception e ) {
394
- throw new RuntimeException (e );
395
- }
396
- }
397
- }
398
379
}
0 commit comments