Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index e3517ce..b7b3d3d 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -16,6 +16,7 @@
package azkaban.executor;
+import azkaban.utils.FlowUtils;
import java.io.File;
import java.io.IOException;
import java.lang.Thread.State;
@@ -928,35 +929,6 @@ public class ExecutorManager extends EventHandler implements
}
}
- private void applyDisabledJobs(List<Object> disabledJobs,
- ExecutableFlowBase exflow) {
- for (Object disabled : disabledJobs) {
- if (disabled instanceof String) {
- String nodeName = (String) disabled;
- ExecutableNode node = exflow.getExecutableNode(nodeName);
- if (node != null) {
- node.setStatus(Status.DISABLED);
- }
- } else if (disabled instanceof Map) {
- @SuppressWarnings("unchecked")
- Map<String, Object> nestedDisabled = (Map<String, Object>) disabled;
- String nodeName = (String) nestedDisabled.get("id");
- @SuppressWarnings("unchecked")
- List<Object> subDisabledJobs =
- (List<Object>) nestedDisabled.get("children");
-
- if (nodeName == null || subDisabledJobs == null) {
- return;
- }
-
- ExecutableNode node = exflow.getExecutableNode(nodeName);
- if (node != null && node instanceof ExecutableFlowBase) {
- applyDisabledJobs(subDisabledJobs, (ExecutableFlowBase) node);
- }
- }
- }
- }
-
@Override
public String submitExecutableFlow(ExecutableFlow exflow, String userId)
throws ExecutorManagerException {
@@ -986,7 +958,7 @@ public class ExecutorManager extends EventHandler implements
}
if (options.getDisabledJobs() != null) {
- applyDisabledJobs(options.getDisabledJobs(), exflow);
+ FlowUtils.applyDisabledJobs(options.getDisabledJobs(), exflow);
}
if (!running.isEmpty()) {
diff --git a/azkaban-common/src/main/java/azkaban/utils/FlowUtils.java b/azkaban-common/src/main/java/azkaban/utils/FlowUtils.java
new file mode 100644
index 0000000..9268d2c
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/FlowUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import azkaban.executor.ExecutableFlowBase;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.Status;
+import java.util.List;
+import java.util.Map;
+
+
+public class FlowUtils {
+ /**
+ * Change job status to disabled in exflow if the job is in disabledJobs
+ * @param disabledJobs
+ * @param exflow
+ */
+ public static void applyDisabledJobs(List<Object> disabledJobs,
+ ExecutableFlowBase exflow) {
+ for (Object disabled : disabledJobs) {
+ if (disabled instanceof String) {
+ String nodeName = (String) disabled;
+ ExecutableNode node = exflow.getExecutableNode(nodeName);
+ if (node != null) {
+ node.setStatus(Status.DISABLED);
+ }
+ } else if (disabled instanceof Map) {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> nestedDisabled = (Map<String, Object>) disabled;
+ String nodeName = (String) nestedDisabled.get("id");
+ @SuppressWarnings("unchecked")
+ List<Object> subDisabledJobs =
+ (List<Object>) nestedDisabled.get("children");
+
+ if (nodeName == null || subDisabledJobs == null) {
+ return;
+ }
+
+ ExecutableNode node = exflow.getExecutableNode(nodeName);
+ if (node != null && node instanceof ExecutableFlowBase) {
+ applyDisabledJobs(subDisabledJobs, (ExecutableFlowBase) node);
+ }
+ }
+ }
+ }
+}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index e36a441..5476470 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -16,6 +16,8 @@
package azkaban.webapp.servlet;
+import azkaban.executor.ExecutorManager;
+import azkaban.utils.FlowUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -141,6 +143,10 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ajaxFetchExecutableFlowInfo(req, resp, ret, session.getUser(), exFlow);
}
}
+ } else if (ajaxName.equals("fetchscheduledflowgraph")) {
+ String projectName = getParam(req, "project");
+ String flowName = getParam(req, "flow");
+ ajaxFetchScheduledFlowGraph(projectName, flowName, ret, session.getUser());
} else if (ajaxName.equals("reloadExecutors")) {
ajaxReloadExecutors(req, resp, ret, session.getUser());
} else if (ajaxName.equals("enableQueueProcessor")) {
@@ -203,6 +209,37 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
}
+ private void ajaxFetchScheduledFlowGraph(String projectName, String flowName,
+ HashMap<String, Object> ret, User user) throws ServletException {
+ Project project =
+ getProjectAjaxByPermission(ret, projectName, user, Type.EXECUTE);
+ if (project == null) {
+ ret.put("error", "Project '" + projectName + "' doesn't exist.");
+ return;
+ }
+ try {
+ Schedule schedule = scheduleManager.getSchedule(project.getId(), flowName);
+ ExecutionOptions executionOptions = schedule != null ? schedule.getExecutionOptions() : new ExecutionOptions();
+ Flow flow = project.getFlow(flowName);
+ if (flow == null) {
+ ret.put("error", "Flow '" + flowName + "' cannot be found in project " + project);
+ return;
+ }
+ ExecutableFlow exFlow = new ExecutableFlow(project, flow);
+ exFlow.setExecutionOptions(executionOptions);
+ ret.put("submitTime", exFlow.getSubmitTime());
+ ret.put("submitUser", exFlow.getSubmitUser());
+ ret.put("execid", exFlow.getExecutionId());
+ ret.put("projectId", exFlow.getProjectId());
+ ret.put("project", project.getName());
+ FlowUtils.applyDisabledJobs(executionOptions.getDisabledJobs(), exFlow);
+ Map<String, Object> flowObj = getExecutableNodeInfo(exFlow);
+ ret.putAll(flowObj);
+ } catch(ScheduleManagerException ex) {
+ throw new ServletException(ex);
+ }
+ }
+
/* Reloads executors from DB and azkaban.properties via executorManager */
private void ajaxReloadExecutors(HttpServletRequest req,
HttpServletResponse resp, HashMap<String, Object> returnMap, User user) {
diff --git a/azkaban-web-server/src/web/js/azkaban/view/flow-execute-dialog.js b/azkaban-web-server/src/web/js/azkaban/view/flow-execute-dialog.js
index 9b8a3b6..9fc85eb 100644
--- a/azkaban-web-server/src/web/js/azkaban/view/flow-execute-dialog.js
+++ b/azkaban-web-server/src/web/js/azkaban/view/flow-execute-dialog.js
@@ -249,13 +249,13 @@ azkaban.FlowExecuteDialogView = Backbone.View.extend({
loadGraph: function(projectName, flowId, exgraph, callback) {
console.log("Loading flow " + flowId);
- var requestURL = contextURL + "/manager";
+ var requestURL = contextURL + "/executor";
var graphModel = executableGraphModel;
// fetchFlow(this.model, projectName, flowId, true);
var requestData = {
"project": projectName,
- "ajax": "fetchflowgraph",
+ "ajax": "fetchscheduledflowgraph",
"flow": flowId
};
var self = this;