7/9/2018 6:12:59 PM
dependency order. Otherwise it can happen that web server tries to launch an execution, but it fails as executor server hasn't started yet.
Also start ExecutorManager's threads (that periodically call executors to get updates) not sooner than when the web server is started. Those threads used to be started already as soon as Guice injection creates the ExecutorManager instance (which happens before executor server is started – even now that the order is fixed).
This is one step towards having everything run in "multi-executor" mode.
As you may know, currently there are two possible modes, controlled by azkaban.use.multiple.executors=true|false:
multi-executor mode, based on the executors DB table
single executor mode, based on fixed props executor.host & executor.port
That adds extra complexity and different branches of code are executed depending on the mode. However, everything can just as well be run as "multi-executor" (but just one executor if that's what you want, and naturally so in case of solo-server). It makes it also easier to reach a better test coverage when testing is always done in the same "mode" that's used in production configurations.
I also noticed that ExecutorManager loads executors from the DB at the time of Guice injection (before executor server is started). This may need to be changed as well to properly support multi-executor mode on a single server.
Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index dbd9581..5ec1d96 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -143,7 +143,6 @@ public class ExecutorManager extends EventHandler implements
this.cacheDir = new File(azkProps.getString("cache.directory", "cache"));
this.executingManager = new ExecutingManagerUpdaterThread();
- this.executingManager.start();
if (isMultiExecutorMode()) {
setupMultiExecutorMode();
@@ -154,8 +153,14 @@ public class ExecutorManager extends EventHandler implements
DEFAULT_EXECUTION_LOGS_RETENTION_MS);
this.cleanerThread = new CleanerThread(executionLogsRetentionMs);
- this.cleanerThread.start();
+ }
+ public void start() {
+ this.executingManager.start();
+ this.cleanerThread.start();
+ if (isMultiExecutorMode()) {
+ this.queueProcessor.start();
+ }
}
private String findApplicationIdFromLog(final String logData) {
@@ -201,8 +206,6 @@ public class ExecutorManager extends EventHandler implements
this.azkProps.getInt(
Constants.ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED,
this.activeExecutors.size()));
-
- this.queueProcessor.start();
}
/**
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 37fed51..5ac23b4 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -272,6 +272,7 @@ public class ExecutorManagerTest {
@Test
public void testNotFoundFlows() throws Exception {
testSetUpForRunningFlows();
+ this.manager.start();
final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
when(this.loader.fetchExecutableFlow(-1)).thenReturn(flow1);
diff --git a/azkaban-solo-server/src/main/java/azkaban/soloserver/AzkabanSingleServer.java b/azkaban-solo-server/src/main/java/azkaban/soloserver/AzkabanSingleServer.java
index 5d04bef..6e10639 100644
--- a/azkaban-solo-server/src/main/java/azkaban/soloserver/AzkabanSingleServer.java
+++ b/azkaban-solo-server/src/main/java/azkaban/soloserver/AzkabanSingleServer.java
@@ -98,10 +98,11 @@ public class AzkabanSingleServer {
}
private void launch() throws Exception {
- AzkabanWebServer.launch(this.webServer);
- log.info("Azkaban Web Server started...");
-
+ // exec server first so that it's ready to accept calls by web server when web initializes
AzkabanExecutorServer.launch(this.executor);
log.info("Azkaban Exec Server started...");
+
+ AzkabanWebServer.launch(this.webServer);
+ log.info("Azkaban Web Server started...");
}
}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 9311157..35b74e1 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -228,6 +228,8 @@ public class AzkabanWebServer extends AzkabanServer {
/* This creates the Web Server instance */
app = webServer;
+ webServer.executorManager.start();
+
// TODO refactor code into ServerProvider
webServer.prepareAndStartServer();