diff --git a/tony-core/src/main/java/com/linkedin/tony/TaskExecutor.java b/tony-core/src/main/java/com/linkedin/tony/TaskExecutor.java index 4229e054..dc470e1b 100644 --- a/tony-core/src/main/java/com/linkedin/tony/TaskExecutor.java +++ b/tony-core/src/main/java/com/linkedin/tony/TaskExecutor.java @@ -87,25 +87,28 @@ private void setupPorts() throws IOException, InterruptedException { // With Estimator API, there is a separate lone "chief" task that runs TensorBoard. // With the low-level distributed API, worker 0 runs TensorBoard. if (isChief) { - this.tbPort = requireNonNull(allocatePort()); + this.tbPort = requireNonNull(EphemeralPort.create()); this.registerTensorBoardUrl(); this.shellEnv.put(Constants.TB_PORT, String.valueOf(this.tbPort.getPort())); LOG.info("Reserved tbPort: " + this.tbPort.getPort()); } } + + private void releasePort(ServerPort port) throws Exception { + if (port != null) { + port.close(); + } + } + /** * Releases the reserved ports if any. This method has to be invoked after ports are created. */ private void releasePorts() throws Exception { try { - if (this.rpcPort != null) { - this.rpcPort.close(); - } + this.releasePort(this.rpcPort); } finally { - if (this.tbPort != null) { - this.tbPort.close(); - } + this.releasePort(this.tbPort); } } @@ -221,9 +224,13 @@ public static void main(String[] unused) throws Exception { // If not reusing port, then reserve them up until before the underlying TF process is // launched. See this issue for // details. - if (executor != null && !executor.isReusingPort()) { - LOG.info("Releasing reserved ports before launching tensorflow process."); - executor.releasePorts(); + if (executor != null) { + LOG.info("Releasing reserved port(s) before launching tensorflow process."); + if (executor.isReusingPort()) { + executor.releasePort(executor.tbPort); + } else { + executor.releasePorts(); + } } try { @@ -237,7 +244,7 @@ public static void main(String[] unused) throws Exception { } finally { if (executor.isReusingPort()) { LOG.info("Tensorflow process exited, releasing reserved ports."); - executor.releasePorts(); + executor.releasePort(executor.rpcPort); } } }