Attempted to parallelize bulk import loading #5375

Open · wants to merge 14 commits into base: 2.1
Changes from 6 commits
@@ -374,6 +374,11 @@ public enum Property {
"2.1.0"),
MANAGER_BULK_TIMEOUT("manager.bulk.timeout", "5m", PropertyType.TIMEDURATION,
"The time to wait for a tablet server to process a bulk import request.", "1.4.3"),
MANAGER_BULK_MAX_CONNECTIONS("manager.bulk.connections.max", "8", PropertyType.COUNT,
"The maximum number of connections the manager can make to a single tablet server for bulkv2 "
+ "load requests. When a single tablet server has many tablets to load for a bulk import, "
+ "increasing this may lower the time it takes to load those tablets.",
"2.1.4"),
MANAGER_RENAME_THREADS("manager.rename.threadpool.size", "20", PropertyType.COUNT,
"The number of threads to use when renaming user files during table import or bulk ingest.",
"2.1.0"),

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions core/src/main/thrift/tabletserver.thrift
@@ -386,6 +386,15 @@ service TabletClientService {
6:bool setTime
)

void loadFilesV2(
1:trace.TInfo tinfo
2:security.TCredentials credentials
3:i64 tid
4:string dir
5:map<data.TKeyExtent, map<string, data.MapFileInfo>> files
6:bool setTime
)

void splitTablet(
4:trace.TInfo tinfo
1:security.TCredentials credentials
@@ -32,13 +32,15 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.clientImpl.bulk.Bulk;
import org.apache.accumulo.core.clientImpl.bulk.Bulk.Files;
import org.apache.accumulo.core.clientImpl.bulk.BulkSerialize;
import org.apache.accumulo.core.clientImpl.bulk.LoadMappingIterator;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -69,6 +71,7 @@
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,7 +117,7 @@ public long isReady(long tid, Manager manager) throws Exception {

Loader loader;
if (bulkInfo.tableState == TableState.ONLINE) {
loader = new OnlineLoader();
loader = new OnlineLoader(manager.getConfiguration());
} else {
loader = new OfflineLoader();
}
@@ -157,62 +160,149 @@ void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exception {

private static class OnlineLoader extends Loader {

private final int maxConnections;
long timeInMillis;
String fmtTid;
int locationLess = 0;

// track how many tablets were sent load messages per tablet server
MapCounter<HostAndPort> loadMsgs;
int tabletsAdded;

// Each RPC to a tablet server needs to check in ZooKeeper to see if the transaction is still
// active. The purpose of this map is to group load requests by tablet server in order to do
// fewer RPCs. Fewer RPCs result in fewer calls to ZooKeeper.
Map<HostAndPort,Map<TKeyExtent,Map<String,MapFileInfo>>> loadQueue;
private int queuedDataSize = 0;

public OnlineLoader(AccumuloConfiguration configuration) {
super();
this.maxConnections = configuration.getCount(Property.MANAGER_BULK_MAX_CONNECTIONS);
}

@Override
void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exception {
super.start(bulkDir, manager, tid, setTime);

timeInMillis = manager.getConfiguration().getTimeInMillis(Property.MANAGER_BULK_TIMEOUT);
fmtTid = FateTxId.formatTid(tid);

loadMsgs = new MapCounter<>();
tabletsAdded = 0;

loadQueue = new HashMap<>();
}

private static class Client {
final HostAndPort server;
final TabletClientService.Client service;

private Client(HostAndPort server, TabletClientService.Client service) {
this.server = server;
this.service = service;
}
}

private void sendQueued(int threshold) {
  if (queuedDataSize > threshold || threshold == 0) {
    var sendTimer = Timer.startNew();

    List<Client> clients = new ArrayList<>();
    try {

      // Send load messages to tablet servers spinning up work, but do not wait on results.
      loadQueue.forEach((server, tabletFiles) -> {

        if (log.isTraceEnabled()) {
          log.trace("{} asking {} to bulk import {} files for {} tablets", fmtTid, server,
              tabletFiles.values().stream().mapToInt(Map::size).sum(), tabletFiles.size());
        }

        // On the server side tablets are processed serially, with a write to the metadata
        // table done for each tablet. Chunk up the work for a tablet server so that it can
        // be sent over multiple connections, allowing it to run in parallel on the server
        // side. This allows multiple threads on a single tserver to do metadata writes for
        // this bulk import.
        int neededConnections = Math.min(maxConnections, tabletFiles.size());
        List<Map<TKeyExtent,Map<String,MapFileInfo>>> chunks = new ArrayList<>(neededConnections);
        for (int i = 0; i < neededConnections; i++) {
          chunks.add(new HashMap<>());
        }

        int nextConnection = 0;
        for (var entry : tabletFiles.entrySet()) {
          chunks.get(nextConnection++ % chunks.size()).put(entry.getKey(), entry.getValue());
        }

[Review comment from @keith-turner (Contributor, Author), Mar 1, 2025, on the chunking loop above: "This is the code that I changed to do an RPC per tablet instead of per tablet server. I was hoping these would execute in parallel on the tablet server, but that does not seem to be happening."]

        for (var chunk : chunks) {
          try {
            var client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, server,
                manager.getContext(), timeInMillis);
            // add the client to the list before calling send in case there is an exception;
            // this makes sure it is returned in the finally
            clients.add(new Client(server, client));
            client.send_loadFilesV2(TraceUtil.traceInfo(), manager.getContext().rpcCreds(), tid,
                bulkDir.toString(), chunk, setTime);
          } catch (TException ex) {
            log.debug("rpc failed server: {}, {}", server, fmtTid, ex);
          }
        }
      });

      long sendTime = sendTimer.elapsed(TimeUnit.MILLISECONDS);
      sendTimer.restart();

      int outdatedTservers = 0;

      // wait for all the tservers to complete processing
      for (var client : clients) {
        try {
          client.service.recv_loadFilesV2();
        } catch (TException ex) {
          String additionalInfo = "";
          if (ex instanceof TApplicationException && ((TApplicationException) ex).getType()
              == TApplicationException.UNKNOWN_METHOD) {
            // A new RPC method was added in 2.1.4; a tserver running 2.1.3 or earlier will
            // not have this RPC. This should not kill the fate operation, it can wait until
            // all tablet servers are upgraded.
            outdatedTservers++;
            additionalInfo = " (tserver may be running older version)";
          }
          log.debug("rpc failed server{}: {}, {}", additionalInfo, client.server, fmtTid, ex);
        }
      }

      if (outdatedTservers > 0) {
        log.info(
            "{} cannot proceed with bulk import because {} tablet servers are likely running "
                + "an older version. Please update tablet servers to the same patch level as "
                + "the manager.",
            fmtTid, outdatedTservers);
      }

      if (log.isDebugEnabled()) {
        var recvTime = sendTimer.elapsed(TimeUnit.MILLISECONDS);
        int numTablets = loadQueue.values().stream().mapToInt(Map::size).sum();
        log.debug(
            "{} sent {} messages to {} tablet servers for {} tablets, send time:{}ms recv time:{}ms {}:{}",
            fmtTid, clients.size(), loadQueue.size(), numTablets, sendTime, recvTime,
            Property.MANAGER_BULK_MAX_CONNECTIONS.getKey(), maxConnections);
      }

      loadQueue.clear();
      queuedDataSize = 0;

    } finally {
      for (var client : clients) {
        ThriftUtil.returnClient(client.service, manager.getContext());
      }
    }
  }
}
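
To see the distribution step in isolation, here is a self-contained sketch of the same round-robin chunking used by sendQueued() above; ChunkDemo and its toy keys are illustrative, not part of the patch:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChunkDemo {

  // Round-robin work items into at most maxConnections chunks, mirroring how sendQueued()
  // splits one tablet server's tablets across connections.
  static <K,V> List<Map<K,V>> chunk(Map<K,V> work, int maxConnections) {
    int needed = Math.min(maxConnections, work.size());
    List<Map<K,V>> chunks = new ArrayList<>(needed);
    for (int i = 0; i < needed; i++) {
      chunks.add(new HashMap<>());
    }
    int next = 0;
    for (Map.Entry<K,V> e : work.entrySet()) {
      chunks.get(next++ % chunks.size()).put(e.getKey(), e.getValue());
    }
    return chunks;
  }

  public static void main(String[] args) {
    Map<String,Integer> tablets = new HashMap<>();
    for (int i = 0; i < 10; i++) {
      tablets.put("tablet" + i, i);
    }
    // Ten tablets with maxConnections=4 yield four chunks of sizes 3, 3, 2, and 2.
    chunk(tablets, 4).forEach(c -> System.out.println(c.keySet()));
  }
}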

private void addToQueue(HostAndPort server, KeyExtent extent,
Map<String,MapFileInfo> thriftImports) {
if (!thriftImports.isEmpty()) {
loadMsgs.increment(server, 1);
tabletsAdded++;

Map<String,MapFileInfo> prev = loadQueue.computeIfAbsent(server, k -> new HashMap<>())
.putIfAbsent(extent.toThrift(), thriftImports);
Expand Down Expand Up @@ -266,10 +356,12 @@ long finish() {
sendQueued(0);

long sleepTime = 0;
if (tabletsAdded > 0) {
// Waited for all the tablet servers to process everything, so a long sleep is not needed.
// Even though this code waited, it does not know what succeeded on the tablet server side
// and it did not track if there were connection errors. Since the success status is unknown,
// a non-zero sleep must be returned to indicate another scan of the metadata table is needed.
sleepTime = 1;
}

if (locationLess > 0) {
@@ -248,6 +248,13 @@ public void loadFiles(TInfo tinfo, TCredentials credentials, long tid, String dir,

}

@Override
public void loadFilesV2(TInfo tinfo, TCredentials credentials, long tid, String dir,
Map<TKeyExtent,Map<String,MapFileInfo>> tabletImports, boolean setTime)
throws ThriftSecurityException {
loadFiles(tinfo, credentials, tid, dir, tabletImports, setTime);
}

@Override
public long startUpdate(TInfo tinfo, TCredentials credentials, TDurability tdurabilty)
throws ThriftSecurityException {
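
Because the server-side handler above simply delegates loadFilesV2 to the existing loadFiles logic, any speedup comes from the manager keeping several requests in flight at once. Below is a minimal, self-contained sketch of that send-all-then-wait pattern; FakeClient and its methods are stand-ins for the Thrift-generated send_loadFilesV2/recv_loadFilesV2 pair, not real Accumulo or Thrift API:

import java.util.ArrayList;
import java.util.List;

public class PipelineDemo {

  static class FakeClient {
    void sendLoad(String chunk) {
      // analogous to send_loadFilesV2: writes the request and returns immediately
      System.out.println("dispatched " + chunk);
    }

    void recvLoad() {
      // analogous to recv_loadFilesV2: blocks until this connection's reply arrives
      System.out.println("received a reply");
    }
  }

  public static void main(String[] args) {
    List<FakeClient> clients = new ArrayList<>();
    for (int i = 0; i < 4; i++) {
      clients.add(new FakeClient());
    }
    // Phase 1: start one request per connection without waiting on any result.
    for (int i = 0; i < clients.size(); i++) {
      clients.get(i).sendLoad("chunk" + i);
    }
    // Phase 2: wait for every outstanding request to complete.
    for (FakeClient c : clients) {
      c.recvLoad();
    }
  }
}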
@@ -50,6 +50,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
@@ -621,6 +622,47 @@ public void testExceptionInMetadataUpdate() throws Exception {
}
}

@Test
public void testManyTablets() throws Exception {

try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
String dir = getDir("/testManyTablets-");
writeData(dir + "/f1.", aconf, 0, 199);
writeData(dir + "/f2.", aconf, 200, 399);
writeData(dir + "/f3.", aconf, 400, 599);
writeData(dir + "/f4.", aconf, 600, 799);
writeData(dir + "/f5.", aconf, 800, 999);

var splits = IntStream.range(1, 1000).mapToObj(BulkNewIT::row).map(Text::new)
.collect(Collectors.toCollection(TreeSet::new));

// faster to create a table w/ lots of splits
c.tableOperations().delete(tableName);
c.tableOperations().create(tableName, new NewTableConfiguration().withSplits(splits));

var lpBuilder = LoadPlan.builder();
lpBuilder.loadFileTo("f1.rf", RangeType.TABLE, null, row(1));
IntStream.range(2, 200)
.forEach(i -> lpBuilder.loadFileTo("f1.rf", RangeType.TABLE, row(i - 1), row(i)));
IntStream.range(200, 400)
.forEach(i -> lpBuilder.loadFileTo("f2.rf", RangeType.TABLE, row(i - 1), row(i)));
IntStream.range(400, 600)
.forEach(i -> lpBuilder.loadFileTo("f3.rf", RangeType.TABLE, row(i - 1), row(i)));
IntStream.range(600, 800)
.forEach(i -> lpBuilder.loadFileTo("f4.rf", RangeType.TABLE, row(i - 1), row(i)));
IntStream.range(800, 1000)
.forEach(i -> lpBuilder.loadFileTo("f5.rf", RangeType.TABLE, row(i - 1), row(i)));

var loadPlan = lpBuilder.build();

c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load();

verifyData(c, tableName, 0, 999, false);

}

}

private void addSplits(AccumuloClient client, String tableName, String splitString)
throws Exception {
SortedSet<Text> splits = new TreeSet<>();
@@ -129,6 +129,10 @@ public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, long t
public void loadFiles(TInfo tinfo, TCredentials credentials, long tid, String dir,
Map<TKeyExtent,Map<String,MapFileInfo>> fileMap, boolean setTime) {}

@Override
public void loadFilesV2(TInfo tinfo, TCredentials credentials, long tid, String dir,
Map<TKeyExtent,Map<String,MapFileInfo>> fileMap, boolean setTime) {}

@Override
public void closeMultiScan(TInfo tinfo, long scanID) {}

2 changes: 1 addition & 1 deletion test/src/main/resources/log4j2-test.properties
@@ -25,7 +25,7 @@ appender.console.type = Console
appender.console.name = STDOUT
appender.console.target = SYSTEM_OUT
appender.console.layout.type = PatternLayout
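# %T emits the thread id, presumably added so concurrent bulk-load RPC handling can be
# attributed to individual threads in test logs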
appender.console.layout.pattern = %d{ISO8601} %T [%c{2}] %-5p: %m%n

logger.01.name = org.apache.accumulo.core
logger.01.level = debug