azkaban-aplcache

Enforcing num of concurrent runs quota given a Flow Executable

1/10/2018 6:04:48 PM
3.40.0

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 50f238f..267cc9a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -67,6 +67,8 @@ public class ExecutorManager extends EventHandler implements
 
   public static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
       "azkaban.use.multiple.executors";
+  public static final String AZKABAN_MAX_CONCURRENT_RUNS_ONEFLOW =
+      "azkaban.max.concurrent.runs.oneflow";
   static final String AZKABAN_EXECUTOR_SELECTOR_FILTERS =
       "azkaban.executorselector.filters";
   static final String AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX =
@@ -79,6 +81,7 @@ public class ExecutorManager extends EventHandler implements
       "azkaban.activeexecutor.refresh.milisecinterval";
   private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW =
       "azkaban.activeexecutor.refresh.flowinterval";
+  private static final int DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW = 30;
   private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS =
       "azkaban.executorinfo.refresh.maxThreads";
   private static final String AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED =
@@ -98,6 +101,7 @@ public class ExecutorManager extends EventHandler implements
       new ConcurrentHashMap<>();
   private final ExecutingManagerUpdaterThread executingManager;
   private final ExecutorApiGateway apiGateway;
+  private final int maxConcurrentRunsOneFlow;
   QueuedExecutions queuedFlows;
   File cacheDir;
   private QueueProcessorThread queueProcessor;
@@ -124,6 +128,11 @@ public class ExecutorManager extends EventHandler implements
     this.loadRunningFlows();
 
     this.queuedFlows = new QueuedExecutions(azkProps.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
+
+    // The default threshold is set to 30 for now, in case some users are affected. We may
+    // decrease this number in future, to better prevent DDos attacks.
+    this.maxConcurrentRunsOneFlow = azkProps.getInt(AZKABAN_MAX_CONCURRENT_RUNS_ONEFLOW,
+        DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW);
     this.loadQueuedFlows();
 
     this.cacheDir = new File(azkProps.getString("cache.directory", "cache"));
@@ -951,6 +960,7 @@ public class ExecutorManager extends EventHandler implements
         exflow.setSubmitUser(userId);
         exflow.setSubmitTime(System.currentTimeMillis());
 
+        // Get collection of running flows given a project and a specific flow name
         final List<Integer> running = getRunningFlows(projectId, flowId);
 
         ExecutionOptions options = exflow.getExecutionOptions();
@@ -963,7 +973,11 @@ public class ExecutorManager extends EventHandler implements
         }
 
         if (!running.isEmpty()) {
-          if (options.getConcurrentOption().equals(
+          if (running.size() > this.maxConcurrentRunsOneFlow) {
+            throw new ExecutorManagerException("Flow " + flowId
+                + " has more than " + this.maxConcurrentRunsOneFlow + " concurrent runs. Skipping",
+                ExecutorManagerException.Reason.SkippedExecution);
+          } else if (options.getConcurrentOption().equals(
               ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
             Collections.sort(running);
             final Integer runningExecId = running.get(running.size() - 1);
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 3355936..88d7812 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -308,6 +308,30 @@ public class ExecutorManagerTest {
     verify(this.loader).addActiveExecutableReference(any());
   }
 
+  // Too many concurrent flows will fail job submission
+  @Test(expected = ExecutorManagerException.class)
+  public void testTooManySubmitFlows() throws Exception {
+    testSetUpForRunningFlows();
+    final ExecutableFlow flow1 = TestUtils
+        .createTestExecutableFlowFromYaml("basicyamlshelltest", "bashSleep");
+    flow1.setExecutionId(101);
+    final ExecutableFlow flow2 = TestUtils
+        .createTestExecutableFlowFromYaml("basicyamlshelltest", "bashSleep");
+    flow2.setExecutionId(102);
+    final ExecutableFlow flow3 = TestUtils
+        .createTestExecutableFlowFromYaml("basicyamlshelltest", "bashSleep");
+    flow3.setExecutionId(103);
+    final ExecutableFlow flow4 = TestUtils
+        .createTestExecutableFlowFromYaml("basicyamlshelltest", "bashSleep");
+    flow4.setExecutionId(104);
+    this.manager.submitExecutableFlow(flow1, this.user.getUserId());
+    verify(this.loader).uploadExecutableFlow(flow1);
+    this.manager.submitExecutableFlow(flow2, this.user.getUserId());
+    verify(this.loader).uploadExecutableFlow(flow2);
+    this.manager.submitExecutableFlow(flow3, this.user.getUserId());
+    this.manager.submitExecutableFlow(flow4, this.user.getUserId());
+  }
+
   @Ignore
   @Test
   public void testFetchAllActiveFlows() throws Exception {
@@ -365,6 +389,9 @@ public class ExecutorManagerTest {
     //so that flows will be dispatched to executors.
     this.props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "true");
 
+    // allow two concurrent runs give one Flow
+    this.props.put(ExecutorManager.AZKABAN_MAX_CONCURRENT_RUNS_ONEFLOW, 2);
+
     final List<Executor> executors = new ArrayList<>();
     final Executor executor1 = new Executor(1, "localhost", 12345, true);
     final Executor executor2 = new Executor(2, "localhost", 12346, true);
diff --git a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
index 35bb985..13e9ac5 100644
--- a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
+++ b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
@@ -18,6 +18,7 @@ package azkaban.utils;
 
 import azkaban.executor.ExecutableFlow;
 import azkaban.flow.Flow;
+import azkaban.project.DirectoryYamlFlowLoader;
 import azkaban.project.Project;
 import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.user.User;
@@ -53,6 +54,20 @@ public class TestUtils {
     return execFlow;
   }
 
+  /* Helper method to create an ExecutableFlow from Yaml */
+  public static ExecutableFlow createTestExecutableFlowFromYaml(final String projectName,
+      final String flowName) throws IOException {
+
+    final Project project = new Project(11, projectName);
+    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+    loader.loadProjectFlow(project, ExecutionsTestUtil.getFlowDir(projectName));
+    project.setFlows(loader.getFlowMap());
+    project.setVersion(123);
+
+    final Flow flow = project.getFlow(flowName);
+    return new ExecutableFlow(project, flow);
+  }
+
   /* Helper method to create an XmlUserManager from XML_FILE_PARAM file */
   public static UserManager createTestXmlUserManager() {
     final Props props = new Props();
diff --git a/test/execution-test-data/basicyamlshelltest/bashSleep.flow b/test/execution-test-data/basicyamlshelltest/bashSleep.flow
new file mode 100644
index 0000000..755cba5
--- /dev/null
+++ b/test/execution-test-data/basicyamlshelltest/bashSleep.flow
@@ -0,0 +1,20 @@
+---
+config:
+  flow-level-parameter: value
+
+nodes:
+  - name: shell_end
+    type: noop
+    dependsOn:
+      - shell_sleep
+      - shell_echo
+
+  - name: shell_echo
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_sleep
+    type: command
+    config:
+      command: sleep 5
diff --git a/test/execution-test-data/basicyamlshelltest/bashSleep.project b/test/execution-test-data/basicyamlshelltest/bashSleep.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/basicyamlshelltest/bashSleep.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0