Skip to content

Commit 932efdc

Browse files
committed
Adds new sync bulk load RPC
Added a new synch bulk load RPC for bulk v2. Added a new property to control concurrency for bulk v2. Reverted the changes to the existing RPC used by bulk v1. The new RPC should hopefully not cause any problems. A 2.1.3 manager working w/ 2.1.4 tablets servers should have no problems. A 2.1.4 manager working w/ a 2.1.3 tserver should log a message about version mismatch and the bulk fate should pause until tservers are updated. Need to manually test 2.1.4 manager and 2.1.3 tservers to ensure this works.
1 parent a5f8b88 commit 932efdc

File tree

7 files changed

+1746
-533
lines changed

7 files changed

+1746
-533
lines changed

core/src/main/java/org/apache/accumulo/core/conf/Property.java

+5
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,11 @@ public enum Property {
374374
"2.1.0"),
375375
MANAGER_BULK_TIMEOUT("manager.bulk.timeout", "5m", PropertyType.TIMEDURATION,
376376
"The time to wait for a tablet server to process a bulk import request.", "1.4.3"),
377+
MANAGER_BULK_MAX_CONNECTIONS("manager.bulk.connections.max", "8", PropertyType.COUNT,
378+
"The maximum number of connections the manager can make to a single tablet server for bulkv2 "
379+
+ "load request. For the case where a single tablet server has a lot of tablets for a bulk import "
380+
+ "increasing this may help lower the time it takes to load those tablets.",
381+
"2.1.4"),
377382
MANAGER_RENAME_THREADS("manager.rename.threadpool.size", "20", PropertyType.COUNT,
378383
"The number of threads to use when renaming user files during table import or bulk ingest.",
379384
"2.1.0"),

core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java

+1,614-455
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/main/thrift/tabletserver.thrift

+10-3
Original file line numberDiff line numberDiff line change
@@ -377,9 +377,16 @@ service TabletClientService {
377377
1:client.ThriftSecurityException sec
378378
)
379379

