Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 0606467..d041209 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -133,6 +133,19 @@ public interface ExecutorLoader {
/**
* <pre>
+ * Remove the executor from executors table.
+ * Note:-
+ * 1. throws an Exception in case of a SQL issue
+ * 2. throws an Exception if there is no executor in the table* </pre>
+ * </pre>
+ * @param host
+ * @param port
+ * @throws ExecutorManagerException
+ */
+ void removeExecutor(String host, int port) throws ExecutorManagerException;
+
+ /**
+ * <pre>
* Log an event in executor_event audit table Note:- throws an Exception in
* case of a SQL issue
* Note: throws an Exception in case of a SQL issue
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index a588cca..3a65a58 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -948,6 +948,28 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
}
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorLoader#removeExecutor(String, int)
+ */
+ @Override
+ public void removeExecutor(String host, int port) throws ExecutorManagerException {
+ final String DELETE = "DELETE FROM executors WHERE host=? AND port=?";
+ QueryRunner runner = createQueryRunner();
+ try {
+ int rows = runner.update(DELETE, host, port);
+ if (rows == 0) {
+ throw new ExecutorManagerException("No executor with host, port :"
+ + "(" + host + "," + port + ")");
+ }
+ } catch (SQLException e) {
+ throw new ExecutorManagerException("Error removing executor with host, port : "
+ + "(" + host + "," + port + ")", e);
+ }
+ }
+
/**
* {@inheritDoc}
*
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index dd32302..39ed6d9 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -739,6 +739,21 @@ public class JdbcExecutorLoaderTest {
Assert.assertFalse(fetchedExecutor.isActive());
}
+ /* Test Removing Executor */
+ @Test
+ public void testRemovingExecutor() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+
+ ExecutorLoader loader = createLoader();
+ Executor executor = loader.addExecutor("localhost1", 12345);
+ Assert.assertNotNull(executor);
+ loader.removeExecutor("localhost1", 12345);
+ Executor fetchedExecutor = loader.fetchExecutor("localhost1", 12345);
+ Assert.assertNull(fetchedExecutor);
+ }
+
/* Test Executor reactivation */
@Test
public void testExecutorActivation() throws Exception {
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 0db2de5..2c8a95b 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -303,6 +303,15 @@ public class MockExecutorLoader implements ExecutorLoader {
}
@Override
+ public void removeExecutor(String host, int port) throws ExecutorManagerException {
+ Executor executor = fetchExecutor(host, port);
+ if (executor != null) {
+ executorIdCounter--;
+ executors.remove(executor);
+ }
+ }
+
+ @Override
public void postExecutorEvent(Executor executor, EventType type, String user,
String message) throws ExecutorManagerException {
ExecutorLogEvent event =
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index b8418f5..828c615 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -379,7 +379,16 @@ public class AzkabanExecutorServer {
logger.info(("Exception when logging top memory consumers"), e);
}
- logger.info("Shutting down...");
+ String host = app.getHost();
+ int port = app.getPort();
+ try {
+ logger.info(String.format("Removing executor(host: %s, port: %s) entry from database...", host, port));
+ app.getExecutorLoader().removeExecutor(host, port);
+ } catch (ExecutorManagerException ex) {
+ logger.error(String.format("Exception when removing executor(host: %s, port: %s)", host, port), ex);
+ }
+
+ logger.warn("Shutting down executor...");
try {
app.shutdownNow();
} catch (Exception e) {
@@ -611,12 +620,8 @@ public class AzkabanExecutorServer {
*/
private void shutdownInternal() {
getFlowRunnerManager().shutdown();
- try {
- shutdownNow();
- logger.warn("Shutdown AzkabanExecutorServer complete");
- } catch (Exception e) {
- logger.error(e);
- }
+ // trigger shutdown hook
+ System.exit(0);
}
/**