|
59 | 59 | import java.util.concurrent.ScheduledFuture;
|
60 | 60 | import java.util.concurrent.ThreadPoolExecutor;
|
61 | 61 | import java.util.concurrent.TimeUnit;
|
62 |
| -import java.util.concurrent.atomic.AtomicBoolean; |
63 | 62 | import java.util.concurrent.atomic.AtomicInteger;
|
64 | 63 | import java.util.concurrent.atomic.AtomicLong;
|
65 | 64 | import java.util.concurrent.locks.ReentrantLock;
|
@@ -964,41 +963,40 @@ public void run() {
|
964 | 963 | .numMaxThreads(16).build();
|
965 | 964 |
|
966 | 965 | ManagerClientService.Client iface = managerConnection(getManagerAddress());
|
967 |
| - final AtomicBoolean managerDown = new AtomicBoolean(false); |
| 966 | + boolean managerDown = false; |
968 | 967 |
|
969 | 968 | try {
|
970 |
| - for (DataLevel level : DataLevel.values()) { |
| 969 | + for (DataLevel level : new DataLevel[] {DataLevel.USER, DataLevel.METADATA, DataLevel.ROOT}) { |
971 | 970 | getOnlineTablets().keySet().forEach(ke -> {
|
972 | 971 | if (DataLevel.of(ke.tableId()) == level) {
|
973 | 972 | futures.add(
|
974 | 973 | tpe.submit(new UnloadTabletHandler(this, ke, TUnloadTabletGoal.UNASSIGNED, 5000)));
|
975 | 974 | }
|
976 | 975 | });
|
977 | 976 | while (!futures.isEmpty()) {
|
978 |
| - futures.removeIf(f -> { |
979 |
| - if (!f.isDone()) { |
980 |
| - return false; |
981 |
| - } |
982 |
| - if (!managerDown.get()) { |
983 |
| - ManagerMessage mm = managerMessages.poll(); |
984 |
| - try { |
985 |
| - if (mm != null) { |
| 977 | + Iterator<Future<?>> unloads = futures.iterator(); |
| 978 | + while (unloads.hasNext()) { |
| 979 | + Future<?> f = unloads.next(); |
| 980 | + if (f.isDone()) { |
| 981 | + if (!managerDown) { |
| 982 | + ManagerMessage mm = managerMessages.poll(); |
| 983 | + try { |
986 | 984 | mm.send(getContext().rpcCreds(), getClientAddressString(), iface);
|
| 985 | + } catch (TException e) { |
| 986 | + managerDown = true; |
| 987 | + LOG.debug("Error sending message to Manager during tablet unloading, msg: {}", |
| 988 | + e.getMessage()); |
987 | 989 | }
|
988 |
| - } catch (TException e) { |
989 |
| - managerDown.set(true); |
990 |
| - log.debug("Error sending message to Manager during tablet unloading, msg: {}", |
991 |
| - e.getMessage()); |
992 | 990 | }
|
| 991 | + unloads.remove(); |
993 | 992 | }
|
994 |
| - return true; |
995 |
| - }); |
| 993 | + } |
996 | 994 | log.debug("Waiting on {} {} tablets to close.", futures.size(), level);
|
997 | 995 | UtilWaitThread.sleep(1000);
|
998 | 996 | }
|
999 | 997 | }
|
1000 | 998 | } finally {
|
1001 |
| - if (!managerDown.get()) { |
| 999 | + if (!managerDown) { |
1002 | 1000 | try {
|
1003 | 1001 | ManagerMessage mm = managerMessages.poll();
|
1004 | 1002 | do {
|
|
0 commit comments