Attempted to parallelize bulk import loading #5375
base: 2.1
Conversation
While profiling bulk load v2, I noticed that the manager would send a single message to a tablet server containing a lot of tablets to load. The tablet server would process each tablet one at a time and do a metadata write for it. This caused a lot of serial metadata writes per tablet server, which made this part of bulk import take longer. Attempted to parallelize these metadata writes by changing the manager to send an RPC per tablet. The hope was that the tablet server would process each RPC request in a separate thread, avoiding the serial metadata writes. However this is not currently working and I am not sure why. The manager is getting a thrift client and then sending a lot of one way RPCs to load tablets. These one way messages all appear to be processed by a single thread on the tablet servers. Still investigating why this is happening; if anyone knows more about this please let me know.
// the metadata tablet which requires waiting on the walog. Sending a message per tablet
// allows these per tablet metadata table writes to run in parallel. This avoids
// serially waiting on the metadata table write for each tablet.
for (var entry : tabletFiles.entrySet()) {
This is the code that I changed to do an RPC per tablet instead of per tablet server. I was hoping these would execute in parallel on the tablet server, but that does not seem to be happening.
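To make the intended change concrete, here is a minimal sketch of the per-tablet send loop; the `sendLoadMessage` helper, the String-keyed extent map, and the method names are hypothetical stand-ins rather than the actual Accumulo RPC signatures.

```java
import java.util.List;
import java.util.Map;

public class PerTabletSendSketch {

  // Hypothetical helper representing a one-way thrift call to a tablet server;
  // in the real code this would be a generated client method such as loadFiles(...).
  static void sendLoadMessage(String tserver, String extent, List<String> files) {
    // send the RPC for a single tablet
  }

  // Instead of one RPC per tablet server carrying all of its tablets, send one
  // RPC per tablet in the hope that the server processes them on separate threads.
  static void sendPerTablet(String tserver, Map<String, List<String>> tabletFiles) {
    for (Map.Entry<String, List<String>> entry : tabletFiles.entrySet()) {
      sendLoadMessage(tserver, entry.getKey(), entry.getValue());
    }
  }
}
```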
@@ -119,6 +119,7 @@ public CustomFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey,
  super(trans, selectionKey, selectThread);
  // Store the clientAddress in the buffer so it can be referenced for logging during read/write
  this.clientAddress = getClientAddress();
  log.debug("created custom frame buffer ", new Exception());
This was added as debug while trying to understand what is going on.
@@ -243,6 +245,8 @@ public void loadFiles(TInfo tinfo, TCredentials credentials, long tid, String di
      server.removeBulkImportState(files);
    }
  }
  UtilWaitThread.sleep(100);
Added this sleep so that I could verify things were running concurrently in the tserver, and I never saw that happen.
I thought there was an issue you found a while back indicating that the thrift server type uses a small number of threads (maybe one) for handling RPC requests. I don't recall the details.
It looks like our default Thrift server type will end up creating a custom non-blocking Thrift server with one accept thread. If the value of …
Related to my earlier comment, it looks like you (@keith-turner) created an issue in Thrift for the TThreadedSelectorServer (https://issues.apache.org/jira/browse/THRIFT-4847). It looks like this was fixed in Thrift 0.21, so you may need to update Thrift in this branch as well.
The reminders about the select thread were helpful. I was assuming that the thrift code would read frames per RPC into memory and queue those frames on a thread pool. It may instead queue the connection and the frame on the thread pool, returning the connection for selection only when the task completes on the pool. If so, that would mean only one thing will ever execute per connection, even if the message is oneway and no response is needed. Going to look into this a bit more and see if that is the case.
I think that in the case of the other threaded server implementations (TNonblockingServer, for example), there is one select thread that reads the request from the connection and then hands it off to a thread in the worker thread pool. I don't think the accept threads wait for the worker thread to complete in this case, so it's somewhat asynchronous. I could be wrong about this, but it makes sense on the surface. There could be cases where the accept thread takes a long time to read the request and assign it to a worker thread, though. For example, if only some of the packets for the request have arrived on the interface and the client has not sent them all yet.
Made changes in 551dde0 to use multiple connections per tserver in the manager. Seeing parallelism on the tserver side w/ this change. This behavior is making me wonder about connection pooling plus one way messages. Maybe a situation like the following could happen; want to test this.
It may be that a connection obtained from the pool could actually have multiple one way messages queued on it that must be processed before it will actually do anything.
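As a way to reason about that scenario, the following JDK-only sketch (not Accumulo or Thrift code) models a connection as a queue drained by a single worker: oneway messages queued on one pooled connection complete serially even though the sender never blocks.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConnectionQueueSketch {

  // Model of a single connection: frames queued on it are processed one at a time,
  // which is the suspected behavior for oneway messages sharing a pooled connection.
  static class Connection implements AutoCloseable {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    void sendOneway(Runnable rpcWork) {
      worker.submit(rpcWork); // the caller returns immediately, the work is queued
    }

    @Override
    public void close() throws InterruptedException {
      worker.shutdown();
      worker.awaitTermination(1, TimeUnit.MINUTES);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Runnable metadataWrite = () -> {
      try {
        Thread.sleep(100); // stand-in for a per-tablet metadata write
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };

    // One pooled connection: 10 oneway messages take roughly 1s total because they queue.
    long start = System.nanoTime();
    try (Connection pooled = new Connection()) {
      for (int i = 0; i < 10; i++) {
        pooled.sendOneway(metadataWrite);
      }
    }
    System.out.printf("one connection: %d ms%n", (System.nanoTime() - start) / 1_000_000);
  }
}
```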
Still have not taken a deep dive into the thrift code. As more is understood about this, it would be good to assess the impact on the manager's use of one way messages to load tablets and the implications for parallel tablet loads.
Looking at the Thrift code for our default server type (CustomNonBlockingServer extends THsHaServer extends TNonblockingServer), I think our … The createFrameBuffer method that is called from … You can see in FrameBuffer.invoke that the RPC method is called, then the responseReady method. Conversely, in AsyncFrameBuffer.invoke you can see that the Processor is cast to a TAsyncProcessor, then the … I'm not sure what …

Edit: Also, if we did fix TimedProcessor to implement TAsyncProcessor, the TMultiplexedProcessor that it is wrapping does not implement it. I'm not sure why, or what the issue would be with invoking it in an async manner.

Update: Looks like https://issues.apache.org/jira/browse/THRIFT-2427 was created for the async multiplexed processor. A PR was created, but never merged.
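For readers unfamiliar with that part of Thrift, here is a highly simplified paraphrase of the two invoke styles described above; the names and signatures are illustrative and are not copied from the Thrift source. The sync path runs the RPC and then marks the response ready, while the async path hands the processor a callback that marks the response ready later.

```java
// Simplified paraphrase of the control flow described above; names and signatures
// are illustrative, not copied from the Thrift source.
public class FrameBufferSketch {

  interface SyncProcessor {
    void process(byte[] frame); // runs the RPC method to completion
  }

  interface AsyncProcessor {
    void process(byte[] frame, Runnable onComplete); // signals completion via callback
  }

  // Sync style (FrameBuffer.invoke): the worker thread is occupied for the whole
  // RPC, and the connection is only handed back to the selector afterwards.
  void invokeSync(SyncProcessor processor, byte[] frame) {
    processor.process(frame);
    responseReady();
  }

  // Async style (AsyncFrameBuffer.invoke): the processor may finish later on
  // another thread and calls responseReady() through the callback when done.
  void invokeAsync(AsyncProcessor processor, byte[] frame) {
    processor.process(frame, this::responseReady);
  }

  // Stand-in for the code that re-registers the connection with the select thread.
  void responseReady() {}
}
```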
Following that responseReady() method in AbstractNonblockingServer, it eventually calls this code, which I believe makes the connection available for the selection thread to use again. For the sync case this is called after the message is processed server side.
The difference seems to be more on the client side. The loadFiles rpc only calls send on the client side. The splitTablet rpc also has a void return type, but it is not oneway and it calls both send and recv. The oneway does allow the client to spin up a bunch of work on tservers w/o waiting for each tserver, but that seems to have some undesirable side effects. I am going to experiment w/ dropping the oneway on the declaration and calling the send and recv methods separately.
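A minimal sketch of that experiment, assuming a Thrift-generated Java client with the usual send_X/recv_X split (the client type and method names below are placeholders, not the actual Accumulo thrift API): issue the send half against each tablet server first, then collect the recv half, so the manager still overlaps work across tservers but knows when each call has completed.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.thrift.TException;

public class SendRecvSketch {

  // Placeholder for a Thrift-generated client with the usual send_/recv_ split.
  interface LoadClient {
    void send_loadFiles(String dir) throws TException;
    void recv_loadFiles() throws TException; // void RPC, but no longer oneway
  }

  // Fire the send half on every tserver's client first, then wait on the recv
  // half, so the calls overlap across servers without being fire-and-forget.
  static void loadOnAll(List<LoadClient> clientsPerTserver, String dir) throws TException {
    List<LoadClient> sent = new ArrayList<>();
    for (LoadClient client : clientsPerTserver) {
      client.send_loadFiles(dir);
      sent.add(client);
    }
    for (LoadClient client : sent) {
      client.recv_loadFiles();
    }
  }
}
```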
Right, but the send call in the … (see accumulo/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java, lines 222 to 246 in dacc3e9)
That does not seem to be the behavior I am seeing based on logging from running the new test against 551dde0. Below are some of the logs; by the time the manager had sent 999 one way messages, not a single tablet had completed bulk load processing in a tserver.
Notice in the messages above how the manager code keeps queueing up work for the tablet servers by continually sending these one way messages. Eventually a bunch of these run after the bulk import is done.
I considered that when I started looking into this, but did not want to create yet another thread pool that needs to be configured and monitored. Figured we could use the existing RPC thread pool. That may be a way to solve this; it would probably be best to have a thread pool per tserver for this as opposed to per request.
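If a manager-side pool were added, a per-tserver pool could look roughly like this sketch; all names and the thread count are hypothetical. Keying the executors by tserver bounds concurrency per destination rather than per request.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PerTserverPoolSketch {

  // Hypothetical setting; in Accumulo this would come from a property.
  private static final int THREADS_PER_TSERVER = 4;

  // One small pool per tablet server so concurrency is bounded per destination,
  // not per individual load request.
  private final Map<String, ExecutorService> pools = new ConcurrentHashMap<>();

  ExecutorService poolFor(String tserver) {
    return pools.computeIfAbsent(tserver,
        ts -> Executors.newFixedThreadPool(THREADS_PER_TSERVER));
  }

  void submitLoad(String tserver, Runnable loadRpc) {
    poolFor(tserver).submit(loadRpc);
  }
}
```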
Good news is that there are already properties for the thread pools being used in the client for bulk v1, but they are labeled …
In a5f8b88 made the following changes
This change has a nice advantage that is unrelated to the initial goal of parallelization. The current bulk code w/ one way messages only knows if something is done by scanning the metadata table. Because the changes in a5f8b88 wait for the tablet servers, it does not keep scanning the metadata table and then sending more one way messages, queuing up more unneeded work for the tablet servers, causing more metadata scans, and having to guess when things are done. The following are some log messages from running the new test w/ the changes in a5f8b88.
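The waiting behavior described above could look roughly like the following sketch, with hypothetical task and pool names: the manager submits one synchronous load call per tablet server and blocks on the futures, so it knows when the loads finished without re-scanning the metadata table to find out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WaitForLoadsSketch {

  // Hypothetical stand-in for a synchronous per-tserver load call.
  interface LoadCall extends Callable<Void> {}

  // Submit one synchronous load call per tablet server and wait for all of them.
  // Because the manager waits, it knows the loads finished without re-scanning
  // the metadata table and without queueing extra one way messages.
  static void loadAndWait(List<LoadCall> callsPerTserver) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, callsPerTserver.size()));
    try {
      List<Future<Void>> futures = new ArrayList<>();
      for (LoadCall call : callsPerTserver) {
        futures.add(pool.submit(call));
      }
      for (Future<Void> future : futures) {
        try {
          future.get(); // block until this tserver's load RPC has returned
        } catch (ExecutionException e) {
          // a failed call means another metadata scan / retry is still needed
        }
      }
    } finally {
      pool.shutdown();
    }
  }
}
```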
That could work; it would depend on the behavior of the bulkv1 and bulkv2 code that uses the prop. If the bulkv1 and bulkv2 code would need different settings for the property to be optimal on a system, because of underlying behavior differences in how they use the property, then it would be confusing to reuse the property. Thinking about the bulk v1 code and behavior, it would be nice if this fix for bulkv2 caused no changes in behavior for the bulkv1 code. Since both use the same RPC, we need to be careful about changing that RPC as it will impact bulkv1 and bulkv2.
I agree that we shouldn't reuse the bulkv1 threadpool property. Users will probably want to run jobs back to back using bulkv1 and bulkv2 configurations to compare results.
Is there a requirement for keeping the same RPC as bulkv1?
No; if changing the behavior of the existing RPC is desired, it would probably be best to create a new RPC for bulkv2 and leave bulkv1 as is. Creating a new RPC creates a headache for a mix of 2.1.3 and 2.1.4 server processes. These RPCs are only used by servers, so at least it would not impact clients.
Added a new synchronous bulk load RPC for bulk v2. Added a new property to control concurrency for bulk v2. Reverted the changes to the existing RPC used by bulk v1. The new RPC should hopefully not cause any problems. A 2.1.3 manager working w/ 2.1.4 tablet servers should have no problems. A 2.1.4 manager working w/ a 2.1.3 tserver should log a message about a version mismatch, and the bulk fate should pause until the tservers are updated. Need to manually test a 2.1.4 manager and 2.1.3 tservers to ensure this works.
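One plausible way the mixed-version case could surface, sketched under assumptions rather than taken from this branch: when a 2.1.4 manager calls the new RPC on a 2.1.3 tserver, Thrift reports an unknown method via TApplicationException, which the caller can log and turn into a retry delay so the fate operation pauses until the tserver is upgraded. All names and the delay below are hypothetical.

```java
import org.apache.thrift.TApplicationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VersionMismatchSketch {

  private static final Logger log = LoggerFactory.getLogger(VersionMismatchSketch.class);

  // Hypothetical stand-in for invoking the new sync bulk load RPC on a tserver.
  interface NewBulkLoadRpc {
    void call() throws TApplicationException;
  }

  // Returns how long the fate operation should sleep before retrying; zero means
  // the call succeeded and no retry is needed.
  static long callWithFallback(NewBulkLoadRpc rpc, String tserver) {
    try {
      rpc.call();
      return 0;
    } catch (TApplicationException e) {
      if (e.getType() == TApplicationException.UNKNOWN_METHOD) {
        // Older tserver that does not know the new RPC; pause and retry later.
        log.info("Tablet server {} does not support the new bulk load RPC yet,"
            + " waiting for it to be upgraded", tserver);
        return 30_000; // hypothetical retry delay in milliseconds
      }
      throw new RuntimeException(e);
    }
  }
}
```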
In 932efdc added a new RPC and a new property, and also cleaned up a lot of loose ends. With the new RPC, 932efdc should get to the point of causing no changes in behavior for bulkv1. The changes in 932efdc should tolerate a mixture of 2.1.X server versions w/o causing bulk imports to fail, but I need to manually test this. After manually testing this I plan to take this out of draft. The commit message for 932efdc has more details.
Tested this code w/ a 2.1.3 tserver, ran through the following steps
Saw messages like the following in the manager logs when the 2.1.3 tserver was running.
Just some minor suggestions. Overall the changes make sense to me and look correct.
// Even though this code waited, it does not know what succeeded on the tablet server side
// and it did not track if there were connection errors. Since success status is unknown
// must return a non-zero sleep to indicate another scan of the metadata table is needed.
sleepTime = 1;
Removing that long sleep is a nice improvement.
…Ops/bulkVer2/LoadFiles.java Co-authored-by: Daniel Roberts <ddanielr@gmail.com>
…Ops/bulkVer2/LoadFiles.java Co-authored-by: Daniel Roberts <ddanielr@gmail.com>
…Ops/bulkVer2/LoadFiles.java Co-authored-by: Daniel Roberts <ddanielr@gmail.com>
…Ops/bulkVer2/LoadFiles.java Co-authored-by: Daniel Roberts <ddanielr@gmail.com>