azkaban-aplcache

AZNewDispatchingLogic - Create ExecutionController module

12/3/2018 9:44:53 PM

Changes

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index e05090e..9f8409f 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -90,6 +90,9 @@ public class Constants {
 
   public static class ConfigurationKeys {
 
+    // Configures Azkaban to use new polling model for dispatching
+    public static final String AZKABAN_POLL_MODEL = "azkaban.poll.model";
+
     // Configures Azkaban Flow Version in project YAML file
     public static final String AZKABAN_FLOW_VERSION = "azkaban-flow-version";
 
diff --git a/az-jobsummary/src/main/java/azkaban/viewer/jobsummary/JobSummaryServlet.java b/az-jobsummary/src/main/java/azkaban/viewer/jobsummary/JobSummaryServlet.java
index 94e19d8..c9ddcde 100644
--- a/az-jobsummary/src/main/java/azkaban/viewer/jobsummary/JobSummaryServlet.java
+++ b/az-jobsummary/src/main/java/azkaban/viewer/jobsummary/JobSummaryServlet.java
@@ -54,7 +54,7 @@ public class JobSummaryServlet extends LoginAbstractAzkabanServlet {
   private final String viewerName;
   private final String viewerPath;
 
-  private ExecutorManagerAdapter executorManager;
+  private ExecutorManagerAdapter executorManagerAdapter;
   private ProjectManager projectManager;
 
   private String outputDir;
@@ -87,7 +87,7 @@ public class JobSummaryServlet extends LoginAbstractAzkabanServlet {
   public void init(final ServletConfig config) throws ServletException {
     super.init(config);
     final AzkabanWebServer server = (AzkabanWebServer) getApplication();
-    this.executorManager = server.getExecutorManager();
+    this.executorManagerAdapter = server.getExecutorManagerAdapter();
     this.projectManager = server.getProjectManager();
   }
 
@@ -112,7 +112,7 @@ public class JobSummaryServlet extends LoginAbstractAzkabanServlet {
     ExecutableFlow flow = null;
     ExecutableNode node = null;
     try {
-      flow = this.executorManager.getExecutableFlow(execId);
+      flow = this.executorManagerAdapter.getExecutableFlow(execId);
       if (flow == null) {
         page.add("errorMsg", "Error loading executing flow " + execId
             + ": not found.");
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
new file mode 100644
index 0000000..fee1499
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
@@ -0,0 +1,332 @@
+/*
+* Copyright 2018 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+package azkaban.executor;
+
+import azkaban.event.EventHandler;
+import azkaban.project.Project;
+import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.Pair;
+import java.io.IOException;
+import java.lang.Thread.State;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Controls flow executions on web server. This module implements the polling model
+ * in the new AZ dispatching design. It's injected only when azkaban.poll.model is configured to
+ * true. It will ultimately replace ExecutorManager in the future.
+ */
+@Singleton
+public class ExecutionController extends EventHandler implements ExecutorManagerAdapter {
+
+  private static final Logger logger = LoggerFactory.getLogger(ExecutionController.class);
+  private static final Duration RECENTLY_FINISHED_LIFETIME = Duration.ofMinutes(10);
+  private final ExecutorLoader executorLoader;
+  private final ExecutorApiGateway apiGateway;
+
+  @Inject
+  ExecutionController(final ExecutorLoader executorLoader, final ExecutorApiGateway apiGateway) {
+    this.executorLoader = executorLoader;
+    this.apiGateway = apiGateway;
+  }
+
+  @Override
+  public void setupExecutors() throws ExecutorManagerException {
+    // Todo: deprecate this method
+  }
+
+  @Override
+  public void disableQueueProcessorThread() {
+    // Todo: deprecate this method
+  }
+
+  @Override
+  public void enableQueueProcessorThread() {
+    // Todo: deprecate this method
+  }
+
+  @Override
+  public State getExecutorManagerThreadState() {
+    // Todo: deprecate this method
+    return State.RUNNABLE;
+  }
+
+  @Override
+  public boolean isExecutorManagerThreadActive() {
+    // Todo: deprecate this method
+    return true;
+  }
+
+  @Override
+  public long getLastExecutorManagerThreadCheckTime() {
+    // Todo: deprecate this method
+    return 1L;
+  }
+
+  @Override
+  public Collection<Executor> getAllActiveExecutors() {
+    // Todo: get the executor info from DB
+    return Collections.emptyList();
+  }
+
+  @Override
+  public Executor fetchExecutor(final int executorId) throws ExecutorManagerException {
+    return this.executorLoader.fetchExecutor(executorId);
+  }
+
+  @Override
+  public Set<String> getPrimaryServerHosts() {
+    // Todo: get the primary executor host info from DB
+    return Collections.emptySet();
+  }
+
+  @Override
+  public Set<String> getAllActiveExecutorServerHosts() {
+    // Todo: get all executor host info from DB
+    return Collections.emptySet();
+  }
+
+  /**
+   * Gets a list of all the active (running flows and non-dispatched flows) executions for a given
+   * project and flow from database. {@inheritDoc}
+   */
+  @Override
+  public List<Integer> getRunningFlows(final int projectId, final String flowId) {
+    // Todo: get running flows from DB
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<Pair<ExecutableFlow, Optional<Executor>>> getActiveFlowsWithExecutor()
+      throws IOException {
+    // Todo: get active flows with executor from DB
+    return Collections.emptyList();
+  }
+
+  /**
+   * Checks whether the given flow has an active (running, non-dispatched) execution from
+   * database. {@inheritDoc}
+   */
+  @Override
+  public boolean isFlowRunning(final int projectId, final String flowId) {
+    // Todo: check DB to see if flow is running
+    return true;
+  }
+
+  /**
+   * Fetch ExecutableFlow from database. {@inheritDoc}
+   */
+  @Override
+  public ExecutableFlow getExecutableFlow(final int execId)
+      throws ExecutorManagerException {
+    return this.executorLoader.fetchExecutableFlow(execId);
+  }
+
+  /**
+   * Get all active (running, non-dispatched) flows from database. {@inheritDoc}
+   */
+  @Override
+  public List<ExecutableFlow> getRunningFlows() {
+    // Todo: get running flows from DB
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<ExecutableFlow> getRecentlyFinishedFlows() {
+    List<ExecutableFlow> flows = new ArrayList<>();
+    try {
+      flows = this.executorLoader.fetchRecentlyFinishedFlows(
+          RECENTLY_FINISHED_LIFETIME);
+    } catch (final ExecutorManagerException e) {
+      logger.error("Failed to fetch recently finished flows.", e);
+    }
+    return flows;
+  }
+
+  @Override
+  public List<ExecutableFlow> getExecutableFlows(final int skip, final int size)
+      throws ExecutorManagerException {
+    final List<ExecutableFlow> flows = this.executorLoader.fetchFlowHistory(skip, size);
+    return flows;
+  }
+
+  @Override
+  public List<ExecutableFlow> getExecutableFlows(final String flowIdContains,
+      final int skip, final int size) throws ExecutorManagerException {
+    final List<ExecutableFlow> flows =
+        this.executorLoader.fetchFlowHistory(null, '%' + flowIdContains + '%', null,
+            0, -1, -1, skip, size);
+    return flows;
+  }
+
+  @Override
+  public List<ExecutableFlow> getExecutableFlows(final String projContain,
+      final String flowContain, final String userContain, final int status, final long begin,
+      final long end,
+      final int skip, final int size) throws ExecutorManagerException {
+    final List<ExecutableFlow> flows =
+        this.executorLoader.fetchFlowHistory(projContain, flowContain, userContain,
+            status, begin, end, skip, size);
+    return flows;
+  }
+
+  @Override
+  public List<ExecutableJobInfo> getExecutableJobs(final Project project,
+      final String jobId, final int skip, final int size) throws ExecutorManagerException {
+    final List<ExecutableJobInfo> nodes =
+        this.executorLoader.fetchJobHistory(project.getId(), jobId, skip, size);
+    return nodes;
+  }
+
+  @Override
+  public int getNumberOfJobExecutions(final Project project, final String jobId)
+      throws ExecutorManagerException {
+    return this.executorLoader.fetchNumExecutableNodes(project.getId(), jobId);
+  }
+
+  @Override
+  public LogData getExecutableFlowLog(final ExecutableFlow exFlow, final int offset,
+      final int length) throws ExecutorManagerException {
+    // Todo: get the flow log from executor if the flow is running, else get it from DB.
+    return new LogData(0, 0, "dummy");
+  }
+
+  @Override
+  public LogData getExecutionJobLog(final ExecutableFlow exFlow, final String jobId,
+      final int offset, final int length, final int attempt) throws ExecutorManagerException {
+    // Todo: get the job log from executor if the flow is running, else get it from DB.
+    return new LogData(0, 0, "dummy");
+  }
+
+  @Override
+  public List<Object> getExecutionJobStats(final ExecutableFlow exFlow, final String jobId,
+      final int attempt) throws ExecutorManagerException {
+    // Todo: get execution job status from executor if the flow is running, else get if from DB.
+    return Collections.emptyList();
+  }
+
+  @Override
+  public String getJobLinkUrl(final ExecutableFlow exFlow, final String jobId, final int attempt) {
+    // Todo: deprecate this method
+    return null;
+  }
+
+  /**
+   * If a flow was dispatched to an executor, cancel by calling Executor. Else if it's still
+   * queued in database, remove it from database queue and finalize. {@inheritDoc}
+   */
+  @Override
+  public void cancelFlow(final ExecutableFlow exFlow, final String userId)
+      throws ExecutorManagerException {
+    // Todo: call executor to cancel the flow if it's running or remove from DB queue if it
+    // hasn't started
+  }
+
+  @Override
+  public void resumeFlow(final ExecutableFlow exFlow, final String userId)
+      throws ExecutorManagerException {
+    // Todo: call executor to resume the flow
+  }
+
+  @Override
+  public void pauseFlow(final ExecutableFlow exFlow, final String userId)
+      throws ExecutorManagerException {
+    // Todo: call executor to pause the flow
+  }
+
+  @Override
+  public void retryFailures(final ExecutableFlow exFlow, final String userId)
+      throws ExecutorManagerException {
+    // Todo: call executor to retry failed flows
+  }
+
+  /**
+   * When a flow is submitted, insert a new execution into the database queue. {@inheritDoc}
+   */
+  @Override
+  public String submitExecutableFlow(final ExecutableFlow exflow, final String userId)
+      throws ExecutorManagerException {
+    // Todo: insert the execution to DB queue
+    return null;
+  }
+
+  @Override
+  public Map<String, Object> callExecutorStats(final int executorId, final String action,
+      final Pair<String, String>... params) throws IOException, ExecutorManagerException {
+    final Executor executor = fetchExecutor(executorId);
+    final List<Pair<String, String>> paramList = new ArrayList<>();
+
+    if (params != null) {
+      paramList.addAll(Arrays.asList(params));
+    }
+
+    paramList.add(new Pair<>(ConnectorParams.ACTION_PARAM, action));
+
+    return this.apiGateway.callForJsonObjectMap(executor.getHost(), executor.getPort(),
+        "/stats", paramList);
+  }
+
+  @Override
+  public Map<String, Object> callExecutorJMX(final String hostPort, final String action,
+      final String mBean) throws IOException {
+    final List<Pair<String, String>> paramList =
+        new ArrayList<>();
+
+    paramList.add(new Pair<>(action, ""));
+    if (mBean != null) {
+      paramList.add(new Pair<>(ConnectorParams.JMX_MBEAN, mBean));
+    }
+
+    final String[] hostPortSplit = hostPort.split(":");
+    return this.apiGateway.callForJsonObjectMap(hostPortSplit[0],
+        Integer.valueOf(hostPortSplit[1]), "/jmx", paramList);
+  }
+
+
+  @Override
+  public void shutdown() {
+    //Todo: shutdown any thread that is running
+  }
+
+  @Override
+  public int getExecutableFlows(final int projectId, final String flowId, final int from,
+      final int length, final List<ExecutableFlow> outputList)
+      throws ExecutorManagerException {
+    final List<ExecutableFlow> flows =
+        this.executorLoader.fetchFlowHistory(projectId, flowId, from, length);
+    outputList.addAll(flows);
+    return this.executorLoader.fetchNumExecutableFlows(projectId, flowId);
+  }
+
+  @Override
+  public List<ExecutableFlow> getExecutableFlows(final int projectId, final String flowId,
+      final int from, final int length, final Status status) throws ExecutorManagerException {
+    return this.executorLoader.fetchFlowHistory(projectId, flowId, from, length,
+        status);
+  }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
index 6d00500..ebf06e3 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
@@ -38,7 +38,7 @@ public class ExecuteFlowAction implements TriggerAction {
 
   public static final String EXEC_ID = "ExecuteFlowAction.execid";
 
-  private static ExecutorManagerAdapter executorManager;
+  private static ExecutorManagerAdapter executorManagerAdapter;
   private static TriggerManager triggerManager;
   private static ProjectManager projectManager;
   private static Logger logger = Logger.getLogger(ExecuteFlowAction.class);
@@ -66,12 +66,13 @@ public class ExecuteFlowAction implements TriggerAction {
     ExecuteFlowAction.logger = logger;
   }
 
-  public static ExecutorManagerAdapter getExecutorManager() {
-    return executorManager;
+  public static ExecutorManagerAdapter getExecutorManagerAdapter() {
+    return executorManagerAdapter;
   }
 
-  public static void setExecutorManager(final ExecutorManagerAdapter executorManager) {
-    ExecuteFlowAction.executorManager = executorManager;
+  public static void setExecutorManagerAdapter(
+      final ExecutorManagerAdapter executorManagerAdapter) {
+    ExecuteFlowAction.executorManagerAdapter = executorManagerAdapter;
   }
 
   public static TriggerManager getTriggerManager() {
@@ -197,7 +198,7 @@ public class ExecuteFlowAction implements TriggerAction {
 
   @Override
   public void doAction() throws Exception {
-    if (projectManager == null || executorManager == null) {
+    if (projectManager == null || executorManagerAdapter == null) {
       throw new Exception("ExecuteFlowAction not properly initialized!");
     }
 
@@ -224,7 +225,7 @@ public class ExecuteFlowAction implements TriggerAction {
     }
 
     logger.info("Invoking flow " + project.getName() + "." + this.flowName);
-    executorManager.submitExecutableFlow(exflow, this.submitUser);
+    executorManagerAdapter.submitExecutableFlow(exflow, this.submitUser);
     logger.info("Invoked flow " + project.getName() + "." + this.flowName);
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecutionChecker.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecutionChecker.java
index 30a021c..24fd2b6 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecutionChecker.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecutionChecker.java
@@ -28,7 +28,7 @@ import java.util.Map;
 public class ExecutionChecker implements ConditionChecker {
 
   public static final String type = "ExecutionChecker";
-  public static ExecutorManagerAdapter executorManager;
+  public static ExecutorManagerAdapter executorManagerAdapter;
 
   private final String checkerId;
   private final int execId;
@@ -43,8 +43,8 @@ public class ExecutionChecker implements ConditionChecker {
     this.wantedStatus = wantedStatus;
   }
 
-  public static void setExecutorManager(final ExecutorManagerAdapter em) {
-    executorManager = em;
+  public static void setExecutorManagerAdapter(final ExecutorManagerAdapter em) {
+    executorManagerAdapter = em;
   }
 
   public static ExecutionChecker createFromJson(final HashMap<String, Object> jsonObj)
@@ -68,7 +68,7 @@ public class ExecutionChecker implements ConditionChecker {
   public Object eval() {
     final ExecutableFlow exflow;
     try {
-      exflow = executorManager.getExecutableFlow(this.execId);
+      exflow = executorManagerAdapter.getExecutableFlow(this.execId);
     } catch (final ExecutorManagerException e) {
       e.printStackTrace();
       return Boolean.FALSE;
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/KillExecutionAction.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/KillExecutionAction.java
index 919ecc4..c378c90 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/KillExecutionAction.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/KillExecutionAction.java
@@ -38,7 +38,7 @@ public class KillExecutionAction implements TriggerAction {
 
   private static final Logger logger = Logger
       .getLogger(KillExecutionAction.class);
-  private static ExecutorManagerAdapter executorManager;
+  private static ExecutorManagerAdapter executorManagerAdapter;
   private final String actionId;
   private final int execId;
 
@@ -48,8 +48,8 @@ public class KillExecutionAction implements TriggerAction {
     this.actionId = actionId;
   }
 
-  public static void setExecutorManager(final ExecutorManagerAdapter em) {
-    executorManager = em;
+  public static void setExecutorManagerAdapter(final ExecutorManagerAdapter em) {
+    executorManagerAdapter = em;
   }
 
   public static KillExecutionAction createFromJson(final Object obj) {
@@ -94,11 +94,11 @@ public class KillExecutionAction implements TriggerAction {
 
   @Override
   public void doAction() throws Exception {
-    final ExecutableFlow exFlow = executorManager.getExecutableFlow(this.execId);
+    final ExecutableFlow exFlow = executorManagerAdapter.getExecutableFlow(this.execId);
     logger.info("ready to kill execution " + this.execId);
     if (!Status.isStatusFinished(exFlow.getStatus())) {
       logger.info("Killing execution " + this.execId);
-      executorManager.cancelFlow(exFlow, Constants.AZKABAN_SLA_CHECKER_USERNAME);
+      executorManagerAdapter.cancelFlow(exFlow, Constants.AZKABAN_SLA_CHECKER_USERNAME);
     }
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
index 69438b8..a2b4032 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
@@ -19,11 +19,9 @@ package azkaban.trigger;
 import static java.util.Objects.requireNonNull;
 
 import azkaban.event.EventHandler;
-import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.utils.Props;
-import javax.inject.Inject;
-import javax.inject.Singleton;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -34,6 +32,8 @@ import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import org.apache.log4j.Logger;
 
 @Singleton
@@ -57,10 +57,10 @@ public class TriggerManager extends EventHandler implements
 
   @Inject
   public TriggerManager(final Props props, final TriggerLoader triggerLoader,
-      final ExecutorManager executorManager) throws TriggerManagerException {
+      final ExecutorManagerAdapter executorManagerAdapter) throws TriggerManagerException {
 
     requireNonNull(props);
-    requireNonNull(executorManager);
+    requireNonNull(executorManagerAdapter);
     this.triggerLoader = requireNonNull(triggerLoader);
 
     final long scannerInterval =
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
index 2f98175..064d238 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
@@ -18,13 +18,11 @@ package azkaban.trigger;
 
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.anyObject;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import azkaban.executor.ExecutionOptions;
-import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.flow.Flow;
 import azkaban.project.Project;
@@ -51,16 +49,15 @@ import org.junit.Test;
 public class TriggerManagerTest {
 
   private static TriggerLoader triggerLoader;
-  private static ExecutorManager executorManager;
+  private static ExecutorManagerAdapter executorManagerAdapter;
   private static ProjectManager projectManager;
   private TriggerManager triggerManager;
 
   @BeforeClass
   public static void prepare() {
     triggerLoader = new MockTriggerLoader();
-    executorManager = mock(ExecutorManager.class);
+    executorManagerAdapter = mock(ExecutorManagerAdapter.class);
     projectManager = mock(ProjectManager.class);
-    doNothing().when(executorManager).addListener(anyObject());
   }
 
   @Before
@@ -68,15 +65,15 @@ public class TriggerManagerTest {
     final Project project = new Project(1, "test-project");
     project.setFlows(ImmutableMap.of("test-flow", new Flow("test-flow")));
     when(projectManager.getProject(1)).thenReturn(project);
-    when(executorManager.submitExecutableFlow(any(), any()))
+    when(executorManagerAdapter.submitExecutableFlow(any(), any()))
         .thenThrow(new ExecutorManagerException("Flow is already running. Skipping execution.",
             ExecutorManagerException.Reason.SkippedExecution));
-    ExecuteFlowAction.setExecutorManager(this.executorManager);
+    ExecuteFlowAction.setExecutorManagerAdapter(this.executorManagerAdapter);
     ExecuteFlowAction.setProjectManager(this.projectManager);
     ExecuteFlowAction.setTriggerManager(this.triggerManager);
     final Props props = new Props();
     props.put("trigger.scan.interval", 300);
-    this.triggerManager = new TriggerManager(props, triggerLoader, executorManager);
+    this.triggerManager = new TriggerManager(props, triggerLoader, executorManagerAdapter);
     this.triggerManager.registerCheckerType(ThresholdChecker.type,
         ThresholdChecker.class);
     this.triggerManager.registerActionType(DummyTriggerAction.type,
diff --git a/azkaban-solo-server/src/main/java/azkaban/soloserver/AzkabanSingleServer.java b/azkaban-solo-server/src/main/java/azkaban/soloserver/AzkabanSingleServer.java
index 124fe51..eb16d67 100644
--- a/azkaban-solo-server/src/main/java/azkaban/soloserver/AzkabanSingleServer.java
+++ b/azkaban-solo-server/src/main/java/azkaban/soloserver/AzkabanSingleServer.java
@@ -53,10 +53,10 @@ public class AzkabanSingleServer {
     this.executor = executor;
   }
 
-  public static void main(String[] args) {
+  public static void main(final String[] args) {
     try {
       start(args);
-    } catch (Exception e) {
+    } catch (final Exception e) {
       log.error("Failed to start single server. Shutting down.", e);
       System.exit(1);
     }
@@ -96,7 +96,7 @@ public class AzkabanSingleServer {
     /* Initialize Guice Injector */
     final Injector injector = Guice.createInjector(
         new AzkabanCommonModule(props),
-        new AzkabanWebServerModule(),
+        new AzkabanWebServerModule(props),
         new AzkabanExecServerModule()
     );
     SERVICE_PROVIDER.setInjector(injector);
diff --git a/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java b/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java
index cc2903c..f328b5b 100644
--- a/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java
+++ b/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java
@@ -105,7 +105,7 @@ public class AzkabanSingleServerTest {
     final Injector injector = Guice.createInjector(
         new AzkabanCommonModule(props),
         new AzkabanExecServerModule(),
-        new AzkabanWebServerModule()
+        new AzkabanWebServerModule(props)
     );
     SERVICE_PROVIDER.setInjector(injector);
 
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 35b74e1..1ed6666 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -24,6 +24,7 @@ import azkaban.Constants;
 import azkaban.Constants.ConfigurationKeys;
 import azkaban.database.AzkabanDatabaseSetup;
 import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.flowtrigger.FlowTriggerService;
 import azkaban.flowtrigger.quartz.FlowTriggerScheduler;
 import azkaban.jmx.JmxExecutorManager;
@@ -137,7 +138,7 @@ public class AzkabanWebServer extends AzkabanServer {
   private final Server server;
   private final UserManager userManager;
   private final ProjectManager projectManager;
-  private final ExecutorManager executorManager;
+  private final ExecutorManagerAdapter executorManagerAdapter;
   private final ScheduleManager scheduleManager;
   private final TriggerManager triggerManager;
   private final MetricsManager metricsManager;
@@ -152,7 +153,7 @@ public class AzkabanWebServer extends AzkabanServer {
   @Inject
   public AzkabanWebServer(final Props props,
       final Server server,
-      final ExecutorManager executorManager,
+      final ExecutorManagerAdapter executorManagerAdapter,
       final ProjectManager projectManager,
       final TriggerManager triggerManager,
       final MetricsManager metricsManager,
@@ -165,7 +166,8 @@ public class AzkabanWebServer extends AzkabanServer {
       final StatusService statusService) {
     this.props = requireNonNull(props, "props is null.");
     this.server = requireNonNull(server, "server is null.");
-    this.executorManager = requireNonNull(executorManager, "executorManager is null.");
+    this.executorManagerAdapter = requireNonNull(executorManagerAdapter,
+        "executorManagerAdapter is null.");
     this.projectManager = requireNonNull(projectManager, "projectManager is null.");
     this.triggerManager = requireNonNull(triggerManager, "triggerManager is null.");
     this.metricsManager = requireNonNull(metricsManager, "metricsManager is null.");
@@ -217,7 +219,7 @@ public class AzkabanWebServer extends AzkabanServer {
     /* Initialize Guice Injector */
     final Injector injector = Guice.createInjector(
         new AzkabanCommonModule(props),
-        new AzkabanWebServerModule()
+        new AzkabanWebServerModule(props)
     );
     SERVICE_PROVIDER.setInjector(injector);
 
@@ -228,7 +230,10 @@ public class AzkabanWebServer extends AzkabanServer {
     /* This creates the Web Server instance */
     app = webServer;
 
-    webServer.executorManager.start();
+    // Todo jamiesjc: also start the threads in ExecutionController if needed.
+    if (webServer.executorManagerAdapter instanceof ExecutorManager) {
+      ((ExecutorManager) webServer.executorManagerAdapter).start();
+    }
 
     // TODO refactor code into ServerProvider
     webServer.prepareAndStartServer();
@@ -544,7 +549,9 @@ public class AzkabanWebServer extends AzkabanServer {
     createThreadPool();
     configureRoutes();
 
-    if (this.props.getBoolean(Constants.ConfigurationKeys.IS_METRICS_ENABLED, false)) {
+    // Todo jamiesjc: enable web metrics for azkaban poll model later
+    if (this.props.getBoolean(Constants.ConfigurationKeys.IS_METRICS_ENABLED, false)
+        && !this.props.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)) {
       startWebMetrics();
     }
 
@@ -589,7 +596,9 @@ public class AzkabanWebServer extends AzkabanServer {
 
 
   private void startWebMetrics() throws Exception {
-    this.metricsManager.addGauge("WEB-NumQueuedFlows", this.executorManager::getQueuedFlowSize);
+    this.metricsManager
+        .addGauge("WEB-NumQueuedFlows", ((ExecutorManager) this
+            .executorManagerAdapter)::getQueuedFlowSize);
     /*
      * TODO: Currently {@link ExecutorManager#getRunningFlows()} includes both running and non-dispatched flows.
      * Originally we would like to do a subtraction between getRunningFlows and {@link ExecutorManager#getQueuedFlowSize()},
@@ -598,7 +607,8 @@ public class AzkabanWebServer extends AzkabanServer {
      * a thread safe subtraction. We need to fix this in the future.
      */
     this.metricsManager
-        .addGauge("WEB-NumRunningFlows", () -> this.executorManager.getRunningFlows().size());
+        .addGauge("WEB-NumRunningFlows",
+            () -> ((ExecutorManager) this.executorManagerAdapter).getRunningFlows().size());
 
     logger.info("starting reporting Web Server Metrics");
     this.metricsManager.startReporting("AZ-WEB", this.props);
@@ -606,12 +616,12 @@ public class AzkabanWebServer extends AzkabanServer {
 
   private void loadBuiltinCheckersAndActions() {
     logger.info("Loading built-in checker and action types");
-    ExecuteFlowAction.setExecutorManager(this.executorManager);
+    ExecuteFlowAction.setExecutorManagerAdapter(this.executorManagerAdapter);
     ExecuteFlowAction.setProjectManager(this.projectManager);
     ExecuteFlowAction.setTriggerManager(this.triggerManager);
-    KillExecutionAction.setExecutorManager(this.executorManager);
+    KillExecutionAction.setExecutorManagerAdapter(this.executorManagerAdapter);
     CreateTriggerAction.setTriggerManager(this.triggerManager);
-    ExecutionChecker.setExecutorManager(this.executorManager);
+    ExecutionChecker.setExecutorManagerAdapter(this.executorManagerAdapter);
 
     this.triggerManager.registerCheckerType(BasicTimeChecker.type, BasicTimeChecker.class);
     this.triggerManager.registerCheckerType(SlaChecker.type, SlaChecker.class);
@@ -647,8 +657,8 @@ public class AzkabanWebServer extends AzkabanServer {
     return this.projectManager;
   }
 
-  public ExecutorManager getExecutorManager() {
-    return this.executorManager;
+  public ExecutorManagerAdapter getExecutorManagerAdapter() {
+    return this.executorManagerAdapter;
   }
 
   public ScheduleManager getScheduleManager() {
@@ -681,8 +691,11 @@ public class AzkabanWebServer extends AzkabanServer {
 
     registerMbean("jetty", new JmxJettyServer(this.server));
     registerMbean("triggerManager", new JmxTriggerManager(this.triggerManager));
-    if (this.executorManager != null) {
-      registerMbean("executorManager", new JmxExecutorManager(this.executorManager));
+    // Todo jamiesjc: enable Jmx for executionController later
+    if (this.executorManagerAdapter != null
+        && this.executorManagerAdapter instanceof ExecutorManager) {
+      registerMbean("executorManager",
+          new JmxExecutorManager((ExecutorManager) this.executorManagerAdapter));
     }
 
     // Register Log4J loggers as JMX beans so the log level can be
@@ -712,7 +725,7 @@ public class AzkabanWebServer extends AzkabanServer {
       logger.error("Failed to cleanup MBeanServer", e);
     }
     this.scheduleManager.shutdown();
-    this.executorManager.shutdown();
+    this.executorManagerAdapter.shutdown();
     try {
       this.server.stop();
     } catch (final Exception e) {
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java
index 039e864..344b81e 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java
@@ -18,11 +18,13 @@
 package azkaban.webapp;
 
 import azkaban.Constants.ConfigurationKeys;
+import azkaban.executor.ExecutionController;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
 import azkaban.flowtrigger.database.JdbcFlowTriggerInstanceLoaderImpl;
 import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginException;
 import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginManager;
-import azkaban.flowtrigger.database.JdbcFlowTriggerInstanceLoaderImpl;
 import azkaban.scheduler.ScheduleLoader;
 import azkaban.scheduler.TriggerBasedScheduleLoader;
 import azkaban.user.UserManager;
@@ -50,6 +52,11 @@ public class AzkabanWebServerModule extends AbstractModule {
   private static final Logger log = Logger.getLogger(AzkabanWebServerModule.class);
   private static final String USER_MANAGER_CLASS_PARAM = "user.manager.class";
   private static final String VELOCITY_DEV_MODE_PARAM = "velocity.dev.mode";
+  private final Props props;
+
+  public AzkabanWebServerModule(final Props props) {
+    this.props = props;
+  }
 
   @Provides
   @Singleton
@@ -71,6 +78,12 @@ public class AzkabanWebServerModule extends AbstractModule {
     bind(Server.class).toProvider(WebServerProvider.class);
     bind(ScheduleLoader.class).to(TriggerBasedScheduleLoader.class);
     bind(FlowTriggerInstanceLoader.class).to(JdbcFlowTriggerInstanceLoaderImpl.class);
+    bind(ExecutorManagerAdapter.class).to(resolveExecutorManagerAdaptorClassType());
+  }
+
+  private Class<? extends ExecutorManagerAdapter> resolveExecutorManagerAdaptorClassType() {
+    return this.props.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)
+        ? ExecutionController.class : ExecutorManager.class;
   }
 
   @Inject
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index beeab4a..85d74c6 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -74,7 +74,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
   private WebMetrics webMetrics;
   private ProjectManager projectManager;
   private FlowTriggerService flowTriggerService;
-  private ExecutorManagerAdapter executorManager;
+  private ExecutorManagerAdapter executorManagerAdapter;
   private ScheduleManager scheduleManager;
   private UserManager userManager;
 
@@ -84,7 +84,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
     final AzkabanWebServer server = (AzkabanWebServer) getApplication();
     this.userManager = server.getUserManager();
     this.projectManager = server.getProjectManager();
-    this.executorManager = server.getExecutorManager();
+    this.executorManagerAdapter = server.getExecutorManagerAdapter();
     this.scheduleManager = server.getScheduleManager();
     this.flowTriggerService = server.getFlowTriggerService();
     // TODO: reallocf fully guicify
@@ -120,7 +120,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
       ExecutableFlow exFlow = null;
 
       try {
-        exFlow = this.executorManager.getExecutableFlow(execid);
+        exFlow = this.executorManagerAdapter.getExecutableFlow(execid);
       } catch (final ExecutorManagerException e) {
         ret.put("error",
             "Error fetching execution '" + execid + "': " + e.getMessage());
@@ -198,9 +198,9 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
     if (HttpRequestUtils.hasPermission(this.userManager, user, Type.ADMIN)) {
       try {
         if (enableQueue) {
-          this.executorManager.enableQueueProcessorThread();
+          this.executorManagerAdapter.enableQueueProcessorThread();
         } else {
-          this.executorManager.disableQueueProcessorThread();
+          this.executorManagerAdapter.disableQueueProcessorThread();
         }
         returnMap.put(ConnectorParams.STATUS_PARAM,
             ConnectorParams.RESPONSE_SUCCESS);
@@ -256,7 +256,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
     boolean wasSuccess = false;
     if (HttpRequestUtils.hasPermission(this.userManager, user, Type.ADMIN)) {
       try {
-        this.executorManager.setupExecutors();
+        this.executorManagerAdapter.setupExecutors();
         returnMap.put(ConnectorParams.STATUS_PARAM,
             ConnectorParams.RESPONSE_SUCCESS);
         wasSuccess = true;
@@ -300,7 +300,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
     ExecutableNode node = null;
     final String jobLinkUrl;
     try {
-      flow = this.executorManager.getExecutableFlow(execId);
+      flow = this.executorManagerAdapter.getExecutableFlow(execId);
       if (flow == null) {
         page.add("errorMsg", "Error loading executing flow " + execId
             + ": not found.");
@@ -315,7 +315,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
         return;
       }
 
-      jobLinkUrl = this.executorManager.getJobLinkUrl(flow, jobId, attempt);
+      jobLinkUrl = this.executorManagerAdapter.getJobLinkUrl(flow, jobId, attempt);
 
       final List<ViewerPlugin> jobViewerPlugins =
           PluginRegistry.getRegistry().getViewerPluginsForJobType(
@@ -359,11 +359,11 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
             "azkaban/webapp/servlet/velocity/executionspage.vm");
 
     final List<Pair<ExecutableFlow, Optional<Executor>>> runningFlows =
-        this.executorManager.getActiveFlowsWithExecutor();
+        this.executorManagerAdapter.getActiveFlowsWithExecutor();
     page.add("runningFlows", runningFlows.isEmpty() ? null : runningFlows);
 
     final List<ExecutableFlow> finishedFlows =
-        this.executorManager.getRecentlyFinishedFlows();
+        this.executorManagerAdapter.getRecentlyFinishedFlows();
     page.add("recentlyFinished", finishedFlows.isEmpty() ? null : finishedFlows);
     page.add("vmutils", new VelocityUtil(this.projectManager));
     page.render();
@@ -439,7 +439,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 
     ExecutableFlow flow = null;
     try {
-      flow = this.executorManager.getExecutableFlow(execId);
+      flow = this.executorManagerAdapter.getExecutableFlow(execId);
       if (flow == null) {
         page.add("errorMsg", "Error loading executing flow " + execId
             + " not found.");
@@ -537,7 +537,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
     }
 
     try {
-      this.executorManager.retryFailures(exFlow, user.getUserId());
+      this.executorManagerAdapter.retryFailures(exFlow, user.getUserId());
     } catch (final ExecutorManagerException e) {
       ret.put("error", e.getMessage());
     }
@@ -563,7 +563,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 
     try {
       final LogData data =
-          this.executorManager.getExecutableFlowLog(exFlow, offset, length);
+          this.executorManagerAdapter.getExecutableFlowLog(exFlow, offset, length);
       if (data == null) {
         ret.put("length", 0);
         ret.put("offset", offset);
@@ -614,7 +614,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 
       final int attempt = this.getIntParam(req, "attempt", node.getAttempt());
       final LogData data =
-          this.executorManager.getExecutionJobLog(exFlow, jobId, offset, length,
+          this.executorManagerAdapter.getExecutionJobLog(exFlow, jobId, offset, length,
               attempt);
       if (data == null) {
         ret.put("length", 0);
@@ -651,7 +651,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
       }
 
       final List<Object> jsonObj =
-          this.executorManager
+          this.executorManagerAdapter
               .getExecutionJobStats(exFlow, jobId, node.getAttempt());
       ret.put("jobStats", jsonObj);
     } catch (final ExecutorManagerException e) {
@@ -765,7 +765,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
     }
 
     try {
-      this.executorManager.cancelFlow(exFlow, user.getUserId());
+      this.executorManagerAdapter.cancelFlow(exFlow, user.getUserId());
     } catch (final ExecutorManagerException e) {
       ret.put("error", e.getMessage());
     }
@@ -781,7 +781,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
     }
 
     final List<Integer> refs =
-        this.executorManager.getRunningFlows(project.getId(), flowId);
+        this.executorManagerAdapter.getRunningFlows(project.getId(), flowId);
     if (!refs.isEmpty()) {
       ret.put("execIds", refs);
     }
@@ -798,7 +798,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
     }
 
     try {
-      this.executorManager.pauseFlow(exFlow, user.getUserId());
+      this.executorManagerAdapter.pauseFlow(exFlow, user.getUserId());
     } catch (final ExecutorManagerException e) {
       ret.put("error", e.getMessage());
     }
@@ -815,7 +815,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
     }
 
     try {
-      this.executorManager.resumeFlow(exFlow, user.getUserId());
+      this.executorManagerAdapter.resumeFlow(exFlow, user.getUserId());
     } catch (final ExecutorManagerException e) {
       ret.put("resume", e.getMessage());
     }
@@ -1004,7 +1004,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
     try {
       HttpRequestUtils.filterAdminOnlyFlowParams(this.userManager, options, user);
       final String message =
-          this.executorManager.submitExecutableFlow(exflow, user.getUserId());
+          this.executorManagerAdapter.submitExecutableFlow(exflow, user.getUserId());
       ret.put("message", message);
     } catch (final Exception e) {
       e.printStackTrace();
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/HistoryServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/HistoryServlet.java
index 6ab39e2..b2b02ad 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/HistoryServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/HistoryServlet.java
@@ -35,14 +35,14 @@ public class HistoryServlet extends LoginAbstractAzkabanServlet {
 
   private static final String FILTER_BY_DATE_PATTERN = "MM/dd/yyyy hh:mm aa";
   private static final long serialVersionUID = 1L;
-  private ExecutorManagerAdapter executorManager;
+  private ExecutorManagerAdapter executorManagerAdapter;
   private ProjectManager projectManager;
 
   @Override
   public void init(final ServletConfig config) throws ServletException {
     super.init(config);
     final AzkabanWebServer server = (AzkabanWebServer) getApplication();
-    this.executorManager = server.getExecutorManager();
+    this.executorManagerAdapter = server.getExecutorManagerAdapter();
     this.projectManager = server.getProjectManager();
   }
 
@@ -111,7 +111,7 @@ public class HistoryServlet extends LoginAbstractAzkabanServlet {
               .parseDateTime(end).getMillis();
       try {
         history =
-            this.executorManager.getExecutableFlows(projContain, flowContain,
+            this.executorManagerAdapter.getExecutableFlows(projContain, flowContain,
                 userContain, status, beginTime, endTime, (pageNum - 1)
                     * pageSize, pageSize);
       } catch (final ExecutorManagerException e) {
@@ -121,7 +121,7 @@ public class HistoryServlet extends LoginAbstractAzkabanServlet {
       final String searchTerm = getParam(req, "searchterm");
       try {
         history =
-            this.executorManager.getExecutableFlows(searchTerm, (pageNum - 1)
+            this.executorManagerAdapter.getExecutableFlows(searchTerm, (pageNum - 1)
                 * pageSize, pageSize);
       } catch (final ExecutorManagerException e) {
         page.add("error", e.getMessage());
@@ -129,7 +129,7 @@ public class HistoryServlet extends LoginAbstractAzkabanServlet {
     } else {
       try {
         history =
-            this.executorManager.getExecutableFlows((pageNum - 1) * pageSize,
+            this.executorManagerAdapter.getExecutableFlows((pageNum - 1) * pageSize,
                 pageSize);
       } catch (final ExecutorManagerException e) {
         e.printStackTrace();
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/JMXHttpServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/JMXHttpServlet.java
index 1ca04a2..d389062 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/JMXHttpServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/JMXHttpServlet.java
@@ -51,7 +51,7 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements
 
   private UserManager userManager;
   private AzkabanWebServer server;
-  private ExecutorManagerAdapter executorManager;
+  private ExecutorManagerAdapter executorManagerAdapter;
   private TriggerManager triggerManager;
 
   @Override
@@ -60,7 +60,7 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements
 
     this.server = (AzkabanWebServer) getApplication();
     this.userManager = this.server.getUserManager();
-    this.executorManager = this.server.getExecutorManager();
+    this.executorManagerAdapter = this.server.getExecutorManagerAdapter();
 
     this.triggerManager = this.server.getTriggerManager();
   }
@@ -83,7 +83,7 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements
         final String hostPort = getParam(req, JMX_HOSTPORT);
         final String mbean = getParam(req, JMX_MBEAN);
         final Map<String, Object> result =
-            this.executorManager.callExecutorJMX(hostPort,
+            this.executorManagerAdapter.callExecutorJMX(hostPort,
                 JMX_GET_ALL_MBEAN_ATTRIBUTES, mbean);
         // order the attribute by name
         for (final Map.Entry<String, Object> entry : result.entrySet()) {
@@ -171,10 +171,10 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements
     page.add("mbeans", this.server.getMbeanNames());
 
     final Map<String, Object> executorMBeans = new HashMap<>();
-    for (final String hostPort : this.executorManager.getAllActiveExecutorServerHosts()) {
+    for (final String hostPort : this.executorManagerAdapter.getAllActiveExecutorServerHosts()) {
       try {
         final Map<String, Object> mbeans =
-            this.executorManager.callExecutorJMX(hostPort, JMX_GET_MBEANS, null);
+            this.executorManagerAdapter.callExecutorJMX(hostPort, JMX_GET_MBEANS, null);
 
         executorMBeans.put(hostPort, mbeans.get("mbeans"));
       } catch (final IOException e) {
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 6a6ae41..ac2dcaf 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -103,7 +103,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
     }
   };
   private ProjectManager projectManager;
-  private ExecutorManagerAdapter executorManager;
+  private ExecutorManagerAdapter executorManagerAdapter;
   private ScheduleManager scheduleManager;
   private UserManager userManager;
   private FlowTriggerScheduler scheduler;
@@ -118,7 +118,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 
     final AzkabanWebServer server = (AzkabanWebServer) getApplication();
     this.projectManager = server.getProjectManager();
-    this.executorManager = server.getExecutorManager();
+    this.executorManagerAdapter = server.getExecutorManagerAdapter();
     this.scheduleManager = server.getScheduleManager();
     this.userManager = server.getUserManager();
     this.scheduler = server.getScheduler();
@@ -388,7 +388,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
     List<ExecutableFlow> exFlows = null;
     try {
       exFlows =
-          this.executorManager.getExecutableFlows(project.getId(), flowId, 0, 1,
+          this.executorManagerAdapter.getExecutableFlows(project.getId(), flowId, 0, 1,
               Status.SUCCEEDED);
     } catch (final ExecutorManagerException e) {
       ret.put("error", "Error retrieving executable flows");
@@ -417,7 +417,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
     int total = 0;
     try {
       total =
-          this.executorManager.getExecutableFlows(project.getId(), flowId, from,
+          this.executorManagerAdapter.getExecutableFlows(project.getId(), flowId, from,
               length, exFlows);
     } catch (final ExecutorManagerException e) {
       ret.put("error", "Error retrieving executable flows");
@@ -1179,10 +1179,10 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 
     int numResults = 0;
     try {
-      numResults = this.executorManager.getNumberOfJobExecutions(project, jobId);
+      numResults = this.executorManagerAdapter.getNumberOfJobExecutions(project, jobId);
       final int maxPage = (numResults / pageSize) + 1;
       List<ExecutableJobInfo> jobInfo =
-          this.executorManager.getExecutableJobs(project, jobId, skipPage, pageSize);
+          this.executorManagerAdapter.getExecutableJobs(project, jobId, skipPage, pageSize);
 
       if (jobInfo == null || jobInfo.isEmpty()) {
         jobInfo = null;
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/StatsServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/StatsServlet.java
index e6eac84..daf289f 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/StatsServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/StatsServlet.java
@@ -18,7 +18,7 @@ package azkaban.webapp.servlet;
 
 import azkaban.executor.ConnectorParams;
 import azkaban.executor.Executor;
-import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.server.session.Session;
 import azkaban.user.User;
@@ -47,14 +47,14 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
 
   private static final long serialVersionUID = 1L;
   private UserManager userManager;
-  private ExecutorManager execManager;
+  private ExecutorManagerAdapter execManagerAdapter;
 
   @Override
   public void init(final ServletConfig config) throws ServletException {
     super.init(config);
     final AzkabanWebServer server = (AzkabanWebServer) getApplication();
     this.userManager = server.getUserManager();
-    this.execManager = server.getExecutorManager();
+    this.execManagerAdapter = server.getExecutorManagerAdapter();
   }
 
   @Override
@@ -108,7 +108,7 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
     final Map<String, Object> result;
     try {
       result =
-          this.execManager.callExecutorStats(executorId,
+          this.execManagerAdapter.callExecutorStats(executorId,
               ConnectorParams.STATS_GET_ALLMETRICSNAME,
               (Pair<String, String>[]) null);
 
@@ -134,7 +134,7 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
       throws ServletException, IOException {
     try {
       final Map<String, Object> result =
-          this.execManager
+          this.execManagerAdapter
               .callExecutorStats(executorId, actionName, getAllParams(req));
       if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
         ret.put(ConnectorParams.RESPONSE_ERROR,
@@ -157,7 +157,7 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
       ServletException {
     try {
       final Map<String, Object> result =
-          this.execManager.callExecutorStats(executorId,
+          this.execManagerAdapter.callExecutorStats(executorId,
               ConnectorParams.STATS_GET_METRICHISTORY, getAllParams(req));
       if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
         ret.put(ConnectorParams.RESPONSE_ERROR,
@@ -181,11 +181,11 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
     final Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/statsPage.vm");
 
     try {
-      final Collection<Executor> executors = this.execManager.getAllActiveExecutors();
+      final Collection<Executor> executors = this.execManagerAdapter.getAllActiveExecutors();
       page.add("executorList", executors);
 
       final Map<String, Object> result =
-          this.execManager.callExecutorStats(executors.iterator().next().getId(),
+          this.execManagerAdapter.callExecutorStats(executors.iterator().next().getId(),
               ConnectorParams.STATS_GET_ALLMETRICSNAME,
               (Pair<String, String>[]) null);
       if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
diff --git a/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzSchedulerTest.java b/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzSchedulerTest.java
index fd27ec6..0ef2c35 100644
--- a/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzSchedulerTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzSchedulerTest.java
@@ -60,7 +60,7 @@ public class QuartzSchedulerTest {
     props.put("h2.path", "./h2");
     final Injector injector = Guice.createInjector(
         new AzkabanCommonModule(props),
-        new AzkabanWebServerModule()
+        new AzkabanWebServerModule(props)
     );
 
     SERVICE_PROVIDER.unsetInjector();
diff --git a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
index 7a11e4e..f1b2207 100644
--- a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
@@ -38,7 +38,7 @@ import azkaban.executor.Executor;
 import azkaban.executor.ExecutorDao;
 import azkaban.executor.ExecutorEventsDao;
 import azkaban.executor.ExecutorLoader;
-import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.FetchActiveFlowDao;
 import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManager;
@@ -120,7 +120,7 @@ public class AzkabanWebServerTest {
   public void testInjection() throws Exception {
     final Injector injector = Guice.createInjector(
         new AzkabanCommonModule(props),
-        new AzkabanWebServerModule()
+        new AzkabanWebServerModule(props)
     );
     SERVICE_PROVIDER.unsetInjector();
     SERVICE_PROVIDER.setInjector(injector);
@@ -137,7 +137,7 @@ public class AzkabanWebServerTest {
 
     //Test if triggermanager is singletonly guiced. If not, the below test will fail.
     assertSingleton(ExecutorLoader.class, injector);
-    assertSingleton(ExecutorManager.class, injector);
+    assertSingleton(ExecutorManagerAdapter.class, injector);
     assertSingleton(ProjectLoader.class, injector);
     assertSingleton(ProjectManager.class, injector);
     assertSingleton(Storage.class, injector);
diff --git a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalServlet.java b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalServlet.java
index b01e9d4..c300aa1 100644
--- a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalServlet.java
+++ b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalServlet.java
@@ -288,9 +288,9 @@ public class ReportalServlet extends LoginAbstractAzkabanServlet {
       final int offset = getIntParam(req, "offset");
       final int length = getIntParam(req, "length");
       final ExecutableFlow exec;
-      final ExecutorManagerAdapter executorManager = this.server.getExecutorManager();
+      final ExecutorManagerAdapter executorManagerAdapter = this.server.getExecutorManagerAdapter();
       try {
-        exec = executorManager.getExecutableFlow(execId);
+        exec = executorManagerAdapter.getExecutableFlow(execId);
       } catch (final Exception e) {
         ret.put("error", "Log does not exist or isn't created yet.");
         return;
@@ -299,7 +299,7 @@ public class ReportalServlet extends LoginAbstractAzkabanServlet {
       final LogData data;
       try {
         data =
-            executorManager.getExecutionJobLog(exec, jobId, offset, length,
+            executorManagerAdapter.getExecutionJobLog(exec, jobId, offset, length,
                 exec.getExecutableNode(jobId).getAttempt());
       } catch (final Exception e) {
         e.printStackTrace();
@@ -364,7 +364,7 @@ public class ReportalServlet extends LoginAbstractAzkabanServlet {
     preparePage(page, session);
 
     final ProjectManager projectManager = this.server.getProjectManager();
-    final ExecutorManagerAdapter executorManager = this.server.getExecutorManager();
+    final ExecutorManagerAdapter executorManagerAdapter = this.server.getExecutorManagerAdapter();
 
     final Project project = projectManager.getProject(id);
     final Reportal reportal = Reportal.loadFromProject(project);
@@ -392,7 +392,7 @@ public class ReportalServlet extends LoginAbstractAzkabanServlet {
       if (hasParam(req, "logs")) {
         final ExecutableFlow exec;
         try {
-          exec = executorManager.getExecutableFlow(execId);
+          exec = executorManagerAdapter.getExecutableFlow(execId);
         } catch (final ExecutorManagerException e) {
           e.printStackTrace();
           page.add("errorMsg", "ExecutableFlow not found. " + e.getMessage());
@@ -501,10 +501,10 @@ public class ReportalServlet extends LoginAbstractAzkabanServlet {
       }
       try {
         final Flow flow = project.getFlows().get(0);
-        executorManager.getExecutableFlows(project.getId(), flow.getId(),
+        executorManagerAdapter.getExecutableFlows(project.getId(), flow.getId(),
             pageNumber * this.itemsPerPage, this.itemsPerPage, exFlows);
         final ArrayList<ExecutableFlow> tmp = new ArrayList<>();
-        executorManager.getExecutableFlows(project.getId(), flow.getId(),
+        executorManagerAdapter.getExecutableFlows(project.getId(), flow.getId(),
             (pageNumber + 1) * this.itemsPerPage, 1, tmp);
         if (!tmp.isEmpty()) {
           hasNextPage = true;
@@ -1218,7 +1218,7 @@ public class ReportalServlet extends LoginAbstractAzkabanServlet {
 
     try {
       final String message =
-          this.server.getExecutorManager().submitExecutableFlow(exflow,
+          this.server.getExecutorManagerAdapter().submitExecutableFlow(exflow,
               session.getUser().getUserId())
               + ".";
       ret.put("message", message);