azkaban-aplcache

AZNewDispatchingLogic - Submit flow in Polling Model (#2067) *

12/20/2018 9:38:39 PM

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 9f8409f..afcea5a 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -92,6 +92,7 @@ public class Constants {
 
     // Configures Azkaban to use new polling model for dispatching
     public static final String AZKABAN_POLL_MODEL = "azkaban.poll.model";
+    public static final String AZKABAN_POLLING_INTERVAL_MS = "azkaban.polling.interval.ms";
 
     // Configures Azkaban Flow Version in project YAML file
     public static final String AZKABAN_FLOW_VERSION = "azkaban-flow-version";
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
index 55da336..60c4f16 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
@@ -15,16 +15,21 @@
 */
 package azkaban.executor;
 
+import azkaban.Constants.ConfigurationKeys;
 import azkaban.event.EventHandler;
+import azkaban.flow.FlowUtils;
 import azkaban.project.Project;
+import azkaban.project.ProjectWhitelist;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.Pair;
+import azkaban.utils.Props;
 import java.io.IOException;
 import java.lang.Thread.State;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -46,13 +51,24 @@ public class ExecutionController extends EventHandler implements ExecutorManager
 
   private static final Logger logger = LoggerFactory.getLogger(ExecutionController.class);
   private static final Duration RECENTLY_FINISHED_LIFETIME = Duration.ofMinutes(10);
+  private static final int DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW = 30;
   private final ExecutorLoader executorLoader;
   private final ExecutorApiGateway apiGateway;
+  private final int maxConcurrentRunsOneFlow;
 
   @Inject
-  ExecutionController(final ExecutorLoader executorLoader, final ExecutorApiGateway apiGateway) {
+  ExecutionController(final Props azkProps, final ExecutorLoader executorLoader, final
+  ExecutorApiGateway apiGateway) {
     this.executorLoader = executorLoader;
     this.apiGateway = apiGateway;
+    this.maxConcurrentRunsOneFlow = getMaxConcurrentRunsOneFlow(azkProps);
+  }
+
+  private int getMaxConcurrentRunsOneFlow(final Props azkProps) {
+    // The default threshold is set to 30 for now, in case some users are affected. We may
+    // decrease this number in future, to better prevent DDos attacks.
+    return azkProps.getInt(ConfigurationKeys.MAX_CONCURRENT_RUNS_ONEFLOW,
+        DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW);
   }
 
   @Override
@@ -489,8 +505,71 @@ public class ExecutionController extends EventHandler implements ExecutorManager
   @Override
   public String submitExecutableFlow(final ExecutableFlow exflow, final String userId)
       throws ExecutorManagerException {
-    // Todo: insert the execution to DB queue
-    return null;
+    final String exFlowKey = exflow.getProjectName() + "." + exflow.getId() + ".submitFlow";
+    // Use project and flow name to prevent race condition when same flow is submitted by API and
+    // schedule at the same time
+    // causing two same flow submission entering this piece.
+    synchronized (exFlowKey.intern()) {
+      final String flowId = exflow.getFlowId();
+      logger.info("Submitting execution flow " + flowId + " by " + userId);
+
+      String message = "";
+
+      final int projectId = exflow.getProjectId();
+      exflow.setSubmitUser(userId);
+      exflow.setSubmitTime(System.currentTimeMillis());
+
+      final List<Integer> running = getRunningFlows(projectId, flowId);
+
+      ExecutionOptions options = exflow.getExecutionOptions();
+      if (options == null) {
+        options = new ExecutionOptions();
+      }
+
+      if (options.getDisabledJobs() != null) {
+        FlowUtils.applyDisabledJobs(options.getDisabledJobs(), exflow);
+      }
+
+      if (!running.isEmpty()) {
+        if (running.size() > this.maxConcurrentRunsOneFlow) {
+          throw new ExecutorManagerException("Flow " + flowId
+              + " has more than " + this.maxConcurrentRunsOneFlow + " concurrent runs. Skipping",
+              ExecutorManagerException.Reason.SkippedExecution);
+        } else if (options.getConcurrentOption().equals(
+            ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
+          Collections.sort(running);
+          final Integer runningExecId = running.get(running.size() - 1);
+
+          options.setPipelineExecutionId(runningExecId);
+          message =
+              "Flow " + flowId + " is already running with exec id "
+                  + runningExecId + ". Pipelining level "
+                  + options.getPipelineLevel() + ". \n";
+        } else if (options.getConcurrentOption().equals(
+            ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
+          throw new ExecutorManagerException("Flow " + flowId
+              + " is already running. Skipping execution.",
+              ExecutorManagerException.Reason.SkippedExecution);
+        } else {
+          message =
+              "Flow " + flowId + " is already running with exec id "
+                  + StringUtils.join(running, ",")
+                  + ". Will execute concurrently. \n";
+        }
+      }
+
+      final boolean memoryCheck =
+          !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
+              ProjectWhitelist.WhitelistType.MemoryCheck);
+      options.setMemoryCheck(memoryCheck);
+
+      // The exflow id is set by the loader. So it's unavailable until after
+      // this call.
+      this.executorLoader.uploadExecutableFlow(exflow);
+
+      message += "Execution queued successfully with exec id " + exflow.getExecutionId();
+      return message;
+    }
   }
 
   @Override
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
index acb189b..6483e22 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
@@ -279,6 +279,53 @@ public class ExecutionFlowDao {
     }
   }
 
+  public int selectAndUpdateExecution(final int executorId) throws ExecutorManagerException {
+    final String UPDATE_EXECUTION = "UPDATE execution_flows SET executor_id = ? where exec_id = ?";
+
+    final SQLTransaction<Integer> selectAndUpdateExecution = transOperator -> {
+      final List<Integer> execIds = transOperator.query(SelectFromExecutionFlows
+          .SELECT_EXECUTION_FOR_UPDATE, new SelectFromExecutionFlows());
+
+      int execId = -1;
+      if (!execIds.isEmpty()) {
+        execId = execIds.get(0);
+        transOperator.update(UPDATE_EXECUTION, executorId, execId);
+      }
+      transOperator.getConnection().commit();
+      return execId;
+    };
+
+    try {
+      return this.dbOperator.transaction(selectAndUpdateExecution);
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error selecting and updating execution with executor "
+          + executorId, e);
+    }
+  }
+
+  public static class SelectFromExecutionFlows implements
+      ResultSetHandler<List<Integer>> {
+
+    static String SELECT_EXECUTION_FOR_UPDATE = "SELECT exec_id from execution_flows where "
+        + "status = " + Status.PREPARING.getNumVal()
+        + " and executor_id is NULL and flow_data is NOT NULL ORDER BY submit_time ASC LIMIT 1 FOR "
+        + "UPDATE";
+
+    @Override
+    public List<Integer> handle(final ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return Collections.emptyList();
+      }
+      final List<Integer> execIds = new ArrayList<>();
+      do {
+        final int execId = rs.getInt(1);
+        execIds.add(execId);
+      } while (rs.next());
+
+      return execIds;
+    }
+  }
+
   public static class FetchExecutableFlows implements
       ResultSetHandler<List<ExecutableFlow>> {
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 8827344..db0e807 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -277,4 +277,6 @@ public interface ExecutorLoader {
 
   int removeExecutionLogsByTime(long millis)
       throws ExecutorManagerException;
+
+  int selectAndUpdateExecution(final int executorId) throws ExecutorManagerException;
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 13217e4..302b889 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -339,4 +339,9 @@ public class JdbcExecutorLoader implements ExecutorLoader {
   public void unassignExecutor(final int executionId) throws ExecutorManagerException {
     this.assignExecutorDao.unassignExecutor(executionId);
   }
+
+  @Override
+  public int selectAndUpdateExecution(final int executorId) throws ExecutorManagerException {
+    return this.executionFlowDao.selectAndUpdateExecution(executorId);
+  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
index a41e2db..3102761 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
@@ -16,10 +16,15 @@
 package azkaban.executor;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import azkaban.Constants;
+import azkaban.user.User;
 import azkaban.utils.Pair;
+import azkaban.utils.Props;
 import azkaban.utils.TestUtils;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -43,12 +48,24 @@ public class ExecutionControllerTest {
   private ExecutionController controller;
   private ExecutorLoader loader;
   private ExecutorApiGateway apiGateway;
+  private Props props;
+  private User user;
+  private ExecutableFlow flow1;
+  private ExecutableFlow flow2;
+  private ExecutableFlow flow3;
+  private ExecutableFlow flow4;
+  private ExecutionReference ref1;
+  private ExecutionReference ref2;
+  private ExecutionReference ref3;
 
   @Before
   public void setup() throws Exception {
+    this.props = new Props();
+    this.user = TestUtils.getTestUser();
     this.loader = mock(ExecutorLoader.class);
     this.apiGateway = mock(ExecutorApiGateway.class);
-    this.controller = new ExecutionController(this.loader, this.apiGateway);
+    this.props.put(Constants.ConfigurationKeys.MAX_CONCURRENT_RUNS_ONEFLOW, 1);
+    this.controller = new ExecutionController(this.props, this.loader, this.apiGateway);
 
     final Executor executor1 = new Executor(1, "localhost", 12345, true);
     final Executor executor2 = new Executor(2, "localhost", 12346, true);
@@ -57,27 +74,27 @@ public class ExecutionControllerTest {
     this.allExecutors = ImmutableList.of(executor1, executor2, executor3);
     when(this.loader.fetchActiveExecutors()).thenReturn(this.activeExecutors);
 
-    final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
-    final ExecutableFlow flow2 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
-    final ExecutableFlow flow3 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
-    flow1.setExecutionId(1);
-    flow2.setExecutionId(2);
-    flow3.setExecutionId(3);
-    final ExecutionReference ref1 =
-        new ExecutionReference(flow1.getExecutionId(), null);
-    final ExecutionReference ref2 =
-        new ExecutionReference(flow2.getExecutionId(), executor2);
-    final ExecutionReference ref3 =
-        new ExecutionReference(flow3.getExecutionId(), executor3);
+    this.flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
+    this.flow2 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
+    this.flow3 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
+    this.flow4 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
+    this.flow1.setExecutionId(1);
+    this.flow2.setExecutionId(2);
+    this.flow3.setExecutionId(3);
+    this.flow4.setExecutionId(4);
+    this.ref1 = new ExecutionReference(this.flow1.getExecutionId(), null);
+    this.ref2 = new ExecutionReference(this.flow2.getExecutionId(), executor2);
+    this.ref3 = new ExecutionReference(this.flow3.getExecutionId(), executor3);
 
     this.activeFlows = ImmutableMap
-        .of(flow2.getExecutionId(), new Pair<>(ref2, flow2), flow3.getExecutionId(),
-            new Pair<>(ref3, flow3));
+        .of(this.flow2.getExecutionId(), new Pair<>(this.ref2, this.flow2),
+            this.flow3.getExecutionId(), new Pair<>(this.ref3, this.flow3));
     when(this.loader.fetchActiveFlows()).thenReturn(this.activeFlows);
 
-    this.unfinishedFlows = ImmutableMap.of(flow1.getExecutionId(), new Pair<>(ref1, flow1),
-        flow2.getExecutionId(), new Pair<>(ref2, flow2), flow3.getExecutionId(), new Pair<>(ref3,
-            flow3));
+    this.unfinishedFlows = ImmutableMap
+        .of(this.flow1.getExecutionId(), new Pair<>(this.ref1, this.flow1),
+            this.flow2.getExecutionId(), new Pair<>(this.ref2, this.flow2),
+            this.flow3.getExecutionId(), new Pair<>(this.ref3, this.flow3));
     when(this.loader.fetchUnfinishedFlows()).thenReturn(this.unfinishedFlows);
   }
 
@@ -97,14 +114,14 @@ public class ExecutionControllerTest {
 
   @Test
   public void testFetchActiveFlowByProject() throws Exception {
-    final ExecutableFlow flow2 = this.unfinishedFlows.get(2).getSecond();
-    final ExecutableFlow flow3 = this.unfinishedFlows.get(3).getSecond();
-    final List<Integer> executions = this.controller.getRunningFlows(flow2.getProjectId(), flow2
-        .getFlowId());
-    assertThat(executions.contains(flow2.getExecutionId())).isTrue();
-    assertThat(executions.contains(flow3.getExecutionId())).isTrue();
-    assertThat(this.controller.isFlowRunning(flow2.getProjectId(), flow2.getFlowId())).isTrue();
-    assertThat(this.controller.isFlowRunning(flow3.getProjectId(), flow3.getFlowId())).isTrue();
+    final List<Integer> executions = this.controller
+        .getRunningFlows(this.flow2.getProjectId(), this.flow2.getFlowId());
+    assertThat(executions.contains(this.flow2.getExecutionId())).isTrue();
+    assertThat(executions.contains(this.flow3.getExecutionId())).isTrue();
+    assertThat(this.controller.isFlowRunning(this.flow2.getProjectId(), this.flow2.getFlowId()))
+        .isTrue();
+    assertThat(this.controller.isFlowRunning(this.flow3.getProjectId(), this.flow3.getFlowId()))
+        .isTrue();
   }
 
   @Test
@@ -123,4 +140,40 @@ public class ExecutionControllerTest {
         activeExecutorServerHosts.contains(executor.getHost() + ":" + executor.getPort()))
         .isTrue());
   }
+
+  @Test
+  public void testSubmitFlows() throws Exception {
+    this.controller.submitExecutableFlow(this.flow1, this.user.getUserId());
+    verify(this.loader).uploadExecutableFlow(this.flow1);
+  }
+
+  @Test
+  public void testSubmitFlowsExceedingMaxConcurrentRuns() throws Exception {
+    this.unfinishedFlows = new HashMap<>();
+    when(this.loader.fetchUnfinishedFlows()).thenReturn(this.unfinishedFlows);
+    this.controller.submitExecutableFlow(this.flow2, this.user.getUserId());
+    verify(this.loader).uploadExecutableFlow(this.flow2);
+    this.unfinishedFlows.put(this.flow2.getExecutionId(), new Pair<>(this.ref2, this.flow2));
+
+    this.controller.submitExecutableFlow(this.flow3, this.user.getUserId());
+    verify(this.loader).uploadExecutableFlow(this.flow3);
+    this.unfinishedFlows.put(this.flow3.getExecutionId(), new Pair<>(this.ref3, this.flow3));
+
+    assertThatThrownBy(() -> this.controller.submitExecutableFlow(this.flow4, this.user.getUserId
+        ())).isInstanceOf(ExecutorManagerException.class).hasMessageContaining("Flow " + this
+        .flow4.getId() + " has more than 1 concurrent runs. Skipping");
+  }
+
+  @Test
+  public void testSubmitFlowsWithSkipOption() throws Exception {
+    this.unfinishedFlows = new HashMap<>();
+    when(this.loader.fetchUnfinishedFlows()).thenReturn(this.unfinishedFlows);
+    this.controller.submitExecutableFlow(this.flow2, this.user.getUserId());
+    this.unfinishedFlows.put(this.flow2.getExecutionId(), new Pair<>(this.ref2, this.flow2));
+    this.flow3.getExecutionOptions().setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
+    assertThatThrownBy(
+        () -> this.controller.submitExecutableFlow(this.flow3, this.user.getUserId()))
+        .isInstanceOf(ExecutorManagerException.class).hasMessageContaining(
+        "Flow " + this.flow3.getId() + " is already running. Skipping execution.");
+  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index 35ad37f..ee2692f 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -469,6 +469,18 @@ public class ExecutionFlowDaoTest {
     assertThat(inOutProps.getSecond().get("hello")).isEqualTo("output");
   }
 
+  @Test
+  public void testSelectAndUpdateExecution() throws Exception {
+    final ExecutableFlow flow = TestUtils.createTestExecutableFlow("exectest1", "exec1");
+    flow.setExecutionId(1);
+    this.executionFlowDao.uploadExecutableFlow(flow);
+    final Executor executor = this.executorDao.addExecutor("localhost", 12345);
+    assertThat(this.executionFlowDao.selectAndUpdateExecution(executor.getId())).isEqualTo(flow
+        .getExecutionId());
+    assertThat(this.executorDao.fetchExecutorByExecutionId(flow.getExecutionId())).isEqualTo
+        (executor);
+  }
+
   private void assertTwoFlowSame(final ExecutableFlow flow1, final ExecutableFlow flow2) {
     assertThat(flow1.getExecutionId()).isEqualTo(flow2.getExecutionId());
     assertThat(flow1.getStatus()).isEqualTo(flow2.getStatus());
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 8033629..b22a385 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -420,4 +420,9 @@ public class MockExecutorLoader implements ExecutorLoader {
       throws ExecutorManagerException {
     return new ArrayList<>();
   }
+
+  @Override
+  public int selectAndUpdateExecution(final int executorId) throws ExecutorManagerException {
+    return 1;
+  }
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 3111c3f..7cfb598 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -16,6 +16,8 @@
 
 package azkaban.execapp;
 
+import static java.util.Objects.requireNonNull;
+
 import azkaban.Constants;
 import azkaban.Constants.ConfigurationKeys;
 import azkaban.event.Event;
@@ -61,9 +63,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import javax.inject.Inject;
@@ -126,16 +130,17 @@ public class FlowRunnerManager implements EventListener,
   private final Object executionDirDeletionSync = new Object();
 
   private final int numThreads;
-  private int threadPoolQueueSize = -1;
   private final int numJobThreadPerFlow;
-  private Props globalProps;
-  private long lastCleanerThreadCheckTime = -1;
-  private long executionDirRetention = 1 * 24 * 60 * 60 * 1000; // 1 Day
   // We want to limit the log sizes to about 20 megs
   private final String jobLogChunkSize;
   private final int jobLogNumFiles;
   // If true, jobs will validate proxy user against a list of valid proxy users.
   private final boolean validateProxyUser;
+  private PollingService pollingService;
+  private int threadPoolQueueSize = -1;
+  private Props globalProps;
+  private long lastCleanerThreadCheckTime = -1;
+  private long executionDirRetention = 1 * 24 * 60 * 60 * 1000; // 1 Day
   // date time of the the last flow submitted.
   private long lastFlowSubmittedDate = 0;
 
@@ -200,6 +205,13 @@ public class FlowRunnerManager implements EventListener,
 
     this.cleanerThread = new CleanerThread();
     this.cleanerThread.start();
+
+    if (this.azkabanProps.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)) {
+      this.logger.info("Starting polling service.");
+      this.pollingService = new PollingService(this.azkabanProps.getLong
+          (ConfigurationKeys.AZKABAN_POLLING_INTERVAL_MS, 1000));
+      this.pollingService.start();
+    }
   }
 
   public double getProjectDirCacheHitRatio() {
@@ -290,7 +302,7 @@ public class FlowRunnerManager implements EventListener,
     submitFlowRunner(runner);
   }
 
-  private boolean isAlreadyRunning(int execId) throws ExecutorManagerException {
+  private boolean isAlreadyRunning(final int execId) throws ExecutorManagerException {
     if (this.runningFlows.containsKey(execId)) {
       logger.info("Execution " + execId + " is already in running.");
       if (!this.submittedFlows.containsValue(execId)) {
@@ -723,6 +735,9 @@ public class FlowRunnerManager implements EventListener,
    */
   public void shutdown() {
     logger.warn("Shutting down FlowRunnerManager...");
+    if (this.azkabanProps.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)) {
+      this.pollingService.shutdown();
+    }
     this.executorService.shutdown();
     boolean result = false;
     while (!result) {
@@ -742,6 +757,9 @@ public class FlowRunnerManager implements EventListener,
    */
   public void shutdownNow() {
     logger.warn("Shutting down FlowRunnerManager now...");
+    if (this.azkabanProps.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)) {
+      this.pollingService.shutdown();
+    }
     this.executorService.shutdownNow();
     this.triggerManager.shutdown();
   }
@@ -906,4 +924,56 @@ public class FlowRunnerManager implements EventListener,
     }
   }
 
+  /**
+   * Polls new executions from DB periodically and submits the executions to run on the executor.
+   */
+  @SuppressWarnings("FutureReturnValueIgnored")
+  private class PollingService {
+
+    private final long pollingIntervalMs;
+    private final ScheduledExecutorService scheduler;
+    private int executorId = -1;
+
+    public PollingService(final long pollingIntervalMs) {
+      this.pollingIntervalMs = pollingIntervalMs;
+      this.scheduler = Executors.newSingleThreadScheduledExecutor();
+    }
+
+    public void start() {
+      this.scheduler.scheduleAtFixedRate(() -> pollExecution(), 0L, this.pollingIntervalMs,
+          TimeUnit.MILLISECONDS);
+    }
+
+    private void pollExecution() {
+      if (this.executorId == -1) {
+        if (AzkabanExecutorServer.getApp() != null) {
+          try {
+            final Executor executor = requireNonNull(FlowRunnerManager.this.executorLoader
+                .fetchExecutor(AzkabanExecutorServer.getApp().getHost(),
+                    AzkabanExecutorServer.getApp().getPort()), "The executor can not be null");
+            this.executorId = executor.getId();
+          } catch (final Exception e) {
+            logger.error("Failed to fetch executor ", e);
+          }
+        }
+      } else {
+        try {
+          // Todo jamiesjc: check executor capacity before polling from DB
+          final int execId = FlowRunnerManager.this.executorLoader
+              .selectAndUpdateExecution(this.executorId);
+          if (execId != -1) {
+            logger.info("Submitting flow " + execId);
+            submitFlow(execId);
+          }
+        } catch (final Exception e) {
+          logger.error("Failed to submit flow ", e);
+        }
+      }
+    }
+
+    public void shutdown() {
+      this.scheduler.shutdown();
+      this.scheduler.shutdownNow();
+    }
+  }
 }