azkaban-aplcache

merge two flow utils classes (#1480) Merge two utils classes

9/18/2017 6:08:47 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 52bd8a4..0942600 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -22,12 +22,12 @@ import azkaban.event.EventHandler;
 import azkaban.executor.selector.ExecutorComparator;
 import azkaban.executor.selector.ExecutorFilter;
 import azkaban.executor.selector.ExecutorSelector;
+import azkaban.flow.FlowUtils;
 import azkaban.metrics.CommonMetrics;
 import azkaban.project.Project;
 import azkaban.project.ProjectWhitelist;
 import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
-import azkaban.utils.FlowUtils;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
diff --git a/azkaban-common/src/main/java/azkaban/flow/FlowUtils.java b/azkaban-common/src/main/java/azkaban/flow/FlowUtils.java
index 2fe921d..f510eb3 100644
--- a/azkaban-common/src/main/java/azkaban/flow/FlowUtils.java
+++ b/azkaban-common/src/main/java/azkaban/flow/FlowUtils.java
@@ -17,7 +17,11 @@
 package azkaban.flow;
 
 import azkaban.executor.ExecutableFlowBase;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.Status;
 import azkaban.utils.Props;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import org.joda.time.DateTime;
 
@@ -53,4 +57,34 @@ public class FlowUtils {
 
     return props;
   }
+
+  /**
+   * Change job status to disabled in exflow if the job is in disabledJobs
+   */
+  public static void applyDisabledJobs(final List<Object> disabledJobs,
+      final ExecutableFlowBase exflow) {
+    for (final Object disabled : disabledJobs) {
+      if (disabled instanceof String) {
+        final String nodeName = (String) disabled;
+        final ExecutableNode node = exflow.getExecutableNode(nodeName);
+        if (node != null) {
+          node.setStatus(Status.DISABLED);
+        }
+      } else if (disabled instanceof Map) {
+        final Map<String, Object> nestedDisabled = (Map<String, Object>) disabled;
+        final String nodeName = (String) nestedDisabled.get("id");
+        final List<Object> subDisabledJobs =
+            (List<Object>) nestedDisabled.get("children");
+
+        if (nodeName == null || subDisabledJobs == null) {
+          return;
+        }
+
+        final ExecutableNode node = exflow.getExecutableNode(nodeName);
+        if (node != null && node instanceof ExecutableFlowBase) {
+          applyDisabledJobs(subDisabledJobs, (ExecutableFlowBase) node);
+        }
+      }
+    }
+  }
 }
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 3e51b3a..2c174c5 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -30,6 +30,7 @@ import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.Status;
 import azkaban.flow.Flow;
+import azkaban.flow.FlowUtils;
 import azkaban.project.Project;
 import azkaban.project.ProjectManager;
 import azkaban.scheduler.Schedule;
@@ -43,7 +44,6 @@ import azkaban.user.User;
 import azkaban.user.UserManager;
 import azkaban.utils.ExternalLinkUtils;
 import azkaban.utils.FileIOUtils.LogData;
-import azkaban.utils.FlowUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.webapp.AzkabanWebServer;