34
34
import java .util .Map .Entry ;
35
35
import java .util .Set ;
36
36
import java .util .UUID ;
37
+ import java .util .concurrent .atomic .AtomicBoolean ;
38
+ import java .util .stream .Stream ;
37
39
38
40
import org .apache .accumulo .core .gc .thrift .GCStatus ;
39
41
import org .apache .accumulo .core .gc .thrift .GcCycleStats ;
42
44
import org .apache .accumulo .core .metadata .TabletState ;
43
45
import org .apache .accumulo .core .metadata .schema .Ample .DataLevel ;
44
46
import org .apache .accumulo .core .metadata .schema .TabletMetadata ;
47
+ import org .apache .accumulo .core .metadata .schema .TabletsMetadata ;
45
48
import org .apache .accumulo .core .tabletserver .log .LogEntry ;
46
49
import org .apache .accumulo .core .trace .TraceUtil ;
47
50
import org .apache .accumulo .core .util .Pair ;
57
60
import org .slf4j .LoggerFactory ;
58
61
59
62
import com .google .common .annotations .VisibleForTesting ;
60
- import com .google .common .collect .Iterators ;
63
+ import com .google .common .base .Preconditions ;
64
+ import com .google .common .collect .Streams ;
61
65
62
66
import io .opentelemetry .api .trace .Span ;
63
67
import io .opentelemetry .context .Scope ;
@@ -69,7 +73,7 @@ public class GarbageCollectWriteAheadLogs {
69
73
private final VolumeManager fs ;
70
74
private final LiveTServerSet liveServers ;
71
75
private final WalStateManager walMarker ;
72
- private final Iterable < TabletMetadata > store ;
76
+ private final AtomicBoolean hasCollected ;
73
77
74
78
/**
75
79
* Creates a new GC WAL object.
@@ -82,34 +86,32 @@ public class GarbageCollectWriteAheadLogs {
82
86
this .context = context ;
83
87
this .fs = fs ;
84
88
this .liveServers = liveServers ;
85
- this .walMarker = new WalStateManager (context );
86
- this .store = () -> Iterators .concat (
87
- context .getAmple ().readTablets ().forLevel (DataLevel .ROOT ).filter (new HasWalsFilter ())
88
- .fetch (LOCATION , LAST , LOGS , PREV_ROW , SUSPEND ).build ().iterator (),
89
- context .getAmple ().readTablets ().forLevel (DataLevel .METADATA ).filter (new HasWalsFilter ())
90
- .fetch (LOCATION , LAST , LOGS , PREV_ROW , SUSPEND ).build ().iterator (),
91
- context .getAmple ().readTablets ().forLevel (DataLevel .USER ).filter (new HasWalsFilter ())
92
- .fetch (LOCATION , LAST , LOGS , PREV_ROW , SUSPEND ).build ().iterator ());
89
+ this .walMarker = createWalStateManager (context );
90
+ this .hasCollected = new AtomicBoolean (false );
93
91
}
94
92
95
- /**
96
- * Creates a new GC WAL object. Meant for testing -- allows mocked objects.
97
- *
98
- * @param context the collection server's context
99
- * @param fs volume manager to use
100
- * @param liveTServerSet a started LiveTServerSet instance
101
- */
102
93
@ VisibleForTesting
103
- GarbageCollectWriteAheadLogs (ServerContext context , VolumeManager fs ,
104
- LiveTServerSet liveTServerSet , WalStateManager walMarker , Iterable <TabletMetadata > store ) {
105
- this .context = context ;
106
- this .fs = fs ;
107
- this .liveServers = liveTServerSet ;
108
- this .walMarker = walMarker ;
109
- this .store = store ;
94
+ WalStateManager createWalStateManager (ServerContext context ) {
95
+ return new WalStateManager (context );
96
+ }
97
+
98
+ @ VisibleForTesting
99
+ Stream <TabletMetadata > createStore () {
100
+ TabletsMetadata root = context .getAmple ().readTablets ().forLevel (DataLevel .ROOT )
101
+ .filter (new HasWalsFilter ()).fetch (LOCATION , LAST , LOGS , PREV_ROW , SUSPEND ).build ();
102
+ TabletsMetadata metadata = context .getAmple ().readTablets ().forLevel (DataLevel .METADATA )
103
+ .filter (new HasWalsFilter ()).fetch (LOCATION , LAST , LOGS , PREV_ROW , SUSPEND ).build ();
104
+ TabletsMetadata user = context .getAmple ().readTablets ().forLevel (DataLevel .USER )
105
+ .filter (new HasWalsFilter ()).fetch (LOCATION , LAST , LOGS , PREV_ROW , SUSPEND ).build ();
106
+ return Streams .concat (root .stream (), metadata .stream (), user .stream ()).onClose (() -> {
107
+ root .close ();
108
+ metadata .close ();
109
+ user .close ();
110
+ });
110
111
}
111
112
112
113
public void collect (GCStatus status ) {
114
+ Preconditions .checkState (!hasCollected .get (), "Can only call collect once per object" );
113
115
try {
114
116
long count ;
115
117
long fileScanStop ;
@@ -197,7 +199,7 @@ public void collect(GCStatus status) {
197
199
} finally {
198
200
span5 .end ();
199
201
}
200
-
202
+ hasCollected . set ( true );
201
203
} catch (Exception e ) {
202
204
log .error ("exception occurred while garbage collecting write ahead logs" , e );
203
205
} finally {
@@ -279,28 +281,32 @@ private Map<UUID,TServerInstance> removeEntriesInUse(Map<TServerInstance,Set<UUI
279
281
}
280
282
281
283
// remove any entries if there's a log reference (recovery hasn't finished)
282
- for (TabletMetadata tabletMetadata : store ) {
283
- // Tablet is still assigned to a dead server. Manager has moved markers and reassigned it
284
- // Easiest to just ignore all the WALs for the dead server.
285
- if (TabletState .compute (tabletMetadata , liveServers ) == TabletState .ASSIGNED_TO_DEAD_SERVER ) {
286
- Set <UUID > idsToIgnore = candidates .remove (tabletMetadata .getLocation ().getServerInstance ());
287
- if (idsToIgnore != null ) {
288
- result .keySet ().removeAll (idsToIgnore );
289
- recoveryLogs .keySet ().removeAll (idsToIgnore );
284
+ try (Stream <TabletMetadata > store = createStore ()) {
285
+ store .forEach (tabletMetadata -> {
286
+ // Tablet is still assigned to a dead server. Manager has moved markers and reassigned it
287
+ // Easiest to just ignore all the WALs for the dead server.
288
+ if (TabletState .compute (tabletMetadata , liveServers )
289
+ == TabletState .ASSIGNED_TO_DEAD_SERVER ) {
290
+ Set <UUID > idsToIgnore =
291
+ candidates .remove (tabletMetadata .getLocation ().getServerInstance ());
292
+ if (idsToIgnore != null ) {
293
+ result .keySet ().removeAll (idsToIgnore );
294
+ recoveryLogs .keySet ().removeAll (idsToIgnore );
295
+ }
290
296
}
291
- }
292
- // Tablet is being recovered and has WAL references, remove all the WALs for the dead server
293
- // that made the WALs.
294
- for ( LogEntry wal : tabletMetadata . getLogs ()) {
295
- UUID walUUID = wal . getUniqueID ( );
296
- TServerInstance dead = result . get ( walUUID );
297
- // There's a reference to a log file, so skip that server's logs
298
- Set < UUID > idsToIgnore = candidates . remove ( dead );
299
- if ( idsToIgnore != null ) {
300
- result .keySet ().removeAll (idsToIgnore );
301
- recoveryLogs . keySet (). removeAll ( idsToIgnore );
297
+ // Tablet is being recovered and has WAL references, remove all the WALs for the dead server
298
+ // that made the WALs.
299
+ for ( LogEntry wal : tabletMetadata . getLogs ()) {
300
+ UUID walUUID = wal . getUniqueID ();
301
+ TServerInstance dead = result . get ( walUUID );
302
+ // There's a reference to a log file, so skip that server's logs
303
+ Set < UUID > idsToIgnore = candidates . remove ( dead );
304
+ if ( idsToIgnore != null ) {
305
+ result . keySet (). removeAll ( idsToIgnore );
306
+ recoveryLogs .keySet ().removeAll (idsToIgnore );
307
+ }
302
308
}
303
- }
309
+ });
304
310
}
305
311
306
312
// Remove OPEN and CLOSED logs for live servers: they are still in use
@@ -369,4 +375,5 @@ protected Map<UUID,Path> getSortedWALogs() throws IOException {
369
375
}
370
376
return result ;
371
377
}
378
+
372
379
}
0 commit comments