Attempted to parallelize bulk import loading #5375

Open · wants to merge 14 commits into base 2.1 · Changes from 1 commit
@@ -119,6 +119,7 @@ public CustomFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey,
super(trans, selectionKey, selectThread);
// Store the clientAddress in the buffer so it can be referenced for logging during read/write
this.clientAddress = getClientAddress();
log.debug("created custom frame buffer ", new Exception());
Contributor Author:
This was added as debug logging while trying to understand what is going on.
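
(For context: passing a Throwable as the final argument makes SLF4J log that exception's stack trace, so this debug line records where each CustomFrameBuffer is created.)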

}

@Override
@@ -32,6 +32,7 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -184,6 +185,7 @@ void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exception

private void sendQueued(int threshhold) {
if (queuedDataSize > threshhold || threshhold == 0) {
var sendTimer = Timer.startNew();
loadQueue.forEach((server, tabletFiles) -> {

if (log.isTraceEnabled()) {
@@ -195,17 +197,41 @@ private void sendQueued(int threshhold) {
try {
client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, server,
manager.getContext(), timeInMillis);
-        client.loadFiles(TraceUtil.traceInfo(), manager.getContext().rpcCreds(), tid,
-            bulkDir.toString(), tabletFiles, setTime);
+        // Send a message per tablet. On the tablet server side, each tablet load must write to
+        // the metadata table, which requires waiting on the walog. Sending a message per tablet
+        // allows these per-tablet metadata writes to run in parallel, instead of serially
+        // waiting on the metadata table write for each tablet.
+        for (var entry : tabletFiles.entrySet()) {
Contributor Author @keith-turner, Mar 1, 2025:
This is the code that I changed to do an RPC per tablet instead of per tablet server. I was hoping these would execute in parallel on the tablet server, but that does not seem to be happening.
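
For illustration only, a self-contained sketch (hypothetical names, not Accumulo's API) of the client-side alternative hinted at here: drive the per-tablet loads from a thread pool, so the parallelism does not depend on how the tablet server interleaves requests arriving on a single connection.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelLoadSketch {

  // Hypothetical stand-in for one per-tablet loadFiles RPC; assumes each
  // call uses its own client/connection rather than a shared transport.
  interface TabletLoader {
    void load(String extent, List<String> files) throws Exception;
  }

  static void sendInParallel(Map<String,List<String>> tabletFiles, TabletLoader loader)
      throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(16);
    try {
      List<Future<?>> results = new ArrayList<>();
      for (var entry : tabletFiles.entrySet()) {
        // each per-tablet load becomes an independent task
        results.add(pool.submit(() -> {
          loader.load(entry.getKey(), entry.getValue());
          return null;
        }));
      }
      for (Future<?> r : results) {
        r.get(); // rethrows any per-tablet failure as ExecutionException
      }
    } finally {
      pool.shutdown();
    }
  }
}

The trade-off is one connection (or a pooled client) per concurrent task, instead of the single shared client used in the loop below.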

+          TKeyExtent tExtent = entry.getKey();
+          Map<String,MapFileInfo> files = entry.getValue();
+
+          client.loadFiles(TraceUtil.traceInfo(), manager.getContext().rpcCreds(), tid,
+              bulkDir.toString(), Map.of(tExtent, files), setTime);
+          client.getOutputProtocol().getTransport().flush();
+        }
} catch (TException ex) {
log.debug("rpc failed server: " + server + ", " + fmtTid + " " + ex.getMessage(), ex);
} finally {
ThriftUtil.returnClient(client, manager.getContext());
}
});

+      if (log.isDebugEnabled()) {
+        var elapsed = sendTimer.elapsed(TimeUnit.MILLISECONDS);
+        int count = 0;
+        for (var tableFiles : loadQueue.values()) {
+          for (var files : tableFiles.values()) {
+            count += files.size();
+          }
+        }
+
+        log.debug("{} sent {} messages to {} tablet servers in {} ms", fmtTid, count,
+            loadQueue.size(), elapsed);
+      }

loadQueue.clear();
queuedDataSize = 0;

}
}

@@ -267,9 +293,11 @@ long finish() {

long sleepTime = 0;
if (loadMsgs.size() > 0) {
-      // find which tablet server had the most load messages sent to it and sleep 13ms for each
-      // load message
-      sleepTime = loadMsgs.max() * 13;
+      // Find which tablet server had the most load messages sent to it, and assume it takes
+      // 13 ms to process each message. The tablet server should process these messages in
+      // parallel, so assume it can process 16 at a time. Must return a non-zero value when
+      // messages were sent, or the calling code will think everything is done.
+      sleepTime = Math.max(1, (loadMsgs.max() * 13) / 16);
}

if (locationLess > 0) {
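
To make the new estimate concrete: if the busiest tablet server was sent 160 load messages, the old code would sleep 160 * 13 = 2080 ms, while the new code sleeps max(1, 160 * 13 / 16) = 130 ms. The 13 ms per message and 16-way parallelism are the assumptions stated in the comment, not measured values; 160 is an illustrative count.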
@@ -101,6 +101,7 @@
import org.apache.accumulo.core.util.ByteBufferUtil;
import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.compaction.CompactionInfo;
@@ -219,6 +220,7 @@ public void loadFiles(TInfo tinfo, TCredentials credentials, long tid, String dir

watcher.runQuietly(Constants.BULK_ARBITRATOR_TYPE, tid, () -> {
tabletImports.forEach((tke, fileMap) -> {
log.debug("Starting bulk import for {} ", KeyExtent.fromThrift(tke));
Map<TabletFile,MapFileInfo> newFileMap = new HashMap<>();

for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
@@ -243,6 +245,8 @@ public void loadFiles(TInfo tinfo, TCredentials credentials, long tid, String dir
server.removeBulkImportState(files);
}
}
UtilWaitThread.sleep(100);
Contributor Author:
Added this sleep so that I could verify things were running concurrently in the tserver, and I never saw that happen.
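
As an aside, here is a minimal self-contained sketch (hypothetical, not part of this PR) of a way to check for overlap without slowing every import down: count in-flight loads and log the observed concurrency.

import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrencyProbe {

  private static final AtomicInteger IN_FLIGHT = new AtomicInteger();

  // Wrap each per-tablet import with this; if "active" is ever greater
  // than 1, the messages really are being processed concurrently.
  public static void run(Runnable importWork) {
    int active = IN_FLIGHT.incrementAndGet();
    try {
      System.out.printf("bulk import concurrency: %d active%n", active);
      importWork.run();
    } finally {
      IN_FLIGHT.decrementAndGet();
    }
  }
}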

log.debug("Finished bulk import for {} ", KeyExtent.fromThrift(tke));
});
});

@@ -50,6 +50,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
@@ -621,6 +622,47 @@ public void testExceptionInMetadataUpdate() throws Exception {
}
}

@Test
public void testManyTablets() throws Exception {

try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
String dir = getDir("/testManyTablets-");
writeData(dir + "/f1.", aconf, 0, 199);
writeData(dir + "/f2.", aconf, 200, 399);
writeData(dir + "/f3.", aconf, 400, 599);
writeData(dir + "/f4.", aconf, 600, 799);
writeData(dir + "/f5.", aconf, 800, 999);

var splits = IntStream.range(1, 1000).mapToObj(BulkNewIT::row).map(Text::new)
.collect(Collectors.toCollection(TreeSet::new));

// it is faster to create a table with lots of splits than to add them to an existing table
c.tableOperations().delete(tableName);
c.tableOperations().create(tableName, new NewTableConfiguration().withSplits(splits));

var lpBuilder = LoadPlan.builder();
lpBuilder.loadFileTo("f1.rf", RangeType.TABLE, null, row(1));
IntStream.range(2, 200)
.forEach(i -> lpBuilder.loadFileTo("f1.rf", RangeType.TABLE, row(i - 1), row(i)));
IntStream.range(200, 400)
.forEach(i -> lpBuilder.loadFileTo("f2.rf", RangeType.TABLE, row(i - 1), row(i)));
IntStream.range(400, 600)
.forEach(i -> lpBuilder.loadFileTo("f3.rf", RangeType.TABLE, row(i - 1), row(i)));
IntStream.range(600, 800)
.forEach(i -> lpBuilder.loadFileTo("f4.rf", RangeType.TABLE, row(i - 1), row(i)));
IntStream.range(800, 1000)
.forEach(i -> lpBuilder.loadFileTo("f5.rf", RangeType.TABLE, row(i - 1), row(i)));

var loadPlan = lpBuilder.build();

c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load();

verifyData(c, tableName, 0, 999, false);

}

}

private void addSplits(AccumuloClient client, String tableName, String splitString)
throws Exception {
SortedSet<Text> splits = new TreeSet<>();