azkaban-aplcache

Have scheduled flow's job diabled/enabled status be picked

2/3/2017 9:22:03 PM
3.15.0

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index e3517ce..b7b3d3d 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -16,6 +16,7 @@
 
 package azkaban.executor;
 
+import azkaban.utils.FlowUtils;
 import java.io.File;
 import java.io.IOException;
 import java.lang.Thread.State;
@@ -928,35 +929,6 @@ public class ExecutorManager extends EventHandler implements
     }
   }
 
-  private void applyDisabledJobs(List<Object> disabledJobs,
-      ExecutableFlowBase exflow) {
-    for (Object disabled : disabledJobs) {
-      if (disabled instanceof String) {
-        String nodeName = (String) disabled;
-        ExecutableNode node = exflow.getExecutableNode(nodeName);
-        if (node != null) {
-          node.setStatus(Status.DISABLED);
-        }
-      } else if (disabled instanceof Map) {
-        @SuppressWarnings("unchecked")
-        Map<String, Object> nestedDisabled = (Map<String, Object>) disabled;
-        String nodeName = (String) nestedDisabled.get("id");
-        @SuppressWarnings("unchecked")
-        List<Object> subDisabledJobs =
-            (List<Object>) nestedDisabled.get("children");
-
-        if (nodeName == null || subDisabledJobs == null) {
-          return;
-        }
-
-        ExecutableNode node = exflow.getExecutableNode(nodeName);
-        if (node != null && node instanceof ExecutableFlowBase) {
-          applyDisabledJobs(subDisabledJobs, (ExecutableFlowBase) node);
-        }
-      }
-    }
-  }
-
   @Override
   public String submitExecutableFlow(ExecutableFlow exflow, String userId)
     throws ExecutorManagerException {
@@ -986,7 +958,7 @@ public class ExecutorManager extends EventHandler implements
         }
 
         if (options.getDisabledJobs() != null) {
-          applyDisabledJobs(options.getDisabledJobs(), exflow);
+          FlowUtils.applyDisabledJobs(options.getDisabledJobs(), exflow);
         }
 
         if (!running.isEmpty()) {
diff --git a/azkaban-common/src/main/java/azkaban/utils/FlowUtils.java b/azkaban-common/src/main/java/azkaban/utils/FlowUtils.java
new file mode 100644
index 0000000..9268d2c
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/FlowUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import azkaban.executor.ExecutableFlowBase;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.Status;
+import java.util.List;
+import java.util.Map;
+
+
+public class FlowUtils {
+  /**
+   * Change job status to disabled in exflow if the job is in disabledJobs
+   * @param disabledJobs
+   * @param exflow
+   */
+  public static void applyDisabledJobs(List<Object> disabledJobs,
+      ExecutableFlowBase exflow) {
+    for (Object disabled : disabledJobs) {
+      if (disabled instanceof String) {
+        String nodeName = (String) disabled;
+        ExecutableNode node = exflow.getExecutableNode(nodeName);
+        if (node != null) {
+          node.setStatus(Status.DISABLED);
+        }
+      } else if (disabled instanceof Map) {
+        @SuppressWarnings("unchecked")
+        Map<String, Object> nestedDisabled = (Map<String, Object>) disabled;
+        String nodeName = (String) nestedDisabled.get("id");
+        @SuppressWarnings("unchecked")
+        List<Object> subDisabledJobs =
+            (List<Object>) nestedDisabled.get("children");
+
+        if (nodeName == null || subDisabledJobs == null) {
+          return;
+        }
+
+        ExecutableNode node = exflow.getExecutableNode(nodeName);
+        if (node != null && node instanceof ExecutableFlowBase) {
+          applyDisabledJobs(subDisabledJobs, (ExecutableFlowBase) node);
+        }
+      }
+    }
+  }
+}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index e36a441..5476470 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -16,6 +16,8 @@
 
 package azkaban.webapp.servlet;
 
+import azkaban.executor.ExecutorManager;
+import azkaban.utils.FlowUtils;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -141,6 +143,10 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
           ajaxFetchExecutableFlowInfo(req, resp, ret, session.getUser(), exFlow);
         }
       }
+    } else if (ajaxName.equals("fetchscheduledflowgraph")) {
+      String projectName = getParam(req, "project");
+      String flowName = getParam(req, "flow");
+      ajaxFetchScheduledFlowGraph(projectName, flowName, ret, session.getUser());
     } else if (ajaxName.equals("reloadExecutors")) {
       ajaxReloadExecutors(req, resp, ret, session.getUser());
     } else if (ajaxName.equals("enableQueueProcessor")) {
@@ -203,6 +209,37 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
     }
   }
 
+  private void ajaxFetchScheduledFlowGraph(String projectName, String flowName,
+      HashMap<String, Object> ret, User user) throws ServletException {
+    Project project =
+        getProjectAjaxByPermission(ret, projectName, user, Type.EXECUTE);
+    if (project == null) {
+      ret.put("error", "Project '" + projectName + "' doesn't exist.");
+      return;
+    }
+    try {
+      Schedule schedule = scheduleManager.getSchedule(project.getId(), flowName);
+      ExecutionOptions executionOptions = schedule != null ? schedule.getExecutionOptions() : new ExecutionOptions();
+      Flow flow = project.getFlow(flowName);
+      if (flow == null) {
+        ret.put("error", "Flow '" + flowName + "' cannot be found in project " + project);
+        return;
+      }
+      ExecutableFlow exFlow = new ExecutableFlow(project, flow);
+      exFlow.setExecutionOptions(executionOptions);
+      ret.put("submitTime", exFlow.getSubmitTime());
+      ret.put("submitUser", exFlow.getSubmitUser());
+      ret.put("execid", exFlow.getExecutionId());
+      ret.put("projectId", exFlow.getProjectId());
+      ret.put("project", project.getName());
+      FlowUtils.applyDisabledJobs(executionOptions.getDisabledJobs(), exFlow);
+      Map<String, Object> flowObj = getExecutableNodeInfo(exFlow);
+      ret.putAll(flowObj);
+    } catch(ScheduleManagerException ex) {
+      throw new ServletException(ex);
+    }
+  }
+
   /* Reloads executors from DB and azkaban.properties via executorManager */
   private void ajaxReloadExecutors(HttpServletRequest req,
     HttpServletResponse resp, HashMap<String, Object> returnMap, User user) {
diff --git a/azkaban-web-server/src/web/js/azkaban/view/flow-execute-dialog.js b/azkaban-web-server/src/web/js/azkaban/view/flow-execute-dialog.js
index 9b8a3b6..9fc85eb 100644
--- a/azkaban-web-server/src/web/js/azkaban/view/flow-execute-dialog.js
+++ b/azkaban-web-server/src/web/js/azkaban/view/flow-execute-dialog.js
@@ -249,13 +249,13 @@ azkaban.FlowExecuteDialogView = Backbone.View.extend({
 
   loadGraph: function(projectName, flowId, exgraph, callback) {
     console.log("Loading flow " + flowId);
-    var requestURL = contextURL + "/manager";
+    var requestURL = contextURL + "/executor";
 
     var graphModel = executableGraphModel;
     // fetchFlow(this.model, projectName, flowId, true);
     var requestData = {
         "project": projectName,
-        "ajax": "fetchflowgraph",
+        "ajax": "fetchscheduledflowgraph",
         "flow": flowId
       };
     var self = this;