Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 50f238f..267cc9a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -67,6 +67,8 @@ public class ExecutorManager extends EventHandler implements
public static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
"azkaban.use.multiple.executors";
+ public static final String AZKABAN_MAX_CONCURRENT_RUNS_ONEFLOW =
+ "azkaban.max.concurrent.runs.oneflow";
static final String AZKABAN_EXECUTOR_SELECTOR_FILTERS =
"azkaban.executorselector.filters";
static final String AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX =
@@ -79,6 +81,7 @@ public class ExecutorManager extends EventHandler implements
"azkaban.activeexecutor.refresh.milisecinterval";
private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW =
"azkaban.activeexecutor.refresh.flowinterval";
+ private static final int DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW = 30;
private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS =
"azkaban.executorinfo.refresh.maxThreads";
private static final String AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED =
@@ -98,6 +101,7 @@ public class ExecutorManager extends EventHandler implements
new ConcurrentHashMap<>();
private final ExecutingManagerUpdaterThread executingManager;
private final ExecutorApiGateway apiGateway;
+ private final int maxConcurrentRunsOneFlow;
QueuedExecutions queuedFlows;
File cacheDir;
private QueueProcessorThread queueProcessor;
@@ -124,6 +128,11 @@ public class ExecutorManager extends EventHandler implements
this.loadRunningFlows();
this.queuedFlows = new QueuedExecutions(azkProps.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
+
+ // The default threshold is set to 30 for now, in case some users are affected. We may
+ // decrease this number in future, to better prevent DDos attacks.
+ this.maxConcurrentRunsOneFlow = azkProps.getInt(AZKABAN_MAX_CONCURRENT_RUNS_ONEFLOW,
+ DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW);
this.loadQueuedFlows();
this.cacheDir = new File(azkProps.getString("cache.directory", "cache"));
@@ -951,6 +960,7 @@ public class ExecutorManager extends EventHandler implements
exflow.setSubmitUser(userId);
exflow.setSubmitTime(System.currentTimeMillis());
+ // Get collection of running flows given a project and a specific flow name
final List<Integer> running = getRunningFlows(projectId, flowId);
ExecutionOptions options = exflow.getExecutionOptions();
@@ -963,7 +973,11 @@ public class ExecutorManager extends EventHandler implements
}
if (!running.isEmpty()) {
- if (options.getConcurrentOption().equals(
+ if (running.size() > this.maxConcurrentRunsOneFlow) {
+ throw new ExecutorManagerException("Flow " + flowId
+ + " has more than " + this.maxConcurrentRunsOneFlow + " concurrent runs. Skipping",
+ ExecutorManagerException.Reason.SkippedExecution);
+ } else if (options.getConcurrentOption().equals(
ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
Collections.sort(running);
final Integer runningExecId = running.get(running.size() - 1);
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 3355936..88d7812 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -308,6 +308,30 @@ public class ExecutorManagerTest {
verify(this.loader).addActiveExecutableReference(any());
}
+ // Too many concurrent flows will fail job submission
+ @Test(expected = ExecutorManagerException.class)
+ public void testTooManySubmitFlows() throws Exception {
+ testSetUpForRunningFlows();
+ final ExecutableFlow flow1 = TestUtils
+ .createTestExecutableFlowFromYaml("basicyamlshelltest", "bashSleep");
+ flow1.setExecutionId(101);
+ final ExecutableFlow flow2 = TestUtils
+ .createTestExecutableFlowFromYaml("basicyamlshelltest", "bashSleep");
+ flow2.setExecutionId(102);
+ final ExecutableFlow flow3 = TestUtils
+ .createTestExecutableFlowFromYaml("basicyamlshelltest", "bashSleep");
+ flow3.setExecutionId(103);
+ final ExecutableFlow flow4 = TestUtils
+ .createTestExecutableFlowFromYaml("basicyamlshelltest", "bashSleep");
+ flow4.setExecutionId(104);
+ this.manager.submitExecutableFlow(flow1, this.user.getUserId());
+ verify(this.loader).uploadExecutableFlow(flow1);
+ this.manager.submitExecutableFlow(flow2, this.user.getUserId());
+ verify(this.loader).uploadExecutableFlow(flow2);
+ this.manager.submitExecutableFlow(flow3, this.user.getUserId());
+ this.manager.submitExecutableFlow(flow4, this.user.getUserId());
+ }
+
@Ignore
@Test
public void testFetchAllActiveFlows() throws Exception {
@@ -365,6 +389,9 @@ public class ExecutorManagerTest {
//so that flows will be dispatched to executors.
this.props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "true");
+ // allow two concurrent runs give one Flow
+ this.props.put(ExecutorManager.AZKABAN_MAX_CONCURRENT_RUNS_ONEFLOW, 2);
+
final List<Executor> executors = new ArrayList<>();
final Executor executor1 = new Executor(1, "localhost", 12345, true);
final Executor executor2 = new Executor(2, "localhost", 12346, true);
diff --git a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
index 35bb985..13e9ac5 100644
--- a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
+++ b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
@@ -18,6 +18,7 @@ package azkaban.utils;
import azkaban.executor.ExecutableFlow;
import azkaban.flow.Flow;
+import azkaban.project.DirectoryYamlFlowLoader;
import azkaban.project.Project;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.user.User;
@@ -53,6 +54,20 @@ public class TestUtils {
return execFlow;
}
+ /* Helper method to create an ExecutableFlow from Yaml */
+ public static ExecutableFlow createTestExecutableFlowFromYaml(final String projectName,
+ final String flowName) throws IOException {
+
+ final Project project = new Project(11, projectName);
+ final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+ loader.loadProjectFlow(project, ExecutionsTestUtil.getFlowDir(projectName));
+ project.setFlows(loader.getFlowMap());
+ project.setVersion(123);
+
+ final Flow flow = project.getFlow(flowName);
+ return new ExecutableFlow(project, flow);
+ }
+
/* Helper method to create an XmlUserManager from XML_FILE_PARAM file */
public static UserManager createTestXmlUserManager() {
final Props props = new Props();
diff --git a/test/execution-test-data/basicyamlshelltest/bashSleep.flow b/test/execution-test-data/basicyamlshelltest/bashSleep.flow
new file mode 100644
index 0000000..755cba5
--- /dev/null
+++ b/test/execution-test-data/basicyamlshelltest/bashSleep.flow
@@ -0,0 +1,20 @@
+---
+config:
+ flow-level-parameter: value
+
+nodes:
+ - name: shell_end
+ type: noop
+ dependsOn:
+ - shell_sleep
+ - shell_echo
+
+ - name: shell_echo
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_sleep
+ type: command
+ config:
+ command: sleep 5
diff --git a/test/execution-test-data/basicyamlshelltest/bashSleep.project b/test/execution-test-data/basicyamlshelltest/bashSleep.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/basicyamlshelltest/bashSleep.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0