380-
// TODO removing oneway probably has a significant impact on the bulk v1
381-
// code. It also may have thrift compatibility implications.
382-
void loadFiles(
380+
oneway void loadFiles(
381+
1:trace.TInfo tinfo
382+
2:security.TCredentials credentials
383+
3:i64 tid
384+
4:string dir
385+
5:map<data.TKeyExtent, map<string, data.MapFileInfo>> files
386+
6:bool setTime
387+
)
388+
389+
void loadFilesV2(
383390
1:trace.TInfo tinfo
384391
2:security.TCredentials credentials
385392
3:i64 tid

server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java

+106-71
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.accumulo.core.clientImpl.bulk.Bulk.Files;
4141
import org.apache.accumulo.core.clientImpl.bulk.BulkSerialize;
4242
import org.apache.accumulo.core.clientImpl.bulk.LoadMappingIterator;
43+
import org.apache.accumulo.core.conf.AccumuloConfiguration;
4344
import org.apache.accumulo.core.conf.Property;
4445
import org.apache.accumulo.core.data.Mutation;
4546
import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -70,6 +71,7 @@
7071
import org.apache.accumulo.server.fs.VolumeManager;
7172
import org.apache.hadoop.fs.Path;
7273
import org.apache.hadoop.io.Text;
74+
import org.apache.thrift.TApplicationException;
7375
import org.apache.thrift.TException;
7476
import org.slf4j.Logger;
7577
import org.slf4j.LoggerFactory;
@@ -115,7 +117,7 @@ public long isReady(long tid, Manager manager) throws Exception {
115117

116118
Loader loader;
117119
if (bulkInfo.tableState == TableState.ONLINE) {
118-
loader = new OnlineLoader();
120+
loader = new OnlineLoader(manager.getConfiguration());
119121
} else {
120122
loader = new OfflineLoader();
121123
}
@@ -158,112 +160,149 @@ void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exce
158160

159161
private static class OnlineLoader extends Loader {
160162

163+
private final int maxConnections;
161164
long timeInMillis;
162165
String fmtTid;
163166
int locationLess = 0;
164167

165-
// track how many tablets were sent load messages per tablet server
166-
MapCounter<HostAndPort> loadMsgs;
168+
int tabletsAdded;
167169

168170
// Each RPC to a tablet server needs to check in zookeeper to see if the transaction is still
169171
// active. The purpose of this map is to group load request by tablet servers inorder to do less
170172
// RPCs. Less RPCs will result in less calls to Zookeeper.
171173
Map<HostAndPort,Map<TKeyExtent,Map<String,MapFileInfo>>> loadQueue;
172174
private int queuedDataSize = 0;
173175

176+
public OnlineLoader(AccumuloConfiguration configuration) {
177+
super();
178+
this.maxConnections = configuration.getCount(Property.MANAGER_BULK_MAX_CONNECTIONS);
179+
}
180+
174181
@Override
175182
void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exception {
176183
super.start(bulkDir, manager, tid, setTime);
177184

178185
timeInMillis = manager.getConfiguration().getTimeInMillis(Property.MANAGER_BULK_TIMEOUT);
179186
fmtTid = FateTxId.formatTid(tid);
180187

181-
loadMsgs = new MapCounter<>();
188+
tabletsAdded = 0;
182189

183190
loadQueue = new HashMap<>();
184191
}
185192

186-
private static final int MAX_CONNECTIONS_PER_TSERVER = 8;
193+
private static class Client {
194+
final HostAndPort server;
195+
final TabletClientService.Client service;
196+
197+
private Client(HostAndPort server, TabletClientService.Client service) {
198+
this.server = server;
199+
this.service = service;
200+
}
201+
}
187202

188203
private void sendQueued(int threshhold) {
189204
if (queuedDataSize > threshhold || threshhold == 0) {
190205
var sendTimer = Timer.startNew();
191206

192-
List<TabletClientService.Client> clients = new ArrayList<>();
207+
List<Client> clients = new ArrayList<>();
208+
try {
193209

194-
// Send load messages to tablet servers spinning up work, but do not wait on results.
195-
loadQueue.forEach((server, tabletFiles) -> {
210+
// Send load messages to tablet servers spinning up work, but do not wait on results.
211+
loadQueue.forEach((server, tabletFiles) -> {
196212

197-
if (log.isTraceEnabled()) {
198-
log.trace("{} asking {} to bulk import {} files for {} tablets", fmtTid, server,
199-
tabletFiles.values().stream().mapToInt(Map::size).sum(), tabletFiles.size());
200-
}
213+
if (log.isTraceEnabled()) {
214+
log.trace("{} asking {} to bulk import {} files for {} tablets", fmtTid, server,
215+
tabletFiles.values().stream().mapToInt(Map::size).sum(), tabletFiles.size());
216+
}
201217

202-
// On the server side tablets are processed serially with a write to the metadata table
203-
// done for each tablet. Chunk the work up for a tablet server up so that it can be sent
204-
// over
205-
// multiple connections allowing it to run in parallel on the server side. This allows
206-
// multiple threads on a single tserver to do metadata writes for this bulk import.
207-
int neededConnections = Math.min(MAX_CONNECTIONS_PER_TSERVER, tabletFiles.size());
208-
List<Map<TKeyExtent,Map<String,MapFileInfo>>> chunks = new ArrayList<>(neededConnections);
209-
for (int i = 0; i < neededConnections; i++) {
210-
chunks.add(new HashMap<>());
211-
}
218+
// On the server side tablets are processed serially with a write to the metadata table
219+
// done for each tablet. Chunk the work up for a tablet server up so that it can be sent
220+
// over multiple connections allowing it to run in parallel on the server side. This
221+
// allows multiple threads on a single tserver to do metadata writes for this bulk
222+
// import.
223+
int neededConnections = Math.min(maxConnections, tabletFiles.size());
224+
List<Map<TKeyExtent,Map<String,MapFileInfo>>> chunks =
225+
new ArrayList<>(neededConnections);
226+
for (int i = 0; i < neededConnections; i++) {
227+
chunks.add(new HashMap<>());
228+
}
212229

213-
int nextConnection = 0;
214-
for (var entry : tabletFiles.entrySet()) {
215-
chunks.get(nextConnection++ % chunks.size()).put(entry.getKey(), entry.getValue());
216-
}
230+
int nextConnection = 0;
231+
for (var entry : tabletFiles.entrySet()) {
232+
chunks.get(nextConnection++ % chunks.size()).put(entry.getKey(), entry.getValue());
233+
}
217234

218-
for (var chunk : chunks) {
235+
for (var chunk : chunks) {
236+
try {
237+
var client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, server,
238+
manager.getContext(), timeInMillis);
239+
// add client to list before calling send in case there is an exception, this makes
240+
// sure its returned in the finally
241+
clients.add(new Client(server, client));
242+
client.send_loadFilesV2(TraceUtil.traceInfo(), manager.getContext().rpcCreds(), tid,
243+
bulkDir.toString(), chunk, setTime);
244+
} catch (TException ex) {
245+
log.debug("rpc failed server: {}, {}", server, fmtTid, ex);
246+
}
247+
}
248+
});
249+
250+
long sendTime = sendTimer.elapsed(TimeUnit.MILLISECONDS);
251+
sendTimer.restart();
252+
253+
int outdatedTservers = 0;
254+
255+
// wait for all the tservers to complete processing
256+
for (var client : clients) {
219257
try {
220-
var client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, server,
221-
manager.getContext(), timeInMillis);
222-
client.send_loadFiles(TraceUtil.traceInfo(), manager.getContext().rpcCreds(), tid,
223-
bulkDir.toString(), chunk, setTime);
224-
clients.add(client);
258+
client.service.recv_loadFilesV2();
225259
} catch (TException ex) {
226-
log.debug("rpc failed server: " + server + ", " + fmtTid + " " + ex.getMessage(), ex);
227-
// TODO return client
260+
String additionalInfo = "";
261+
if (ex instanceof TApplicationException) {
262+
if (((TApplicationException) ex).getType()
263+
== TApplicationException.UNKNOWN_METHOD) {
264+
// A new RPC method was added in 2.1.4, a tserver running 2.1.3 or earlier will
265+
// not have this RPC. This should not kill the fate operation, it can wait until
266+
// all tablet servers are upgraded.
267+
outdatedTservers++;
268+
additionalInfo = " (tserver may be running older version)";
269+
}
270+
}
271+
log.debug("rpc failed server{}: {}, {}", additionalInfo, client.server, fmtTid, ex);
228272
}
229273
}
230-
});
231-
232-
long sendTime = sendTimer.elapsed(TimeUnit.MILLISECONDS);
233-
sendTimer.restart();
234-
235-
// wait for all the tservers to complete processing
236-
for (var client : clients) {
237-
try {
238-
client.recv_loadFiles();
239-
} catch (TException ex) {
240-
// TODO need to keep the server
241-
String server = "???";
242-
log.debug("rpc failed server: " + server + ", " + fmtTid + " " + ex.getMessage(), ex);
243-
} finally {
244-
// TODO this finally needs to be structured better, needs to cover entire send and
245-
// receive code. Clients can fall throught the cracks w/ this.
246-
ThriftUtil.returnClient(client, manager.getContext());
247-
}
248-
}
249274

250-
if (log.isDebugEnabled()) {
251-
var recvTime = sendTimer.elapsed(TimeUnit.MILLISECONDS);
275+
if (outdatedTservers > 0) {
276+
log.info(
277+
"{} can not proceed with bulk import because {} tablet servers are likely running "
278+
+ "an older version. Please update tablet servers to same patch level as manager.",
279+
fmtTid, outdatedTservers);
280+
}
252281

253-
log.debug("{} sent {} messages to {} tablet servers, send time:{}ms recv time:{}ms",
254-
fmtTid, clients.size(), loadQueue.size(), sendTime, recvTime);
255-
}
282+
if (log.isDebugEnabled()) {
283+
var recvTime = sendTimer.elapsed(TimeUnit.MILLISECONDS);
284+
int numTablets = loadQueue.values().stream().mapToInt(Map::size).sum();
285+
log.debug(
286+
"{} sent {} messages to {} tablet servers for {} tablets, send time:{}ms recv time:{}ms {}:{}",
287+
fmtTid, clients.size(), loadQueue.size(), numTablets, sendTime, recvTime,
288+
Property.MANAGER_BULK_MAX_CONNECTIONS.getKey(), maxConnections);
289+
}
256290

257-
loadQueue.clear();
258-
queuedDataSize = 0;
291+
loadQueue.clear();
292+
queuedDataSize = 0;
259293

294+
} finally {
295+
for (var client : clients) {
296+
ThriftUtil.returnClient(client.service, manager.getContext());
297+
}
298+
}
260299
}
261300
}
262301

263302
private void addToQueue(HostAndPort server, KeyExtent extent,
264303
Map<String,MapFileInfo> thriftImports) {
265304
if (!thriftImports.isEmpty()) {
266-
loadMsgs.increment(server, 1);
305+
tabletsAdded++;
267306

268307
Map<String,MapFileInfo> prev = loadQueue.computeIfAbsent(server, k -> new HashMap<>())
269308
.putIfAbsent(extent.toThrift(), thriftImports);
@@ -317,16 +356,12 @@ long finish() {
317356
sendQueued(0);
318357

319358
long sleepTime = 0;
320-
if (loadMsgs.size() > 0) {
321-
// Find which tablet server had the most load messages sent to it and sleep 13ms for each
322-
// load message. Assuming it takes 13ms to process a single message. The tablet server will
323-
// process these message in parallel, so assume it can process 16 in parallel. Must return a
324-
// non-zero value when messages were sent or the calling code will think everything is done.
325-
sleepTime = Math.max(1, (loadMsgs.max() * 13) / 16);
326-
327-
// TODO since a result from the tablet is being waited on now, could have the tserver report
328-
// back success or not. If everything was a success, then could be done and avoid a
329-
// subsequent metadata scan.
359+
if (tabletsAdded > 0) {
360+
// Waited for all the tablet servers to process everything so a long sleep is not needed.
361+
// Even though this code waited, it does not know what succeeded on the tablet server side
362+
// and it did not track if there were connection errors. Since success status is unknown
363+
// must return a non-zero sleep to indicate another scan of the metadata table is needed.
364+
sleepTime = 1;
330365
}
331366

332367
if (locationLess > 0) {

server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java

+7
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,13 @@ public void loadFiles(TInfo tinfo, TCredentials credentials, long tid, String di
251251

252252
}
253253

254+
@Override
255+
public void loadFilesV2(TInfo tinfo, TCredentials credentials, long tid, String dir,
256+
Map<TKeyExtent,Map<String,MapFileInfo>> tabletImports, boolean setTime)
257+
throws ThriftSecurityException {
258+
loadFiles(tinfo, credentials, tid, dir, tabletImports, setTime);
259+
}
260+
254261
@Override
255262
public long startUpdate(TInfo tinfo, TCredentials credentials, TDurability tdurabilty)
256263
throws ThriftSecurityException {

test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java

-4
Original file line numberDiff line numberDiff line change
@@ -640,10 +640,6 @@ public void testManyTablets() throws Exception {
640640
c.tableOperations().delete(tableName);
641641
c.tableOperations().create(tableName, new NewTableConfiguration().withSplits(splits));
642642

643-
// TODO this is a hack, want to wait for all tablets to be loaded as this is attempting to
644-
// performance test bulk code. So want to avoid waiting on tablet loads also.
645-
Thread.sleep(5_000);
646-
647643
var lpBuilder = LoadPlan.builder();
648644
lpBuilder.loadFileTo("f1.rf", RangeType.TABLE, null, row(1));
649645
IntStream.range(2, 200)

test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java

+4
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@ public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, long t
129129
public void loadFiles(TInfo tinfo, TCredentials credentials, long tid, String dir,
130130
Map<TKeyExtent,Map<String,MapFileInfo>> fileMap, boolean setTime) {}
131131

132+
@Override
133+
public void loadFilesV2(TInfo tinfo, TCredentials credentials, long tid, String dir,
134+
Map<TKeyExtent,Map<String,MapFileInfo>> fileMap, boolean setTime) {}
135+
132136
@Override
133137
public void closeMultiScan(TInfo tinfo, long scanID) {}
134138

0 commit comments

Comments
 (0)