Skip to content

Commit 7d39c8d

Browse files
committed
moved compaction thread interruption to FileCompactor
1 parent 4e24af0 commit 7d39c8d

File tree

5 files changed

+49
-69
lines changed

5 files changed

+49
-69
lines changed

server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public long getEntriesWritten() {
6565
}
6666

6767
public Thread getThread() {
68-
return compactor.thread;
68+
return compactor.getThread();
6969
}
7070

7171
public String getOutputFile() {

server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java

+43-6
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import org.slf4j.Logger;
8080
import org.slf4j.LoggerFactory;
8181

82+
import com.google.common.base.Preconditions;
8283
import com.google.common.collect.Collections2;
8384

8485
import io.opentelemetry.api.trace.Span;
@@ -141,13 +142,48 @@ SystemIteratorEnvironment createIteratorEnv(ServerContext context,
141142

142143
// a unique id to identify a compactor
143144
private final long compactorID = nextCompactorID.getAndIncrement();
144-
protected volatile Thread thread;
145+
private volatile Thread thread;
145146
private final ServerContext context;
146147

147148
private final AtomicBoolean interruptFlag = new AtomicBoolean(false);
148149

149-
public void interrupt() {
150+
public synchronized void interrupt() {
150151
interruptFlag.set(true);
152+
153+
if (thread != null) {
154+
// Never want to interrupt the thread after clearThread was called as the thread could have
155+
// moved on to something completely different than the compaction. This method and clearThread
156+
// being synchronized and clearThread setting thread to null prevent this.
157+
thread.interrupt();
158+
}
159+
}
160+
161+
private class ThreadClearer implements AutoCloseable {
162+
@Override
163+
public void close() throws InterruptedException {
164+
clearThread();
165+
}
166+
}
167+
168+
private synchronized ThreadClearer setThread() {
169+
thread = Thread.currentThread();
170+
return new ThreadClearer();
171+
}
172+
173+
private synchronized void clearThread() throws InterruptedException {
174+
Preconditions.checkState(thread == Thread.currentThread());
175+
thread = null;
176+
// If the thread was interrupted during compaction do not want to allow the thread to continue
177+
// w/ the interrupt status set as this could impact code unrelated to the compaction. For
178+
// internal compactions the thread will execute metadata update code after the compaction and
179+
// would not want the interrupt status set for that.
180+
if (Thread.interrupted()) {
181+
throw new InterruptedException();
182+
}
183+
}
184+
185+
Thread getThread() {
186+
return thread;
151187
}
152188

153189
public long getCompactorID() {
@@ -272,7 +308,8 @@ protected Map<String,Set<ByteSequence>> getLocalityGroups(AccumuloConfiguration
272308
}
273309

274310
@Override
275-
public CompactionStats call() throws IOException, CompactionCanceledException {
311+
public CompactionStats call()
312+
throws IOException, CompactionCanceledException, InterruptedException {
276313

277314
FileSKVWriter mfw = null;
278315

@@ -290,8 +327,9 @@ public CompactionStats call() throws IOException, CompactionCanceledException {
290327
String newThreadName =
291328
"MajC compacting " + extent + " started " + threadStartDate + " file: " + outputFile;
292329
Thread.currentThread().setName(newThreadName);
293-
thread = Thread.currentThread();
294-
try {
330+
// Use try w/ resources for clearing the thread instead of finally because clearing may throw an
331+
// exception. Java's handling of exceptions thrown in finally blocks is not good.
332+
try (var ignored = setThread()) {
295333
FileOperations fileFactory = FileOperations.getInstance();
296334
FileSystem ns = this.fs.getFileSystemByPath(outputFile.getPath());
297335

@@ -374,7 +412,6 @@ public CompactionStats call() throws IOException, CompactionCanceledException {
374412
} finally {
375413
Thread.currentThread().setName(oldThreadName);
376414
if (remove) {
377-
thread = null;
378415
runningCompactions.remove(this);
379416
}
380417

server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java

+2-58
Original file line numberDiff line numberDiff line change
@@ -1313,56 +1313,6 @@ private void completeCompaction(CompactionJob job, Set<StoredTabletFile> jobFile
13131313
selectFiles();
13141314
}
13151315

1316-
private final Set<CompactionInterrupter> compactionInterrupters = ConcurrentHashMap.newKeySet();
1317-
1318-
private class CompactionInterrupter implements AutoCloseable {
1319-
private Thread thread;
1320-
1321-
CompactionInterrupter() throws InterruptedException {
1322-
1323-
this.thread = Thread.currentThread();
1324-
1325-
if (Thread.currentThread().isInterrupted() || closed) {
1326-
throw new InterruptedException();
1327-
}
1328-
1329-
compactionInterrupters.add(this);
1330-
log.trace("Registered compaction interrupter for {} {}", thread.getId(), getExtent());
1331-
}
1332-
1333-
public synchronized void interrupt() {
1334-
if (thread != null) {
1335-
log.debug("Interrupting compaction thread {} for {}", thread.getId(), getExtent());
1336-
thread.interrupt();
1337-
}
1338-
}
1339-
1340-
@Override
1341-
public void close() throws InterruptedException {
1342-
synchronized (this) {
1343-
Preconditions.checkState(thread == Thread.currentThread());
1344-
1345-
// The goal of this class is not to interrupt the compaction thread after this close()
1346-
// method returns. Two things help achieve this goal, first we set the thread instance var
1347-
// null here. Second this code block and the interrupt() method are synchronized.
1348-
thread = null;
1349-
}
1350-
1351-
// call this outside of synchronized block to avoid deadlock with the locks inside the
1352-
// concurrent hash set
1353-
compactionInterrupters.remove(this);
1354-
1355-
log.trace("Unregistered compaction interrupter for {} {}", Thread.currentThread().getId(),
1356-
getExtent());
1357-
1358-
// Its possible the threads interrupt status was set but nothing ever checked it. For example
1359-
// interrupt() could have been called immediately before this method was called.
1360-
if (Thread.interrupted()) {
1361-
throw new InterruptedException();
1362-
}
1363-
}
1364-
}
1365-
13661316
@Override
13671317
public void compact(CompactionServiceId service, CompactionJob job, BooleanSupplier keepRunning,
13681318
RateLimiter readLimiter, RateLimiter writeLimiter, long queuedTime) {
@@ -1382,7 +1332,6 @@ public void compact(CompactionServiceId service, CompactionJob job, BooleanSuppl
13821332
boolean successful = false;
13831333
try {
13841334
TabletLogger.compacting(getExtent(), job, cInfo.localCompactionCfg);
1385-
13861335
tablet.incrementStatusMajor();
13871336
var check = new CompactionCheck(service, kind, keepRunning, cInfo.checkCompactionId);
13881337
TabletFile tmpFileName = tablet.getNextMapFilenameForMajc(cInfo.propagateDeletes);
@@ -1391,11 +1340,8 @@ public void compact(CompactionServiceId service, CompactionJob job, BooleanSuppl
13911340
SortedMap<StoredTabletFile,DataFileValue> allFiles = tablet.getDatafiles();
13921341
HashMap<StoredTabletFile,DataFileValue> compactFiles = new HashMap<>();
13931342
cInfo.jobFiles.forEach(file -> compactFiles.put(file, allFiles.get(file)));
1394-
// Limit interrupting compactions to the part that reads and writes files and avoid
1395-
// interrupting the metadata table updates.
1396-
try (var ignored = new CompactionInterrupter()) {
1397-
stats = CompactableUtils.compact(tablet, job, cInfo, compactEnv, compactFiles, tmpFileName);
1398-
}
1343+
1344+
stats = CompactableUtils.compact(tablet, job, cInfo, compactEnv, compactFiles, tmpFileName);
13991345

14001346
newFile = CompactableUtils.bringOnline(tablet.getDatafileManager(), cInfo, stats,
14011347
compactFiles, allFiles, kind, tmpFileName);
@@ -1653,8 +1599,6 @@ public void close() {
16531599

16541600
closed = true;
16551601

1656-
compactionInterrupters.forEach(CompactionInterrupter::interrupt);
1657-
16581602
// Wait while internal jobs are running or external compactions are committing. When
16591603
// chopStatus is MARKING or selectStatus is SELECTING, there may be metadata table writes so
16601604
// wait on those. Do not wait on external compactions that are running.

server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,7 @@ private static AccumuloConfiguration getCompactionConfig(TableConfiguration tabl
560560
static CompactionStats compact(Tablet tablet, CompactionJob job,
561561
CompactableImpl.CompactionInfo cInfo, CompactionEnv cenv,
562562
Map<StoredTabletFile,DataFileValue> compactFiles, TabletFile tmpFileName)
563-
throws IOException, CompactionCanceledException {
563+
throws IOException, CompactionCanceledException, InterruptedException {
564564
TableConfiguration tableConf = tablet.getTableConfiguration();
565565

566566
AccumuloConfiguration compactionConfig = getCompactionConfig(tableConf,
@@ -576,7 +576,7 @@ static CompactionStats compact(Tablet tablet, CompactionJob job,
576576
}
577577
};
578578
final ScheduledFuture<?> future = tablet.getContext().getScheduledExecutor()
579-
.scheduleWithFixedDelay(compactionCancellerTask, 10, 10, TimeUnit.SECONDS);
579+
.scheduleWithFixedDelay(compactionCancellerTask, 3, 3, TimeUnit.SECONDS);
580580
try {
581581
return compactor.call();
582582
} finally {

server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public CompactionStats call() {
137137
log.warn("MinC failed ({}) to create {} retrying ...", e.getMessage(), outputFileName, e);
138138
reportedProblem = true;
139139
retryCounter++;
140-
} catch (CompactionCanceledException e) {
140+
} catch (CompactionCanceledException | InterruptedException e) {
141141
throw new IllegalStateException(e);
142142
}
143143

@@ -161,7 +161,6 @@ public CompactionStats call() {
161161

162162
} while (true);
163163
} finally {
164-
thread = null;
165164
runningCompactions.remove(this);
166165
}
167166
}

0 commit comments

Comments
 (0)