azkaban-uncached
Changes
src/web/js/azkaban.ajax.utils.js 59(+59 -0)
src/web/js/azkaban.exflow.options.view.js 15(+1 -14)
src/web/js/azkaban.project.view.js 55(+45 -10)
Details
diff --git a/src/java/azkaban/executor/ExecutionReference.java b/src/java/azkaban/executor/ExecutionReference.java
index 5b5eeb1..89c1d38 100644
--- a/src/java/azkaban/executor/ExecutionReference.java
+++ b/src/java/azkaban/executor/ExecutionReference.java
@@ -1,5 +1,7 @@
package azkaban.executor;
+import java.util.HashMap;
+
public class ExecutionReference {
private final int execId;
private final String host;
@@ -49,4 +51,4 @@ public class ExecutionReference {
public void setNumErrors(int numErrors) {
this.numErrors = numErrors;
}
-}
+ }
\ No newline at end of file
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index d9b9685..fe807d3 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -71,6 +71,17 @@ public class ExecutorManager {
runningFlows.putAll(executorLoader.fetchActiveFlows());
}
+ public List<Integer> getRunningFlows(int projectId, String flowId) {
+ ArrayList<Integer> executionIds = new ArrayList<Integer>();
+ for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+ if (ref.getSecond().getFlowId().equals(flowId)) {
+ executionIds.add(ref.getFirst().getExecId());
+ }
+ }
+
+ return executionIds;
+ }
+
public boolean isFlowRunning(int projectId, String flowId) {
for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
if (ref.getSecond().getFlowId().equals(flowId)) {
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index a8ddd59..581f5c1 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
@@ -31,6 +32,7 @@ import javax.servlet.http.HttpServletResponse;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.executor.ExecutionReference;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.ExecutorManagerException;
@@ -270,10 +272,10 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
}
}
- else if (ajaxName.equals("isRunning")) {
+ else if (ajaxName.equals("getRunning")) {
String projectName = getParam(req, "project");
String flowName = getParam(req, "flow");
- ajaxIsFlowRunning(req, resp, ret, session.getUser(), projectName, flowName);
+ ajaxGetFlowRunning(req, resp, ret, session.getUser(), projectName, flowName);
}
else if (ajaxName.equals("flowInfo")) {
String projectName = getParam(req, "project");
@@ -454,13 +456,16 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
}
- private void ajaxIsFlowRunning(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, String projectId, String flowId) throws ServletException{
+ private void ajaxGetFlowRunning(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, String projectId, String flowId) throws ServletException{
Project project = getProjectAjaxByPermission(ret, projectId, user, Type.EXECUTE);
if (project == null) {
return;
}
- ret.put("running", executorManager.isFlowRunning(project.getId(), flowId));
+ List<Integer> refs = executorManager.getRunningFlows(project.getId(), flowId);
+ if (!refs.isEmpty()) {
+ ret.put("execIds", refs);
+ }
}
private void ajaxRestartFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
@@ -624,12 +629,36 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
Map<String, String> flowParamGroup = this.getParamGroup(req, "flowOverride");
exflow.addFlowParameters(flowParamGroup);
- // Setup disabled
- Map<String, String> paramGroup = this.getParamGroup(req, "disable");
- for (Map.Entry<String, String> entry: paramGroup.entrySet()) {
- boolean nodeDisabled = Boolean.parseBoolean(entry.getValue());
-
- exflow.setNodeStatus(entry.getKey(), nodeDisabled ? Status.DISABLED : Status.READY);
+ if (hasParam(req, "job")) {
+ // Disable everything.
+ for(ExecutableNode node : exflow.getExecutableNodes()) {
+ node.setStatus(Status.DISABLED);
+ }
+
+ String jobId = getParam(req, "job");
+ ExecutableNode job = exflow.getExecutableNode(jobId);
+ if (job == null) {
+ ret.put("error", "Job " + jobId + " doesn't exist in flow.");
+ return;
+ }
+
+ job.setStatus(Status.READY);
+
+ if (hasParam(req, "withDep")) {
+ boolean withDep = "true".equals(getParam(req, "withDep"));
+ if (withDep) {
+ enableAllAncestors(job, exflow);
+ }
+ }
+ }
+ else {
+ // Setup disabled
+ Map<String, String> paramGroup = this.getParamGroup(req, "disable");
+ for (Map.Entry<String, String> entry: paramGroup.entrySet()) {
+ boolean nodeDisabled = Boolean.parseBoolean(entry.getValue());
+
+ exflow.setNodeStatus(entry.getKey(), nodeDisabled ? Status.DISABLED : Status.READY);
+ }
}
try {
@@ -643,6 +672,19 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ret.put("execid", exflow.getExecutionId());
}
+ private void enableAllAncestors(ExecutableNode node, ExecutableFlow flow) {
+ Set<String> inNodes = node.getInNodes();
+ if (inNodes != null) {
+ for (String inNode: inNodes) {
+ ExecutableNode job = flow.getExecutableNode(inNode);
+ if (job != null) {
+ job.setStatus(Status.READY);
+ enableAllAncestors(job, flow);
+ }
+ }
+ }
+ }
+
public class ExecutorVMHelper {
public String getProjectName(int id) {
Project project = projectManager.getProject(id);
diff --git a/src/java/azkaban/webapp/servlet/velocity/projectpage.vm b/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
index 78c6763..f592314 100644
--- a/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
@@ -24,6 +24,7 @@
<script type="text/javascript" src="${context}/js/underscore-1.2.1-min.js"></script>
<script type="text/javascript" src="${context}/js/backbone-0.5.3-min.js"></script>
<script type="text/javascript" src="${context}/js/jquery.simplemodal.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.ajax.utils.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.project.view.js"></script>
<script type="text/javascript">
@@ -131,7 +132,7 @@
<fieldset>
<dl>
<dt>Job Archive</dt>
- <dd><input id="file" name="file" class="file" type="file" onChange="changeFile()" /></dd>
+ <dd><input id="file" name="file" class="file" type="file" /></dd>
<input type="hidden" name="project" value="$project.name" />
<input type="hidden" name="action" value="upload" />
</dl>
@@ -168,11 +169,11 @@
</div>
<div id="flow-execute" class="modal">
<h3>Execute Flow</h3>
+ <div id="executeErrorMsg" class="box-error-message"></div>
<div id="execute-message" class="message">
</div>
<div class="actions">
<a class="yes btn1" id="execute-btn">Execute</a>
- <a class="yes btn2" id="customize-btn">Customize Execution</a>
<a class="no simplemodal-close btn3">Cancel</a>
</div>
</div>
src/web/js/azkaban.ajax.utils.js 59(+59 -0)
diff --git a/src/web/js/azkaban.ajax.utils.js b/src/web/js/azkaban.ajax.utils.js
index c087086..6290978 100644
--- a/src/web/js/azkaban.ajax.utils.js
+++ b/src/web/js/azkaban.ajax.utils.js
@@ -28,3 +28,62 @@ function ajaxCall(requestURL, data, callback) {
"json"
);
}
+
+function executeFlow(executingData) {
+ executeURL = contextURL + "/executor";
+ $.get(
+ executeURL,
+ executingData,
+ function(data) {
+ if (data.error) {
+ alert(data.error);
+ }
+ else {
+ var redirectURL = contextURL + "/executor?execid=" + data.execid;
+ window.location.href = redirectURL;
+ }
+ },
+ "json"
+ );
+}
+
+/**
+* Checks to see if a flow is running.
+*
+*/
+function flowExecutingStatus(projectId, flowId) {
+ var requestURL = contextURL + "/executor";
+
+ var executionIds;
+ $.ajax( {
+ url: requestURL,
+ async: false,
+ data: {"ajax":"getRunning", "project":projectId, "flow":flowId},
+ error: function(data) {},
+ success: function(data) {
+ if (data.error == "session") {
+ // We need to relogin.
+ var errorDialog = document.getElementById("invalid-session");
+ if (errorDialog) {
+ $(errorDialog).modal({
+ closeHTML: "<a href='#' title='Close' class='modal-close'>x</a>",
+ position: ["20%",],
+ containerId: 'confirm-container',
+ containerCss: {
+ 'height': '220px',
+ 'width': '565px'
+ },
+ onClose: function (dialog) {
+ window.location.reload();
+ }
+ });
+ }
+ }
+ else {
+ executionIds = data.execIds;
+ }
+ }
+ });
+
+ return executionIds;
+}
src/web/js/azkaban.exflow.options.view.js 15(+1 -14)
diff --git a/src/web/js/azkaban.exflow.options.view.js b/src/web/js/azkaban.exflow.options.view.js
index e6fdefb..4388010 100644
--- a/src/web/js/azkaban.exflow.options.view.js
+++ b/src/web/js/azkaban.exflow.options.view.js
@@ -262,20 +262,7 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
flowOverride: flowOverride
};
- $.get(
- executeURL,
- executingData,
- function(data) {
- if (data.error) {
- alert(data.error);
- }
- else {
- var redirectURL = contextURL + "/executor?execid=" + data.execid;
- window.location.href = redirectURL;
- }
- },
- "json"
- );
+ executeFlow(executingData);
},
handleAddRow: function(evt) {
var tr = document.createElement("tr");
src/web/js/azkaban.project.view.js 55(+45 -10)
diff --git a/src/web/js/azkaban.project.view.js b/src/web/js/azkaban.project.view.js
index a233824..8c849c1 100644
--- a/src/web/js/azkaban.project.view.js
+++ b/src/web/js/azkaban.project.view.js
@@ -259,24 +259,57 @@ azkaban.FlowTableView= Backbone.View.extend({
var jobId = evt.currentTarget.jobName;
var flowId = evt.currentTarget.flowId;
- $("#execute-message").text("Run only job '" + jobId + "' in flow '" + flowId + "'.");
- this.executeFlowDialog();
+ $("#execute-message").text("Execute only job '" + jobId + "' in flow '" + flowId + "'.");
+
+ var executingData = {
+ project: projectId,
+ ajax: "executeFlow",
+ flow: flowId,
+ job: jobId
+ };
+
+ this.executeFlowDialog(executingData);
},
runWithDep: function(evt) {
- var jobId = evt.currentTarget.jobName;
+ var jobId = evt.currentTarget.jobName;
var flowId = evt.currentTarget.flowId;
console.log("Run With Dep");
- var jobId = evt.currentTarget.jobId;
- $("#execute-message").text("Run job '" + jobId + "' and all of its ancestors in '" + flowId + "'.");
- this.executeFlowDialog();
+ $("#execute-message").text("Execute job '" + jobId + "' and all of its ancestors in '" + flowId + "'.");
+
+ var executingData = {
+ project: projectId,
+ ajax: "executeFlow",
+ flow: flowId,
+ job: jobId,
+ withDep: true
+ };
+
+ this.executeFlowDialog(executingData);
},
executeFlow: function(evt) {
console.log("Execute Flow");
var flowId = evt.currentTarget.flowId;
- $("#execute-message").text("Executing the complete flow '" + flowId + "'.");
- this.executeFlowDialog();
+ $("#execute-message").text("Execute the complete flow '" + flowId + "'.");
+
+ var executingData = {
+ project: projectId,
+ ajax: "executeFlow",
+ flow: flowId
+ };
+
+ this.executeFlowDialog(executingData);
},
- executeFlowDialog: function(message) {
+ executeFlowDialog: function(executingData) {
+ var flowId = executingData.flow;
+ var executionIds = flowExecutingStatus(projectId, flowId);
+
+ if (executionIds && executionIds.length > 0) {
+ $("#executeErrorMsg").text("Flow '" + flowId + "' is already running. Click on Execute to proceed anyways.");
+ }
+ else {
+ $("#executeErrorMsg").hide();
+ }
+
$('#flow-execute').modal({
closeHTML: "<a href='#' title='Close' class='modal-close'>x</a>",
position: ["20%",],
@@ -287,7 +320,9 @@ azkaban.FlowTableView= Backbone.View.extend({
},
onShow: function (dialog) {
var modal = this;
- $("#errorMsg").hide();
+ $('#execute-btn').click(function() {
+ executeFlow(executingData);
+ });
}
});
},