-
Notifications
You must be signed in to change notification settings - Fork 455
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Attempted to parallelize bulk import loading #5375
base: 2.1
Are you sure you want to change the base?
Changes from 1 commit
108cdc5
f016945
551dde0
a5f8b88
932efdc
a616266
e920687
2f65c47
4727fd3
c7f1040
dc2419e
c1e4f4e
bcba44b
ddfd569
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,6 +32,7 @@ | |
import java.util.Map; | ||
import java.util.NoSuchElementException; | ||
import java.util.Set; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import org.apache.accumulo.core.client.BatchWriter; | ||
import org.apache.accumulo.core.client.MutationsRejectedException; | ||
|
@@ -184,6 +185,7 @@ void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exce | |
|
||
private void sendQueued(int threshhold) { | ||
if (queuedDataSize > threshhold || threshhold == 0) { | ||
var sendTimer = Timer.startNew(); | ||
loadQueue.forEach((server, tabletFiles) -> { | ||
|
||
if (log.isTraceEnabled()) { | ||
|
@@ -195,17 +197,41 @@ private void sendQueued(int threshhold) { | |
try { | ||
client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, server, | ||
manager.getContext(), timeInMillis); | ||
client.loadFiles(TraceUtil.traceInfo(), manager.getContext().rpcCreds(), tid, | ||
bulkDir.toString(), tabletFiles, setTime); | ||
// Send a message per tablet. On the tablet server side for each tablet it must write to | ||
// the metadata tablet which requires waiting on the walog. Sending a message per tablet | ||
// allows these per tablet metadata table writes to run in parallel. This avoids | ||
// serially waiting on the metadata table write for each tablet. | ||
for (var entry : tabletFiles.entrySet()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is the code that I changed to do an RPC per tablet instead of per tablet server. I was hoping these would execute in parallel on the tablet server, but that does not seem to be happening. |
||
TKeyExtent tExtent = entry.getKey(); | ||
Map<String,MapFileInfo> files = entry.getValue(); | ||
|
||
client.loadFiles(TraceUtil.traceInfo(), manager.getContext().rpcCreds(), tid, | ||
bulkDir.toString(), Map.of(tExtent, files), setTime); | ||
client.getOutputProtocol().getTransport().flush(); | ||
} | ||
} catch (TException ex) { | ||
log.debug("rpc failed server: " + server + ", " + fmtTid + " " + ex.getMessage(), ex); | ||
} finally { | ||
ThriftUtil.returnClient(client, manager.getContext()); | ||
} | ||
}); | ||
|
||
if (log.isDebugEnabled()) { | ||
var elapsed = sendTimer.elapsed(TimeUnit.MILLISECONDS); | ||
int count = 0; | ||
for (var tableFiles : loadQueue.values()) { | ||
for (var files : tableFiles.values()) { | ||
count += files.size(); | ||
} | ||
} | ||
|
||
log.debug("{} sent {} messages to {} tablet servers in {} ms", fmtTid, count, | ||
loadQueue.size(), elapsed); | ||
} | ||
|
||
loadQueue.clear(); | ||
queuedDataSize = 0; | ||
|
||
} | ||
} | ||
|
||
|
@@ -267,9 +293,11 @@ long finish() { | |
|
||
long sleepTime = 0; | ||
if (loadMsgs.size() > 0) { | ||
// find which tablet server had the most load messages sent to it and sleep 13ms for each | ||
// load message | ||
sleepTime = loadMsgs.max() * 13; | ||
// Find which tablet server had the most load messages sent to it and sleep 13ms for each | ||
// load message. Assuming it takes 13ms to process a single message. The tablet server will | ||
// process these messages in parallel, so assume it can process 16 in parallel. Must return a | ||
// non-zero value when messages were sent or the calling code will think everything is done. | ||
sleepTime = Math.max(1, (loadMsgs.max() * 13) / 16); | ||
} | ||
|
||
if (locationLess > 0) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -101,6 +101,7 @@ | |
import org.apache.accumulo.core.util.ByteBufferUtil; | ||
import org.apache.accumulo.core.util.Halt; | ||
import org.apache.accumulo.core.util.Pair; | ||
import org.apache.accumulo.core.util.UtilWaitThread; | ||
import org.apache.accumulo.core.util.threads.Threads; | ||
import org.apache.accumulo.server.ServerContext; | ||
import org.apache.accumulo.server.compaction.CompactionInfo; | ||
|
@@ -219,6 +220,7 @@ public void loadFiles(TInfo tinfo, TCredentials credentials, long tid, String di | |
|
||
watcher.runQuietly(Constants.BULK_ARBITRATOR_TYPE, tid, () -> { | ||
tabletImports.forEach((tke, fileMap) -> { | ||
log.debug("Starting bulk import for {} ", KeyExtent.fromThrift(tke)); | ||
Map<TabletFile,MapFileInfo> newFileMap = new HashMap<>(); | ||
|
||
for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) { | ||
|
@@ -243,6 +245,8 @@ public void loadFiles(TInfo tinfo, TCredentials credentials, long tid, String di | |
server.removeBulkImportState(files); | ||
} | ||
} | ||
UtilWaitThread.sleep(100); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Added this sleep so that I could verify things were running concurrently in the tserver and never saw that happen. |
||
log.debug("Finished bulk import for {} ", KeyExtent.fromThrift(tke)); | ||
}); | ||
}); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was added as debug while trying to understand what is going on.