azkaban-memoizeit
Details
diff --git a/src/java/azkaban/executor/ExecutionOptions.java b/src/java/azkaban/executor/ExecutionOptions.java
index 495ce3c..3e9460f 100644
--- a/src/java/azkaban/executor/ExecutionOptions.java
+++ b/src/java/azkaban/executor/ExecutionOptions.java
@@ -18,6 +18,8 @@ public class ExecutionOptions {
private boolean notifyOnFirstFailure = true;
private boolean notifyOnLastFailure = false;
+ private boolean failureEmailsOverride = false;
+ private boolean successEmailsOverride = false;
private ArrayList<String> failureEmails = new ArrayList<String>();
private ArrayList<String> successEmails = new ArrayList<String>();
@@ -49,6 +51,22 @@ public class ExecutionOptions {
failureEmails.addAll(emails);
}
+ public boolean isFailureEmailsOverridden() {
+ return this.failureEmailsOverride;
+ }
+
+ public boolean isSuccessEmailsOverridden() {
+ return this.successEmailsOverride;
+ }
+
+ public void setSuccessEmailsOverridden(boolean override) {
+ this.successEmailsOverride = override;
+ }
+
+ public void setFailureEmailsOverridden(boolean override) {
+ this.failureEmailsOverride = override;
+ }
+
public List<String> getFailureEmails() {
return failureEmails;
}
@@ -135,7 +153,8 @@ public class ExecutionOptions {
flowOptionObj.put("queueLevel", queueLevel);
flowOptionObj.put("concurrentOption", concurrentOption);
flowOptionObj.put("disabled", initiallyDisabledJobs);
-
+ flowOptionObj.put("failureEmailsOverride", failureEmailsOverride);
+ flowOptionObj.put("successEmailsOverride", successEmailsOverride);
return flowOptionObj;
}
@@ -145,7 +164,7 @@ public class ExecutionOptions {
return null;
}
- Map<String,Object> optionsMap = new HashMap<String,Object>();
+ Map<String,Object> optionsMap = (Map<String,Object>)obj;
ExecutionOptions options = new ExecutionOptions();
if (optionsMap.containsKey("flowParameters")) {
@@ -182,6 +201,14 @@ public class ExecutionOptions {
options.setFailureEmails((List<String>)optionsMap.get("failureEmails"));
}
+ if (optionsMap.containsKey("successEmailsOverride")) {
+ options.setSuccessEmailsOverridden((Boolean)optionsMap.get("successEmailsOverride"));
+ }
+
+ if (optionsMap.containsKey("failureEmailsOverride")) {
+ options.setFailureEmailsOverridden((Boolean)optionsMap.get("failureEmailsOverride"));
+ }
+
return options;
}
}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index ff94590..b58c167 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -392,7 +392,7 @@ public class ExecutorManager {
message = "Flow " + flowId + " is already running with exec id " + runningExecId +". Pipelining level " + options.getPipelineLevel() + ". ";
}
else if (options.getConcurrentOption().equals(ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
- throw new ExecutorManagerException("Flow " + flowId + " is already running. Skipping execution.");
+ throw new ExecutorManagerException("Flow " + flowId + " is already running. Skipping execution.", ExecutorManagerException.Reason.SkippedExecution);
}
else {
// The settings is to run anyways.
diff --git a/src/java/azkaban/executor/ExecutorManagerException.java b/src/java/azkaban/executor/ExecutorManagerException.java
index 772fc4b..a5c264e 100644
--- a/src/java/azkaban/executor/ExecutorManagerException.java
+++ b/src/java/azkaban/executor/ExecutorManagerException.java
@@ -17,8 +17,13 @@
package azkaban.executor;
public class ExecutorManagerException extends Exception {
+ public enum Reason {
+ SkippedExecution
+ }
+
private static final long serialVersionUID = 1L;
private ExecutableFlow flow = null;
+ private Reason reason = null;
public ExecutorManagerException(Exception e) {
super(e);
@@ -33,6 +38,11 @@ public class ExecutorManagerException extends Exception {
this.flow = flow;
}
+ public ExecutorManagerException(String message, Reason reason) {
+ super(message);
+ this.reason = reason;
+ }
+
public ExecutorManagerException(String message, Throwable cause) {
super(message, cause);
}
@@ -40,4 +50,8 @@ public class ExecutorManagerException extends Exception {
public ExecutableFlow getExecutableFlow() {
return flow;
}
+
+ public Reason getReason() {
+ return reason;
+ }
}
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 2733b3e..fab237a 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -35,6 +35,7 @@ import org.joda.time.format.DateTimeFormatter;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
import azkaban.project.Project;
@@ -379,10 +380,21 @@ public class ScheduleManager {
}
exflow.setExecutionOptions(flowOptions);
+ if (!flowOptions.isFailureEmailsOverridden()) {
+ flowOptions.setFailureEmails(flow.getFailureEmails());
+ }
+ if (!flowOptions.isSuccessEmailsOverridden()) {
+ flowOptions.setSuccessEmails(flow.getSuccessEmails());
+ }
+
try {
executorManager.submitExecutableFlow(exflow);
logger.info("Scheduler has invoked " + exflow.getExecutionId());
- } catch (Exception e) {
+ }
+ catch (ExecutorManagerException e) {
+ throw e;
+ }
+ catch (Exception e) {
e.printStackTrace();
throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
}
@@ -406,7 +418,16 @@ public class ScheduleManager {
}
}
- } catch (Exception e) {
+ }
+ catch (ExecutorManagerException e) {
+ if (e.getReason() != null && e.getReason() == ExecutorManagerException.Reason.SkippedExecution) {
+ logger.info(e.getMessage());
+ }
+ else {
+ e.printStackTrace();
+ }
+ }
+ catch (Exception e) {
logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
}
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 80a3865..a9d5d17 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -552,6 +552,9 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ret.put("notifyFailureFirst", options.getNotifyOnFirstFailure());
ret.put("notifyFailureLast", options.getNotifyOnLastFailure());
+ ret.put("failureEmailsOverride", options.isFailureEmailsOverridden());
+ ret.put("successEmailsOverride", options.isSuccessEmailsOverridden());
+
ret.put("concurrentOptions", options.getConcurrentOption());
ret.put("pipelineLevel", options.getPipelineLevel());
ret.put("pipelineExecution", options.getPipelineExecutionId());
@@ -764,6 +767,12 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ExecutionOptions options = HttpRequestUtils.parseFlowOptions(req);
exflow.setExecutionOptions(options);
+ if (!options.isFailureEmailsOverridden()) {
+ options.setFailureEmails(flow.getFailureEmails());
+ }
+ if (!options.isSuccessEmailsOverridden()) {
+ options.setSuccessEmails(flow.getSuccessEmails());
+ }
try {
String message = executorManager.submitExecutableFlow(exflow);
diff --git a/src/java/azkaban/webapp/servlet/HttpRequestUtils.java b/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
index fd29b4c..4e71930 100644
--- a/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
+++ b/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
@@ -28,6 +28,15 @@ public class HttpRequestUtils {
}
}
+ if (hasParam(req, "failureEmailsOverride")) {
+ boolean override = getBooleanParam(req, "failureEmailsOverride", false);
+ execOptions.setFailureEmailsOverridden(override);
+ }
+ if (hasParam(req, "successEmailsOverride")) {
+ boolean override = getBooleanParam(req, "successEmailsOverride", false);
+ execOptions.setSuccessEmailsOverridden(override);
+ }
+
if (hasParam(req, "failureEmails")) {
String emails = getParam(req, "failureEmails");
if (!emails.isEmpty()) {
@@ -151,6 +160,23 @@ public class HttpRequestUtils {
return defaultVal;
}
+ public static boolean getBooleanParam(HttpServletRequest request, String name) throws ServletException {
+ String p = getParam(request, name);
+ return Boolean.parseBoolean(p);
+ }
+
+ public static boolean getBooleanParam(HttpServletRequest request, String name, boolean defaultVal) {
+ if (hasParam(request, name)) {
+ try {
+ return getBooleanParam(request, name);
+ } catch (Exception e) {
+ return defaultVal;
+ }
+ }
+
+ return defaultVal;
+ }
+
public static long getLongParam(HttpServletRequest request, String name) throws ServletException {
String p = getParam(request, name);
return Long.valueOf(p);
diff --git a/src/java/azkaban/webapp/servlet/velocity/flowexecutionpanel.vm b/src/java/azkaban/webapp/servlet/velocity/flowexecutionpanel.vm
index 806465f..6d4ddcb 100644
--- a/src/java/azkaban/webapp/servlet/velocity/flowexecutionpanel.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/flowexecutionpanel.vm
@@ -48,13 +48,22 @@
<input id="notifyFailureFirst" class="checkbox" type="checkbox" name="notify" value="first" checked /> <label for="notify">First Failure</label>
<input id="notifyFailureLast" class="checkbox" type="checkbox" name="notify" value="last"></input> <label for="notify">Flow Finished</label>
+
<h4>Failure Emails</h4>
+ <div>
+ <input id="overrideFailureEmails" type="checkbox" name="overrideFailureEmails" value="overrideFailureEmails" />
+ <label for="overrideFailureEmails">Override flow email settings</label>
+ </div>
<p>Notify these addresses on failure. Comma, space or semi-colon delimited list.</p>
<textarea id="failureEmails"></textarea>
</div>
<div>
<h4>Success Emails</h4>
+ <div>
+ <input id="overrideSuccessEmails" type="checkbox" name="overrideSuccessEmails" value="overrideSuccessEmails" />
+ <label for="overrideSuccessEmails">Override flow email settings</label>
+ </div>
<p>Notify when the flow finishes successfully. Comma, space or semi-colon delimited list.</p>
<textarea id="successEmails"></textarea>
</div>
src/web/css/azkaban.css 2(+1 -1)
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 8fca496..b187638 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -1237,7 +1237,7 @@ tr:hover td {
/* Graph SVG */
#graphView {
position: absolute;
- top: 210px;
+ top: 202px;
bottom: 5px;
left: 50px;
right: 50px;
src/web/js/azkaban.flow.execute.view.js 39(+38 -1)
diff --git a/src/web/js/azkaban.flow.execute.view.js b/src/web/js/azkaban.flow.execute.view.js
index 0258cbe..97609ae 100644
--- a/src/web/js/azkaban.flow.execute.view.js
+++ b/src/web/js/azkaban.flow.execute.view.js
@@ -31,6 +31,24 @@ azkaban.FlowExecuteDialogView= Backbone.View.extend({
},
initialize : function(settings) {
this.model.bind('change:flowinfo', this.changeFlowInfo, this);
+
+ $("#overrideSuccessEmails").click(function(evt) {
+ if($(this).is(':checked')){
+ $('#successEmails').attr('disabled',null);
+ }
+ else {
+ $('#successEmails').attr('disabled',"disabled");
+ }
+ });
+
+ $("#overrideFailureEmails").click(function(evt) {
+ if($(this).is(':checked')){
+ $('#failureEmails').attr('disabled',null);
+ }
+ else {
+ $('#failureEmails').attr('disabled',"disabled");
+ }
+ });
},
render: function() {
},
@@ -40,7 +58,9 @@ azkaban.FlowExecuteDialogView= Backbone.View.extend({
var successEmails = $('#successEmails').val();
var notifyFailureFirst = $('#notifyFailureFirst').is(':checked');
var notifyFailureLast = $('#notifyFailureLast').is(':checked');
-
+ var failureEmailsOverride = $("#overrideFailureEmails").is(':checked');
+ var successEmailsOverride = $("#overrideSuccessEmails").is(':checked');
+
var flowOverride = {};
var editRows = $(".editRow");
for (var i = 0; i < editRows.length; ++i) {
@@ -68,6 +88,8 @@ azkaban.FlowExecuteDialogView= Backbone.View.extend({
ajax: "executeFlow",
flow: this.flowId,
disabled: disabled,
+ failureEmailOverride:failureEmailsOverride,
+ successEmailOverride:successEmailsOverride,
failureAction: failureAction,
failureEmails: failureEmails,
successEmails: successEmails,
@@ -102,6 +124,21 @@ azkaban.FlowExecuteDialogView= Backbone.View.extend({
var pipelineExecutionId = this.model.get("pipelineExecution");
var queueLevel = this.model.get("queueLevel");
var nodeStatus = this.model.get("nodeStatus");
+ var overrideSuccessEmails = this.model.get("overrideSuccessEmails");
+ var overrideFailureEmails = this.model.get("overrideFailureEmails");
+
+ if (overrideSuccessEmails) {
+ $('#overrideSuccessEmails').attr('checked', true);
+ }
+ else {
+ $('#successEmails').attr('disabled','disabled');
+ }
+ if (overrideFailureEmails) {
+ $('#overrideFailureEmails').attr('checked', true);
+ }
+ else {
+ $('#failureEmails').attr('disabled','disabled');
+ }
if (successEmails) {
$('#successEmails').val(successEmails.join());