Details
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 9f8409f..afcea5a 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -92,6 +92,7 @@ public class Constants {
// Configures Azkaban to use new polling model for dispatching
public static final String AZKABAN_POLL_MODEL = "azkaban.poll.model";
+ public static final String AZKABAN_POLLING_INTERVAL_MS = "azkaban.polling.interval.ms";
// Configures Azkaban Flow Version in project YAML file
public static final String AZKABAN_FLOW_VERSION = "azkaban-flow-version";
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
index 55da336..60c4f16 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
@@ -15,16 +15,21 @@
*/
package azkaban.executor;
+import azkaban.Constants.ConfigurationKeys;
import azkaban.event.EventHandler;
+import azkaban.flow.FlowUtils;
import azkaban.project.Project;
+import azkaban.project.ProjectWhitelist;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
+import azkaban.utils.Props;
import java.io.IOException;
import java.lang.Thread.State;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -46,13 +51,24 @@ public class ExecutionController extends EventHandler implements ExecutorManager
private static final Logger logger = LoggerFactory.getLogger(ExecutionController.class);
private static final Duration RECENTLY_FINISHED_LIFETIME = Duration.ofMinutes(10);
+ private static final int DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW = 30;
private final ExecutorLoader executorLoader;
private final ExecutorApiGateway apiGateway;
+ private final int maxConcurrentRunsOneFlow;
@Inject
- ExecutionController(final ExecutorLoader executorLoader, final ExecutorApiGateway apiGateway) {
+ ExecutionController(final Props azkProps, final ExecutorLoader executorLoader, final
+ ExecutorApiGateway apiGateway) {
this.executorLoader = executorLoader;
this.apiGateway = apiGateway;
+ this.maxConcurrentRunsOneFlow = getMaxConcurrentRunsOneFlow(azkProps);
+ }
+
+ private int getMaxConcurrentRunsOneFlow(final Props azkProps) {
+ // The default threshold is set to 30 for now, in case some users are affected. We may
+ // decrease this number in future, to better prevent DDos attacks.
+ return azkProps.getInt(ConfigurationKeys.MAX_CONCURRENT_RUNS_ONEFLOW,
+ DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW);
}
@Override
@@ -489,8 +505,71 @@ public class ExecutionController extends EventHandler implements ExecutorManager
@Override
public String submitExecutableFlow(final ExecutableFlow exflow, final String userId)
throws ExecutorManagerException {
- // Todo: insert the execution to DB queue
- return null;
+ final String exFlowKey = exflow.getProjectName() + "." + exflow.getId() + ".submitFlow";
+ // Use project and flow name to prevent race condition when same flow is submitted by API and
+ // schedule at the same time
+ // causing two same flow submission entering this piece.
+ synchronized (exFlowKey.intern()) {
+ final String flowId = exflow.getFlowId();
+ logger.info("Submitting execution flow " + flowId + " by " + userId);
+
+ String message = "";
+
+ final int projectId = exflow.getProjectId();
+ exflow.setSubmitUser(userId);
+ exflow.setSubmitTime(System.currentTimeMillis());
+
+ final List<Integer> running = getRunningFlows(projectId, flowId);
+
+ ExecutionOptions options = exflow.getExecutionOptions();
+ if (options == null) {
+ options = new ExecutionOptions();
+ }
+
+ if (options.getDisabledJobs() != null) {
+ FlowUtils.applyDisabledJobs(options.getDisabledJobs(), exflow);
+ }
+
+ if (!running.isEmpty()) {
+ if (running.size() > this.maxConcurrentRunsOneFlow) {
+ throw new ExecutorManagerException("Flow " + flowId
+ + " has more than " + this.maxConcurrentRunsOneFlow + " concurrent runs. Skipping",
+ ExecutorManagerException.Reason.SkippedExecution);
+ } else if (options.getConcurrentOption().equals(
+ ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
+ Collections.sort(running);
+ final Integer runningExecId = running.get(running.size() - 1);
+
+ options.setPipelineExecutionId(runningExecId);
+ message =
+ "Flow " + flowId + " is already running with exec id "
+ + runningExecId + ". Pipelining level "
+ + options.getPipelineLevel() + ". \n";
+ } else if (options.getConcurrentOption().equals(
+ ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
+ throw new ExecutorManagerException("Flow " + flowId
+ + " is already running. Skipping execution.",
+ ExecutorManagerException.Reason.SkippedExecution);
+ } else {
+ message =
+ "Flow " + flowId + " is already running with exec id "
+ + StringUtils.join(running, ",")
+ + ". Will execute concurrently. \n";
+ }
+ }
+
+ final boolean memoryCheck =
+ !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
+ ProjectWhitelist.WhitelistType.MemoryCheck);
+ options.setMemoryCheck(memoryCheck);
+
+ // The exflow id is set by the loader. So it's unavailable until after
+ // this call.
+ this.executorLoader.uploadExecutableFlow(exflow);
+
+ message += "Execution queued successfully with exec id " + exflow.getExecutionId();
+ return message;
+ }
}
@Override
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
index acb189b..6483e22 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
@@ -279,6 +279,53 @@ public class ExecutionFlowDao {
}
}
+ public int selectAndUpdateExecution(final int executorId) throws ExecutorManagerException {
+ final String UPDATE_EXECUTION = "UPDATE execution_flows SET executor_id = ? where exec_id = ?";
+
+ final SQLTransaction<Integer> selectAndUpdateExecution = transOperator -> {
+ final List<Integer> execIds = transOperator.query(SelectFromExecutionFlows
+ .SELECT_EXECUTION_FOR_UPDATE, new SelectFromExecutionFlows());
+
+ int execId = -1;
+ if (!execIds.isEmpty()) {
+ execId = execIds.get(0);
+ transOperator.update(UPDATE_EXECUTION, executorId, execId);
+ }
+ transOperator.getConnection().commit();
+ return execId;
+ };
+
+ try {
+ return this.dbOperator.transaction(selectAndUpdateExecution);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error selecting and updating execution with executor "
+ + executorId, e);
+ }
+ }
+
+ public static class SelectFromExecutionFlows implements
+ ResultSetHandler<List<Integer>> {
+
+ static String SELECT_EXECUTION_FOR_UPDATE = "SELECT exec_id from execution_flows where "
+ + "status = " + Status.PREPARING.getNumVal()
+ + " and executor_id is NULL and flow_data is NOT NULL ORDER BY submit_time ASC LIMIT 1 FOR "
+ + "UPDATE";
+
+ @Override
+ public List<Integer> handle(final ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.emptyList();
+ }
+ final List<Integer> execIds = new ArrayList<>();
+ do {
+ final int execId = rs.getInt(1);
+ execIds.add(execId);
+ } while (rs.next());
+
+ return execIds;
+ }
+ }
+
public static class FetchExecutableFlows implements
ResultSetHandler<List<ExecutableFlow>> {
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 8827344..db0e807 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -277,4 +277,6 @@ public interface ExecutorLoader {
int removeExecutionLogsByTime(long millis)
throws ExecutorManagerException;
+
+ int selectAndUpdateExecution(final int executorId) throws ExecutorManagerException;
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 13217e4..302b889 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -339,4 +339,9 @@ public class JdbcExecutorLoader implements ExecutorLoader {
public void unassignExecutor(final int executionId) throws ExecutorManagerException {
this.assignExecutorDao.unassignExecutor(executionId);
}
+
+ @Override
+ public int selectAndUpdateExecution(final int executorId) throws ExecutorManagerException {
+ return this.executionFlowDao.selectAndUpdateExecution(executorId);
+ }
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
index a41e2db..3102761 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
@@ -16,10 +16,15 @@
package azkaban.executor;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import azkaban.Constants;
+import azkaban.user.User;
import azkaban.utils.Pair;
+import azkaban.utils.Props;
import azkaban.utils.TestUtils;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -43,12 +48,24 @@ public class ExecutionControllerTest {
private ExecutionController controller;
private ExecutorLoader loader;
private ExecutorApiGateway apiGateway;
+ private Props props;
+ private User user;
+ private ExecutableFlow flow1;
+ private ExecutableFlow flow2;
+ private ExecutableFlow flow3;
+ private ExecutableFlow flow4;
+ private ExecutionReference ref1;
+ private ExecutionReference ref2;
+ private ExecutionReference ref3;
@Before
public void setup() throws Exception {
+ this.props = new Props();
+ this.user = TestUtils.getTestUser();
this.loader = mock(ExecutorLoader.class);
this.apiGateway = mock(ExecutorApiGateway.class);
- this.controller = new ExecutionController(this.loader, this.apiGateway);
+ this.props.put(Constants.ConfigurationKeys.MAX_CONCURRENT_RUNS_ONEFLOW, 1);
+ this.controller = new ExecutionController(this.props, this.loader, this.apiGateway);
final Executor executor1 = new Executor(1, "localhost", 12345, true);
final Executor executor2 = new Executor(2, "localhost", 12346, true);
@@ -57,27 +74,27 @@ public class ExecutionControllerTest {
this.allExecutors = ImmutableList.of(executor1, executor2, executor3);
when(this.loader.fetchActiveExecutors()).thenReturn(this.activeExecutors);
- final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
- final ExecutableFlow flow2 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
- final ExecutableFlow flow3 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
- flow1.setExecutionId(1);
- flow2.setExecutionId(2);
- flow3.setExecutionId(3);
- final ExecutionReference ref1 =
- new ExecutionReference(flow1.getExecutionId(), null);
- final ExecutionReference ref2 =
- new ExecutionReference(flow2.getExecutionId(), executor2);
- final ExecutionReference ref3 =
- new ExecutionReference(flow3.getExecutionId(), executor3);
+ this.flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
+ this.flow2 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
+ this.flow3 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
+ this.flow4 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
+ this.flow1.setExecutionId(1);
+ this.flow2.setExecutionId(2);
+ this.flow3.setExecutionId(3);
+ this.flow4.setExecutionId(4);
+ this.ref1 = new ExecutionReference(this.flow1.getExecutionId(), null);
+ this.ref2 = new ExecutionReference(this.flow2.getExecutionId(), executor2);
+ this.ref3 = new ExecutionReference(this.flow3.getExecutionId(), executor3);
this.activeFlows = ImmutableMap
- .of(flow2.getExecutionId(), new Pair<>(ref2, flow2), flow3.getExecutionId(),
- new Pair<>(ref3, flow3));
+ .of(this.flow2.getExecutionId(), new Pair<>(this.ref2, this.flow2),
+ this.flow3.getExecutionId(), new Pair<>(this.ref3, this.flow3));
when(this.loader.fetchActiveFlows()).thenReturn(this.activeFlows);
- this.unfinishedFlows = ImmutableMap.of(flow1.getExecutionId(), new Pair<>(ref1, flow1),
- flow2.getExecutionId(), new Pair<>(ref2, flow2), flow3.getExecutionId(), new Pair<>(ref3,
- flow3));
+ this.unfinishedFlows = ImmutableMap
+ .of(this.flow1.getExecutionId(), new Pair<>(this.ref1, this.flow1),
+ this.flow2.getExecutionId(), new Pair<>(this.ref2, this.flow2),
+ this.flow3.getExecutionId(), new Pair<>(this.ref3, this.flow3));
when(this.loader.fetchUnfinishedFlows()).thenReturn(this.unfinishedFlows);
}
@@ -97,14 +114,14 @@ public class ExecutionControllerTest {
@Test
public void testFetchActiveFlowByProject() throws Exception {
- final ExecutableFlow flow2 = this.unfinishedFlows.get(2).getSecond();
- final ExecutableFlow flow3 = this.unfinishedFlows.get(3).getSecond();
- final List<Integer> executions = this.controller.getRunningFlows(flow2.getProjectId(), flow2
- .getFlowId());
- assertThat(executions.contains(flow2.getExecutionId())).isTrue();
- assertThat(executions.contains(flow3.getExecutionId())).isTrue();
- assertThat(this.controller.isFlowRunning(flow2.getProjectId(), flow2.getFlowId())).isTrue();
- assertThat(this.controller.isFlowRunning(flow3.getProjectId(), flow3.getFlowId())).isTrue();
+ final List<Integer> executions = this.controller
+ .getRunningFlows(this.flow2.getProjectId(), this.flow2.getFlowId());
+ assertThat(executions.contains(this.flow2.getExecutionId())).isTrue();
+ assertThat(executions.contains(this.flow3.getExecutionId())).isTrue();
+ assertThat(this.controller.isFlowRunning(this.flow2.getProjectId(), this.flow2.getFlowId()))
+ .isTrue();
+ assertThat(this.controller.isFlowRunning(this.flow3.getProjectId(), this.flow3.getFlowId()))
+ .isTrue();
}
@Test
@@ -123,4 +140,40 @@ public class ExecutionControllerTest {
activeExecutorServerHosts.contains(executor.getHost() + ":" + executor.getPort()))
.isTrue());
}
+
+ @Test
+ public void testSubmitFlows() throws Exception {
+ this.controller.submitExecutableFlow(this.flow1, this.user.getUserId());
+ verify(this.loader).uploadExecutableFlow(this.flow1);
+ }
+
+ @Test
+ public void testSubmitFlowsExceedingMaxConcurrentRuns() throws Exception {
+ this.unfinishedFlows = new HashMap<>();
+ when(this.loader.fetchUnfinishedFlows()).thenReturn(this.unfinishedFlows);
+ this.controller.submitExecutableFlow(this.flow2, this.user.getUserId());
+ verify(this.loader).uploadExecutableFlow(this.flow2);
+ this.unfinishedFlows.put(this.flow2.getExecutionId(), new Pair<>(this.ref2, this.flow2));
+
+ this.controller.submitExecutableFlow(this.flow3, this.user.getUserId());
+ verify(this.loader).uploadExecutableFlow(this.flow3);
+ this.unfinishedFlows.put(this.flow3.getExecutionId(), new Pair<>(this.ref3, this.flow3));
+
+ assertThatThrownBy(() -> this.controller.submitExecutableFlow(this.flow4, this.user.getUserId
+ ())).isInstanceOf(ExecutorManagerException.class).hasMessageContaining("Flow " + this
+ .flow4.getId() + " has more than 1 concurrent runs. Skipping");
+ }
+
+ @Test
+ public void testSubmitFlowsWithSkipOption() throws Exception {
+ this.unfinishedFlows = new HashMap<>();
+ when(this.loader.fetchUnfinishedFlows()).thenReturn(this.unfinishedFlows);
+ this.controller.submitExecutableFlow(this.flow2, this.user.getUserId());
+ this.unfinishedFlows.put(this.flow2.getExecutionId(), new Pair<>(this.ref2, this.flow2));
+ this.flow3.getExecutionOptions().setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
+ assertThatThrownBy(
+ () -> this.controller.submitExecutableFlow(this.flow3, this.user.getUserId()))
+ .isInstanceOf(ExecutorManagerException.class).hasMessageContaining(
+ "Flow " + this.flow3.getId() + " is already running. Skipping execution.");
+ }
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index 35ad37f..ee2692f 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -469,6 +469,18 @@ public class ExecutionFlowDaoTest {
assertThat(inOutProps.getSecond().get("hello")).isEqualTo("output");
}
+ @Test
+ public void testSelectAndUpdateExecution() throws Exception {
+ final ExecutableFlow flow = TestUtils.createTestExecutableFlow("exectest1", "exec1");
+ flow.setExecutionId(1);
+ this.executionFlowDao.uploadExecutableFlow(flow);
+ final Executor executor = this.executorDao.addExecutor("localhost", 12345);
+ assertThat(this.executionFlowDao.selectAndUpdateExecution(executor.getId())).isEqualTo(flow
+ .getExecutionId());
+ assertThat(this.executorDao.fetchExecutorByExecutionId(flow.getExecutionId())).isEqualTo
+ (executor);
+ }
+
private void assertTwoFlowSame(final ExecutableFlow flow1, final ExecutableFlow flow2) {
assertThat(flow1.getExecutionId()).isEqualTo(flow2.getExecutionId());
assertThat(flow1.getStatus()).isEqualTo(flow2.getStatus());
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 8033629..b22a385 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -420,4 +420,9 @@ public class MockExecutorLoader implements ExecutorLoader {
throws ExecutorManagerException {
return new ArrayList<>();
}
+
+ @Override
+ public int selectAndUpdateExecution(final int executorId) throws ExecutorManagerException {
+ return 1;
+ }
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 3111c3f..7cfb598 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -16,6 +16,8 @@
package azkaban.execapp;
+import static java.util.Objects.requireNonNull;
+
import azkaban.Constants;
import azkaban.Constants.ConfigurationKeys;
import azkaban.event.Event;
@@ -61,9 +63,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.inject.Inject;
@@ -126,16 +130,17 @@ public class FlowRunnerManager implements EventListener,
private final Object executionDirDeletionSync = new Object();
private final int numThreads;
- private int threadPoolQueueSize = -1;
private final int numJobThreadPerFlow;
- private Props globalProps;
- private long lastCleanerThreadCheckTime = -1;
- private long executionDirRetention = 1 * 24 * 60 * 60 * 1000; // 1 Day
// We want to limit the log sizes to about 20 megs
private final String jobLogChunkSize;
private final int jobLogNumFiles;
// If true, jobs will validate proxy user against a list of valid proxy users.
private final boolean validateProxyUser;
+ private PollingService pollingService;
+ private int threadPoolQueueSize = -1;
+ private Props globalProps;
+ private long lastCleanerThreadCheckTime = -1;
+ private long executionDirRetention = 1 * 24 * 60 * 60 * 1000; // 1 Day
// date time of the the last flow submitted.
private long lastFlowSubmittedDate = 0;
@@ -200,6 +205,13 @@ public class FlowRunnerManager implements EventListener,
this.cleanerThread = new CleanerThread();
this.cleanerThread.start();
+
+ if (this.azkabanProps.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)) {
+ this.logger.info("Starting polling service.");
+ this.pollingService = new PollingService(this.azkabanProps.getLong
+ (ConfigurationKeys.AZKABAN_POLLING_INTERVAL_MS, 1000));
+ this.pollingService.start();
+ }
}
public double getProjectDirCacheHitRatio() {
@@ -290,7 +302,7 @@ public class FlowRunnerManager implements EventListener,
submitFlowRunner(runner);
}
- private boolean isAlreadyRunning(int execId) throws ExecutorManagerException {
+ private boolean isAlreadyRunning(final int execId) throws ExecutorManagerException {
if (this.runningFlows.containsKey(execId)) {
logger.info("Execution " + execId + " is already in running.");
if (!this.submittedFlows.containsValue(execId)) {
@@ -723,6 +735,9 @@ public class FlowRunnerManager implements EventListener,
*/
public void shutdown() {
logger.warn("Shutting down FlowRunnerManager...");
+ if (this.azkabanProps.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)) {
+ this.pollingService.shutdown();
+ }
this.executorService.shutdown();
boolean result = false;
while (!result) {
@@ -742,6 +757,9 @@ public class FlowRunnerManager implements EventListener,
*/
public void shutdownNow() {
logger.warn("Shutting down FlowRunnerManager now...");
+ if (this.azkabanProps.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)) {
+ this.pollingService.shutdown();
+ }
this.executorService.shutdownNow();
this.triggerManager.shutdown();
}
@@ -906,4 +924,56 @@ public class FlowRunnerManager implements EventListener,
}
}
+ /**
+ * Polls new executions from DB periodically and submits the executions to run on the executor.
+ */
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private class PollingService {
+
+ private final long pollingIntervalMs;
+ private final ScheduledExecutorService scheduler;
+ private int executorId = -1;
+
+ public PollingService(final long pollingIntervalMs) {
+ this.pollingIntervalMs = pollingIntervalMs;
+ this.scheduler = Executors.newSingleThreadScheduledExecutor();
+ }
+
+ public void start() {
+ this.scheduler.scheduleAtFixedRate(() -> pollExecution(), 0L, this.pollingIntervalMs,
+ TimeUnit.MILLISECONDS);
+ }
+
+ private void pollExecution() {
+ if (this.executorId == -1) {
+ if (AzkabanExecutorServer.getApp() != null) {
+ try {
+ final Executor executor = requireNonNull(FlowRunnerManager.this.executorLoader
+ .fetchExecutor(AzkabanExecutorServer.getApp().getHost(),
+ AzkabanExecutorServer.getApp().getPort()), "The executor can not be null");
+ this.executorId = executor.getId();
+ } catch (final Exception e) {
+ logger.error("Failed to fetch executor ", e);
+ }
+ }
+ } else {
+ try {
+ // Todo jamiesjc: check executor capacity before polling from DB
+ final int execId = FlowRunnerManager.this.executorLoader
+ .selectAndUpdateExecution(this.executorId);
+ if (execId != -1) {
+ logger.info("Submitting flow " + execId);
+ submitFlow(execId);
+ }
+ } catch (final Exception e) {
+ logger.error("Failed to submit flow ", e);
+ }
+ }
+ }
+
+ public void shutdown() {
+ this.scheduler.shutdown();
+ this.scheduler.shutdownNow();
+ }
+ }
}