azkaban-aplcache

Clean up executor's db entry and exit the application when executor's

3/23/2017 8:18:54 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 0606467..d041209 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -133,6 +133,19 @@ public interface ExecutorLoader {
 
   /**
    * <pre>
+   * Remove the executor from executors table.
+   * Note:-
+   * 1. throws an Exception in case of a SQL issue
+   * 2. throws an Exception if there is no executor in the table* </pre>
+   * </pre>
+   * @param host
+   * @param port
+   * @throws ExecutorManagerException
+   */
+  void removeExecutor(String host, int port) throws ExecutorManagerException;
+
+  /**
+   * <pre>
    * Log an event in executor_event audit table Note:- throws an Exception in
    * case of a SQL issue
    * Note: throws an Exception in case of a SQL issue
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index a588cca..3a65a58 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -948,6 +948,28 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     }
   }
 
+
+  /**
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#removeExecutor(String, int)
+   */
+  @Override
+  public void removeExecutor(String host, int port) throws ExecutorManagerException {
+    final String DELETE = "DELETE FROM executors WHERE host=? AND port=?";
+    QueryRunner runner = createQueryRunner();
+    try {
+      int rows = runner.update(DELETE, host, port);
+      if (rows == 0) {
+        throw new ExecutorManagerException("No executor with host, port :"
+            + "(" + host + "," + port + ")");
+      }
+    } catch (SQLException e) {
+      throw new ExecutorManagerException("Error removing executor with host, port : "
+          + "(" + host + "," + port + ")", e);
+    }
+  }
+
   /**
    * {@inheritDoc}
    *
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index dd32302..39ed6d9 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -739,6 +739,21 @@ public class JdbcExecutorLoaderTest {
     Assert.assertFalse(fetchedExecutor.isActive());
   }
 
+  /* Test Removing Executor */
+  @Test
+  public void testRemovingExecutor() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+
+    ExecutorLoader loader = createLoader();
+    Executor executor = loader.addExecutor("localhost1", 12345);
+    Assert.assertNotNull(executor);
+    loader.removeExecutor("localhost1", 12345);
+    Executor fetchedExecutor = loader.fetchExecutor("localhost1", 12345);
+    Assert.assertNull(fetchedExecutor);
+  }
+
   /* Test Executor reactivation */
   @Test
   public void testExecutorActivation() throws Exception {
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 0db2de5..2c8a95b 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -303,6 +303,15 @@ public class MockExecutorLoader implements ExecutorLoader {
   }
 
   @Override
+  public void removeExecutor(String host, int port) throws ExecutorManagerException {
+    Executor executor = fetchExecutor(host, port);
+    if (executor != null) {
+        executorIdCounter--;
+        executors.remove(executor);
+    }
+  }
+
+  @Override
   public void postExecutorEvent(Executor executor, EventType type, String user,
     String message) throws ExecutorManagerException {
     ExecutorLogEvent event =
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index b8418f5..828c615 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -379,7 +379,16 @@ public class AzkabanExecutorServer {
           logger.info(("Exception when logging top memory consumers"), e);
         }
 
-        logger.info("Shutting down...");
+        String host = app.getHost();
+        int port = app.getPort();
+        try {
+          logger.info(String.format("Removing executor(host: %s, port: %s) entry from database...", host, port));
+          app.getExecutorLoader().removeExecutor(host, port);
+        } catch (ExecutorManagerException ex) {
+          logger.error(String.format("Exception when removing executor(host: %s, port: %s)", host, port), ex);
+        }
+
+        logger.warn("Shutting down executor...");
         try {
           app.shutdownNow();
         } catch (Exception e) {
@@ -611,12 +620,8 @@ public class AzkabanExecutorServer {
    */
   private void shutdownInternal() {
     getFlowRunnerManager().shutdown();
-    try {
-      shutdownNow();
-      logger.warn("Shutdown AzkabanExecutorServer complete");
-    } catch (Exception e) {
-      logger.error(e);
-    }
+    // trigger shutdown hook
+    System.exit(0);
   }
 
   /**