azkaban-aplcache
Changes
conf/azkaban.properties 8(+4 -4)
src/java/azkaban/utils/Mailman.java 46(+23 -23)
src/web/js/azkaban.exflow.options.view.js 63(+53 -10)
Details
conf/azkaban.properties 8(+4 -4)
diff --git a/conf/azkaban.properties b/conf/azkaban.properties
index e696a83..48984f1 100644
--- a/conf/azkaban.properties
+++ b/conf/azkaban.properties
@@ -43,7 +43,7 @@ executor.shared.token=abcdefg
executor.flow.threads=30
# mail settings
-mail.sender=azkaban-noreply@linkedin.com
-mail.host=mail.corp.linkedin.com
-job.failure.email=cyu@linkedin.com
-job.success.email=cyu@linkedin.com
+mail.sender=
+mail.host=
+job.failure.email=
+job.success.email=
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 53dcf34..102213b 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -25,6 +25,9 @@ public class ExecutableFlow {
private ArrayList<String> startNodes;
private ArrayList<String> endNodes;
+ private ArrayList<String> failureEmails;
+ private ArrayList<String> successEmails;
+
private long submitTime = -1;
private long startTime = -1;
private long endTime = -1;
@@ -33,9 +36,22 @@ public class ExecutableFlow {
private Status flowStatus = Status.UNKNOWN;
private String submitUser;
private boolean submitted = false;
+ private boolean notifyOnFirstFailure = true;
+ private boolean notifyOnLastFailure = false;
+
+ private Integer pipelineLevel = null;
+ private Map<String, String> flowParameters = new HashMap<String, String>();
+
+ public enum FailureAction {
+ FINISH_CURRENTLY_RUNNING,
+ CANCEL_ALL,
+ FINISH_ALL_POSSIBLE
+ }
+
+ private FailureAction failureAction = FailureAction.FINISH_CURRENTLY_RUNNING;
public enum Status {
- FAILED, FAILED_FINISHING, SUCCEEDED, RUNNING, WAITING, KILLED, DISABLED, READY, UNKNOWN, PAUSED
+ FAILED, FAILED_FINISHING, SUCCEEDED, RUNNING, WAITING, KILLED, DISABLED, READY, UNKNOWN, PAUSED, SKIPPED
}
public ExecutableFlow(String id, Flow flow) {
@@ -77,6 +93,14 @@ public class ExecutableFlow {
updateNumber = number;
}
+ public void addFlowParameters(Map<String, String> param) {
+ flowParameters.putAll(param);
+ }
+
+ public Map<String, String> getFlowParameters() {
+ return flowParameters;
+ }
+
private void setFlow(Flow flow) {
for (Node node: flow.getNodes()) {
String id = node.getId();
@@ -92,6 +116,8 @@ public class ExecutableFlow {
targetNode.addInNode(edge.getSourceId());
}
+ successEmails = new ArrayList<String>(flow.getSuccessEmails());
+ failureEmails = new ArrayList<String>(flow.getFailureEmails());
flowProps.putAll(flow.getAllFlowProps());
}
@@ -190,6 +216,14 @@ public class ExecutableFlow {
this.flowStatus = flowStatus;
}
+ public void setFailureEmails(List<String> emails) {
+ this.failureEmails = emails == null ? new ArrayList<String>() : new ArrayList<String>(emails);
+ }
+
+ public void setSuccessEmails(List<String> emails) {
+ this.successEmails = emails == null ? new ArrayList<String>() : new ArrayList<String>(emails);
+ }
+
public Map<String,Object> toObject() {
HashMap<String, Object> flowObj = new HashMap<String, Object>();
flowObj.put("type", "executableflow");
@@ -202,6 +236,13 @@ public class ExecutableFlow {
flowObj.put("endTime", endTime);
flowObj.put("status", flowStatus.toString());
flowObj.put("submitUser", submitUser);
+ flowObj.put("flowParameters", this.flowParameters);
+ flowObj.put("notifyOnFirstFailure", this.notifyOnFirstFailure);
+ flowObj.put("notifyOnLastFailure", this.notifyOnLastFailure);
+ flowObj.put("successEmails", successEmails);
+ flowObj.put("failureEmails", failureEmails);
+ flowObj.put("failureAction", failureAction.toString());
+ flowObj.put("pipelineLevel", pipelineLevel);
ArrayList<Object> props = new ArrayList<Object>();
for (FlowProps fprop: flowProps.values()) {
@@ -226,6 +267,10 @@ public class ExecutableFlow {
return flowObj;
}
+ public void setFailureAction(FailureAction action) {
+ failureAction = action;
+ }
+
@SuppressWarnings("unchecked")
public static ExecutableFlow createExecutableFlowFromObject(Object obj) {
ExecutableFlow exFlow = new ExecutableFlow();
@@ -240,7 +285,23 @@ public class ExecutableFlow {
exFlow.endTime = getLongFromObject(flowObj.get("endTime"));
exFlow.flowStatus = Status.valueOf((String)flowObj.get("status"));
exFlow.submitUser = (String)flowObj.get("submitUser");
-
+ exFlow.flowParameters = new HashMap<String, String>((Map<String,String>)flowObj.get("flowParameters"));
+
+ // Failure notification
+ if (flowObj.containsKey("notifyOnFirstFailure")) {
+ exFlow.notifyOnFirstFailure = (Boolean)flowObj.get("notifyOnFirstFailure");
+ }
+ if (flowObj.containsKey("notifyOnLastFailure")) {
+ exFlow.notifyOnLastFailure = (Boolean)flowObj.get("notifyOnLastFailure");
+ }
+
+ // Failure action
+ if (flowObj.containsKey("failureAction")) {
+ exFlow.failureAction = FailureAction.valueOf((String)flowObj.get("failureAction"));
+ }
+ exFlow.pipelineLevel = (Integer)flowObj.get("pipelineLevel");
+
+ // Copy nodes
List<Object> nodes = (List<Object>)flowObj.get("nodes");
for (Object nodeObj: nodes) {
ExecutableNode node = ExecutableNode.createNodeFromObject(nodeObj, exFlow);
@@ -257,6 +318,11 @@ public class ExecutableFlow {
exFlow.flowProps.put(source, flowProps);
}
+ // Success emails
+ exFlow.setSuccessEmails((List<String>)flowObj.get("successEmails"));
+ // Failure emails
+ exFlow.setFailureEmails((List<String>)flowObj.get("failureEmails"));
+
return exFlow;
}
@@ -318,6 +384,18 @@ public class ExecutableFlow {
this.submitted = submitted;
}
+ public void setPipelineLevel(int level) {
+ pipelineLevel = level;
+ }
+
+ public void setNotifyOnFirstFailure(boolean notify) {
+ this.notifyOnFirstFailure = notify;
+ }
+
+ public void setNotifyOnLastFailure(boolean notify) {
+ this.notifyOnLastFailure = notify;
+ }
+
public static class ExecutableNode {
private String id;
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index e0865a3..10a9e94 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -43,8 +43,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private ExecutableFlow flow;
private ExecutorService executorService;
private BlockingQueue<JobRunner> jobsToRun = new LinkedBlockingQueue<JobRunner>();
- private List<JobRunner> pausedJobsToRun = Collections
- .synchronizedList(new ArrayList<JobRunner>());
+ private List<JobRunner> pausedJobsToRun = Collections.synchronizedList(new ArrayList<JobRunner>());
private int numThreads = NUM_CONCURRENT_THREADS;
private boolean cancelled = false;
private boolean paused = false;
diff --git a/src/java/azkaban/scheduler/LocalFileScheduleLoader.java b/src/java/azkaban/scheduler/LocalFileScheduleLoader.java
index 733ee77..5bc9a1a 100644
--- a/src/java/azkaban/scheduler/LocalFileScheduleLoader.java
+++ b/src/java/azkaban/scheduler/LocalFileScheduleLoader.java
@@ -72,8 +72,6 @@ public class LocalFileScheduleLoader implements ScheduleLoader {
private File backupScheduleFile;
public LocalFileScheduleLoader(Props props) throws IOException {
-
-
basePath = new File(props.getString("schedule.directory"));
if (!basePath.exists()) {
logger.info("Schedule directory " + basePath + " not found.");
src/java/azkaban/utils/Mailman.java 46(+23 -23)
diff --git a/src/java/azkaban/utils/Mailman.java b/src/java/azkaban/utils/Mailman.java
index 2553a21..1770009 100644
--- a/src/java/azkaban/utils/Mailman.java
+++ b/src/java/azkaban/utils/Mailman.java
@@ -63,29 +63,29 @@ public class Mailman {
Session session = Session.getInstance(props, null);
session.setDebug(true);
- //email message
- Message msg = new MimeMessage(session);
- msg.setFrom(new InternetAddress(fromAddress));
- for(String str : toAddress) {
- msg.addRecipients(Message.RecipientType.TO, InternetAddress.parse(str));
- }
-
- msg.setSubject(subject);
- msg.setText(body);
-
- //transport
- SMTPTransport t = (SMTPTransport)session.getTransport(protocol);
-
- try {
- t.connect(_mailHost, _mailUser, _mailPassword);
- t.sendMessage(msg, msg.getAllRecipients());
- }
- catch (Exception e) {
- logger.error(e);
- }
- finally {
- t.close();
- }
+// //email message
+// Message msg = new MimeMessage(session);
+// msg.setFrom(new InternetAddress(fromAddress));
+// for(String str : toAddress) {
+// msg.addRecipients(Message.RecipientType.TO, InternetAddress.parse(str));
+// }
+//
+// msg.setSubject(subject);
+// msg.setText(body);
+//
+// //transport
+// SMTPTransport t = (SMTPTransport)session.getTransport(protocol);
+//
+// try {
+// t.connect(_mailHost, _mailUser, _mailPassword);
+// t.sendMessage(msg, msg.getAllRecipients());
+// }
+// catch (Exception e) {
+// logger.error(e);
+// }
+// finally {
+// t.close();
+// }
}
public void sendEmailIfPossible(String fromAddress, List<String> toAddress,
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 29745ef..68de060 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -4,6 +4,7 @@ import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -15,6 +16,7 @@ import javax.servlet.http.HttpServletResponse;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlow.ExecutableNode;
+import azkaban.executor.ExecutableFlow.FailureAction;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.ExecutorManagerException;
@@ -35,7 +37,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
private ProjectManager projectManager;
private ExecutorManager executorManager;
private ScheduleManager scheduleManager;
- private static final int STRING_BUFFER_SIZE = 1024*5;
@Override
public void init(ServletConfig config) throws ServletException {
@@ -500,7 +501,45 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
// Create ExecutableFlow
ExecutableFlow exflow = executorManager.createExecutableFlow(flow);
exflow.setSubmitUser(user.getUserId());
- Map<String, String> paramGroup = this.getParamGroup(req, "disabled");
+ if (hasParam(req, "failureAction")) {
+ String option = getParam(req, "failureAction");
+ if (option.equals("finishCurrent") ) {
+ exflow.setFailureAction(FailureAction.FINISH_CURRENTLY_RUNNING);
+ }
+ else if (option.equals("cancelImmediately")) {
+ exflow.setFailureAction(FailureAction.CANCEL_ALL);
+ }
+ else if (option.equals("finishPossible")) {
+ exflow.setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
+ }
+ }
+ if (hasParam(req, "failureEmails")) {
+ String emails = getParam(req, "failureEmails");
+ String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+ exflow.setFailureEmails(Arrays.asList(emailSplit));
+ }
+ if (hasParam(req, "successEmails")) {
+ String emails = getParam(req, "successEmails");
+ String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+ exflow.setSuccessEmails(Arrays.asList(emailSplit));
+ }
+ if (hasParam(req, "notifyFailureFirst")) {
+ exflow.setNotifyOnFirstFailure(Boolean.parseBoolean(getParam(req, "notifyFailureFirst")));
+ }
+ if (hasParam(req, "notifyFailureLast")) {
+ exflow.setNotifyOnLastFailure(Boolean.parseBoolean(getParam(req, "notifyFailureLast")));
+ }
+ if (hasParam(req, "executingJobOption")) {
+ String option = getParam(req, "jobOption");
+ // Not set yet
+ }
+ if (hasParam(req, "flowOverride")) {
+ Map<String, String> paramGroup = this.getParamGroup(req, "flowOverride");
+ exflow.addFlowParameters(paramGroup);
+ }
+
+ // Setup disabled
+ Map<String, String> paramGroup = this.getParamGroup(req, "disable");
for (Map.Entry<String, String> entry: paramGroup.entrySet()) {
boolean nodeDisabled = Boolean.parseBoolean(entry.getValue());
exflow.setStatus(entry.getKey(), nodeDisabled ? Status.DISABLED : Status.READY);
diff --git a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
index 68c06e0..ac2879d 100644
--- a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
@@ -206,7 +206,7 @@
<dl>
<dt>Failure Action</dt>
<dd>
- <select name="failureAction">
+ <select id="failureAction" name="failureAction">
<option value="finishCurrent">Finish Current Running</option>
<option value="cancelImmediately">Cancel All</option>
<option value="finishPossible">Finish All Possible</option>
@@ -218,14 +218,14 @@
</dd>
<dt>Notify on Failure</dt>
<dd>
- <input class="checkbox" type="checkbox" name="notify" value="first" checked >First Failure</input>
- <input class="checkbox" type="checkbox" name="notify" value="last">Flow Stop</input>
+ <input id="notifyFailureFirst" class="checkbox" type="checkbox" name="notify" value="first" checked >First Failure</input>
+ <input id="notifyFailureLast" class="checkbox" type="checkbox" name="notify" value="last">Flow Stop</input>
</dd>
<dt>Success Email</dt>
<dd>
<textarea id="successEmails"></textarea>
</dd>
- <dt>Executing Job</dt>
+ <dt>Concurrent Execution</dt>
<dd id="executingJob">
<input id="ignore" class="radio" type="radio" name="concurrent" value="ignore" checked /><label class="radioLabel" for="ignore">Run Concurrently</label>
<input id="pipeline" class="radio" type="radio" name="concurrent" value="pipeline" /><label class="radioLabel" for="pipeline">Pipeline</label>
@@ -279,10 +279,10 @@
</ul>
<ul id="disableJobMenu" class="contextMenu flowSubmenu">
- <li class="open"><a href="#open">Open...</a></li>
<li class="openwindow"><a href="#openwindow">Open in New Window...</a></li>
<li id="disable" class="disable separator"><a href="#disable">Disable</a><div id="disableArrow" class="context-sub-icon"></div></li>
<ul id="disableSub" class="subMenu">
+ <li class="disableAll"><a href="#disableAll">All</a></li>
<li class="parents"><a href="#disableParents">Parents</a></li>
<li class="ancestors"><a href="#disableAncestors">All Ancestors</a></li>
<li class="children"><a href="#disableChildren">Children</a></li>
@@ -290,6 +290,7 @@
</ul>
<li id="enable" class="enable"><a href="#enable">Enable</a> <div id="enableArrow" class="context-sub-icon"></div></li>
<ul id="enableSub" class="subMenu">
+ <li class="enableAll"><a href="#enableAll">All</a></li>
<li class="parents"><a href="#enableParents">Parents</a></li>
<li class="ancestors"><a href="#enableAncestors">All Ancestors</a></li>
<li class="children"><a href="#enableChildren">Children</a></li>
src/web/js/azkaban.exflow.options.view.js 63(+53 -10)
diff --git a/src/web/js/azkaban.exflow.options.view.js b/src/web/js/azkaban.exflow.options.view.js
index a9a5034..6120c2c 100644
--- a/src/web/js/azkaban.exflow.options.view.js
+++ b/src/web/js/azkaban.exflow.options.view.js
@@ -52,7 +52,6 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
events : {
"click" : "closeEditingTarget",
"click #execute-btn": "handleExecuteFlow",
- "click #execute-custom-btn": "handleCustomFlow",
"click #cancel-btn": "handleCancelExecution",
"click .modal-close": "handleCancelExecution",
"click #generalOptions": "handleGeneralOptionsSelect",
@@ -124,9 +123,44 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
},
handleExecuteFlow: function(evt) {
var executeURL = contextURL + "/executor";
+ var disabled = this.cloneModel.get("disabled");
+ var failureAction = $('#failureAction').val();
+ var failureEmails = $('#failureEmails').val();
+ var successEmails = $('#successEmails').val();
+ var notifyFailureFirst = $('#notifyFailureFirst').is(':checked');
+ var notifyFailureLast = $('#notifyFailureLast').is(':checked');
+ var executingJobOption = $('input:radio[name=gender]:checked').val();
+
+ var flowOverride = {};
+ var editRows = $(".editRow");
+ for (var i = 0; i < editRows.length; ++i) {
+ var row = editRows[i];
+ var td = $(row).find('td');
+ var key = $(td[0]).text();
+ var val = $(td[1]).text();
+
+ if (key && key.length > 0) {
+ flowOverride[key] = val;
+ }
+ }
+
+ var executingData = {
+ project: projectName,
+ ajax: "executeFlow",
+ flow: flowName,
+ disable: this.cloneModel.get('disabled'),
+ failureAction: failureAction,
+ failureEmails: failureEmails,
+ successEmails: successEmails,
+ notifyFailureFirst: notifyFailureFirst,
+ notifyFailureLast: notifyFailureLast,
+ executingJobOption: executingJobOption,
+ flowOverride: flowOverride
+ };
+
$.get(
executeURL,
- {"project": projectName, "ajax":"executeFlow", "flow":flowName, "disabled":this.cloneModel.get("disabled")},
+ executingData,
function(data) {
if (data.error) {
alert(data.error);
@@ -139,9 +173,6 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
"json"
);
},
- handleCustomFlow: function(evt) {
-
- },
handleAddRow: function(evt) {
var tr = document.createElement("tr");
var tdName = document.createElement("td");
@@ -162,6 +193,7 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
$(tdValue).append(valueData);
$(tdValue).addClass("editable");
+ $(tr).addClass("editRow");
$(tr).append(tdName);
$(tr).append(tdValue);
@@ -192,8 +224,6 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
var row = curTarget.parentElement.parentElement;
$(row).remove();
},
- handleResetData : function(evt) {
- },
closeEditingTarget: function(evt) {
if (this.editingTarget != null && this.editingTarget != evt.target && this.editingTarget != evt.target.parentElement ) {
var input = $(this.editingTarget).children("input")[0];
@@ -215,9 +245,6 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
this.editingTarget = null;
}
},
- addRowData : function(evt) {
-
- },
handleDisableMenuClick : function(action, el, pos) {
var jobid = el[0].jobid;
var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowName + "&job=" + jobid;
@@ -234,6 +261,17 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
cloneModel.set({disabled: disabled});
cloneModel.trigger("change:disabled");
}
+ else if(action == "disableAll") {
+ var disabled = cloneModel.get("disabled");
+
+ var nodes = cloneModel.get("nodes");
+ for (var key in nodes) {
+ disabled[key] = true;
+ }
+
+ cloneModel.set({disabled: disabled});
+ cloneModel.trigger("change:disabled");
+ }
else if (action == "disableParents") {
var disabled = cloneModel.get("disabled");
var nodes = cloneModel.get("nodes");
@@ -287,6 +325,11 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
cloneModel.set({disabled: disabled});
cloneModel.trigger("change:disabled");
}
+ else if(action == "enableAll") {
+ disabled = {};
+ cloneModel.set({disabled: disabled});
+ cloneModel.trigger("change:disabled");
+ }
else if (action == "enableParents") {
var disabled = cloneModel.get("disabled");
var nodes = cloneModel.get("nodes");