azkaban-uncached
Changes
src/java/azkaban/execapp/FlowRunner.java 17(+11 -6)
src/java/azkaban/executor/ExecutableFlow.java 159(+22 -137)
src/java/azkaban/executor/ExecutionOptions.java 180(+180 -0)
src/java/azkaban/scheduler/Schedule.java 211(+29 -182)
src/java/azkaban/scheduler/ScheduleManager.java 89(+29 -60)
src/java/azkaban/sla/SlaMailer.java 146(+0 -146)
src/java/azkaban/sla/SlaOptions.java 51(+51 -0)
src/java/azkaban/webapp/servlet/ExecutorServlet.java 128(+20 -108)
src/java/azkaban/webapp/servlet/ScheduleServlet.java 174(+15 -159)
src/web/js/azkaban.flow.execute.view.js 109(+57 -52)
src/web/js/azkaban.schedule.panel.view.js 55(+50 -5)
Details
src/java/azkaban/execapp/FlowRunner.java 17(+11 -6)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 2f58a1a..0fbe48d 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -28,8 +28,9 @@ import azkaban.execapp.event.EventHandler;
import azkaban.execapp.event.EventListener;
import azkaban.execapp.event.FlowWatcher;
import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutableFlow.FailureAction;
import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
@@ -105,8 +106,9 @@ public class FlowRunner extends EventHandler implements Runnable {
this.execDir = new File(flow.getExecutionPath());
this.jobtypeManager = jobtypeManager;
- this.pipelineLevel = flow.getPipelineLevel();
- this.pipelineExecId = flow.getPipelineExecutionId();
+ ExecutionOptions options = flow.getExecutionOptions();
+ this.pipelineLevel = options.getPipelineLevel();
+ this.pipelineExecId = options.getPipelineExecutionId();
this.proxyUsers = flow.getProxyUsers();
}
@@ -344,8 +346,9 @@ public class FlowRunner extends EventHandler implements Runnable {
Props parentProps = propsSource == null ? globalProps : sharedProps.get(propsSource);
// Set up overrides
+ ExecutionOptions options = flow.getExecutionOptions();
@SuppressWarnings("unchecked")
- Props flowProps = new Props(null, flow.getFlowParameters());
+ Props flowProps = new Props(null, options.getFlowParameters());
if (flowProps.size() > 0) {
flowProps.setParent(parentProps);
@@ -699,7 +702,8 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
- if (shouldKill || flowCancelled || (flowFailed && flow.getFailureAction() != FailureAction.FINISH_ALL_POSSIBLE)) {
+ ExecutionOptions options = flow.getExecutionOptions();
+ if (shouldKill || flowCancelled || (flowFailed && options.getFailureAction() != FailureAction.FINISH_ALL_POSSIBLE)) {
return Status.KILLED;
}
@@ -793,7 +797,8 @@ public class FlowRunner extends EventHandler implements Runnable {
flowFailed = true;
if (!isFailedStatus(flow.getStatus())) {
flow.setStatus(Status.FAILED_FINISHING);
- if (flow.getFailureAction() == FailureAction.CANCEL_ALL) {
+ ExecutionOptions options = flow.getExecutionOptions();
+ if (options.getFailureAction() == FailureAction.CANCEL_ALL) {
cancel("azkaban");
}
}
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 9e9d8c7..cc35939 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -42,6 +42,7 @@ import azkaban.execapp.event.FlowWatcher;
import azkaban.execapp.event.LocalFlowWatcher;
import azkaban.execapp.event.RemoteFlowWatcher;
import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.jobtype.JobTypeManager;
@@ -368,8 +369,9 @@ public class FlowRunnerManager implements EventListener {
// Setup flow runner
FlowWatcher watcher = null;
- if (flow.getPipelineExecutionId() != null) {
- Integer pipelineExecId = flow.getPipelineExecutionId();
+ ExecutionOptions options = flow.getExecutionOptions();
+ if (options.getPipelineExecutionId() != null) {
+ Integer pipelineExecId = options.getPipelineExecutionId();
FlowRunner runner = runningFlows.get(pipelineExecId);
if (runner != null) {
src/java/azkaban/executor/ExecutableFlow.java 159(+22 -137)
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index b70baa8..ef34069 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -43,9 +43,6 @@ public class ExecutableFlow {
private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();
private ArrayList<String> startNodes;
private ArrayList<String> endNodes;
-
- private ArrayList<String> failureEmails = new ArrayList<String>();
- private ArrayList<String> successEmails = new ArrayList<String>();
private long submitTime = -1;
private long startTime = -1;
@@ -54,30 +51,14 @@ public class ExecutableFlow {
private Status flowStatus = Status.READY;
private String submitUser;
- private boolean notifyOnFirstFailure = true;
- private boolean notifyOnLastFailure = false;
-
- private Integer pipelineLevel = null;
- private Integer pipelineExecId = null;
- private Integer queueLevel = null;
- private String concurrentOption = null;
- private Map<String, String> flowParameters = new HashMap<String, String>();
private HashSet<String> proxyUsers = new HashSet<String>();
-
- public enum FailureAction {
- FINISH_CURRENTLY_RUNNING,
- CANCEL_ALL,
- FINISH_ALL_POSSIBLE
- }
-
- private FailureAction failureAction = FailureAction.FINISH_CURRENTLY_RUNNING;
+ private ExecutionOptions executionOptions;
public ExecutableFlow(Flow flow) {
this.projectId = flow.getProjectId();
this.flowId = flow.getId();
this.version = flow.getVersion();
-
this.setFlow(flow);
}
@@ -113,14 +94,6 @@ public class ExecutableFlow {
return flowProps.values();
}
- public void addFlowParameters(Map<String, String> param) {
- flowParameters.putAll(param);
- }
-
- public Map<String, String> getFlowParameters() {
- return flowParameters;
- }
-
public void setProxyUsers(HashSet<String> proxyUsers) {
this.proxyUsers = proxyUsers;
}
@@ -129,7 +102,17 @@ public class ExecutableFlow {
return this.proxyUsers;
}
+ public void setExecutionOptions(ExecutionOptions options) {
+ executionOptions = options;
+ }
+
+ public ExecutionOptions getExecutionOptions() {
+ return executionOptions;
+ }
+
private void setFlow(Flow flow) {
+ executionOptions = new ExecutionOptions();
+
for (Node node: flow.getNodes()) {
String id = node.getId();
ExecutableNode exNode = new ExecutableNode(node, this);
@@ -145,10 +128,10 @@ public class ExecutableFlow {
}
if (flow.getSuccessEmails() != null) {
- successEmails = new ArrayList<String>(flow.getSuccessEmails());
+ executionOptions.setSuccessEmails(flow.getSuccessEmails());
}
if (flow.getFailureEmails() != null) {
- failureEmails = new ArrayList<String>(flow.getFailureEmails());
+ executionOptions.setFailureEmails(flow.getFailureEmails());
}
flowProps.putAll(flow.getAllFlowProps());
}
@@ -265,22 +248,6 @@ public class ExecutableFlow {
this.flowStatus = flowStatus;
}
- public void setFailureEmails(List<String> emails) {
- this.failureEmails = emails == null ? new ArrayList<String>() : new ArrayList<String>(emails);
- }
-
- public List<String> getFailureEmails() {
- return this.failureEmails;
- }
-
- public void setSuccessEmails(List<String> emails) {
- this.successEmails = emails == null ? new ArrayList<String>() : new ArrayList<String>(emails);
- }
-
- public List<String> getSuccessEmails() {
- return this.successEmails;
- }
-
public Map<String,Object> toObject() {
HashMap<String, Object> flowObj = new HashMap<String, Object>();
flowObj.put("type", "executableflow");
@@ -293,17 +260,10 @@ public class ExecutableFlow {
flowObj.put("endTime", endTime);
flowObj.put("status", flowStatus.toString());
flowObj.put("submitUser", submitUser);
- flowObj.put("flowParameters", this.flowParameters);
- flowObj.put("notifyOnFirstFailure", this.notifyOnFirstFailure);
- flowObj.put("notifyOnLastFailure", this.notifyOnLastFailure);
- flowObj.put("successEmails", successEmails);
- flowObj.put("failureEmails", failureEmails);
- flowObj.put("failureAction", failureAction.toString());
- flowObj.put("pipelineLevel", pipelineLevel);
- flowObj.put("pipelineExecId", pipelineExecId);
- flowObj.put("queueLevel", queueLevel);
flowObj.put("version", version);
- flowObj.put("concurrentOption", concurrentOption);
+
+ flowObj.put("executionOptions", this.executionOptions.toObject());
+ flowObj.put("version", version);
ArrayList<Object> props = new ArrayList<Object>();
for (FlowProps fprop: flowProps.values()) {
@@ -332,14 +292,6 @@ public class ExecutableFlow {
return flowObj;
}
- public void setFailureAction(FailureAction action) {
- failureAction = action;
- }
-
- public FailureAction getFailureAction() {
- return failureAction;
- }
-
public Object toUpdateObject(long lastUpdateTime) {
Map<String, Object> updateData = new HashMap<String,Object>();
updateData.put("execId", this.executionId);
@@ -424,28 +376,14 @@ public class ExecutableFlow {
exFlow.flowStatus = Status.valueOf((String)flowObj.get("status"));
exFlow.submitUser = (String)flowObj.get("submitUser");
exFlow.version = (Integer)flowObj.get("version");
-
- if (flowObj.containsKey("flowParameters")) {
- exFlow.flowParameters = new HashMap<String, String>((Map<String,String>)flowObj.get("flowParameters"));
- }
- // Failure notification
- if (flowObj.containsKey("notifyOnFirstFailure")) {
- exFlow.notifyOnFirstFailure = (Boolean)flowObj.get("notifyOnFirstFailure");
- }
- if (flowObj.containsKey("notifyOnLastFailure")) {
- exFlow.notifyOnLastFailure = (Boolean)flowObj.get("notifyOnLastFailure");
- }
- if (flowObj.containsKey("concurrentOption")) {
- exFlow.concurrentOption = (String)flowObj.get("concurrentOption");
- }
- // Failure action
- if (flowObj.containsKey("failureAction")) {
- exFlow.failureAction = FailureAction.valueOf((String)flowObj.get("failureAction"));
+ if (flowObj.containsKey("executionOptions")) {
+ exFlow.executionOptions = ExecutionOptions.createFromObject(flowObj.get("executionOptions"));
+ }
+ else {
+ // for backawards compatibility should remove in a few versions.
+ exFlow.executionOptions = ExecutionOptions.createFromObject(flowObj);
}
- exFlow.pipelineLevel = (Integer)flowObj.get("pipelineLevel");
- exFlow.pipelineExecId = (Integer)flowObj.get("pipelineExecId");
- exFlow.queueLevel = (Integer)flowObj.get("queueLevel");
// Copy nodes
List<Object> nodes = (List<Object>)flowObj.get("nodes");
@@ -464,11 +402,6 @@ public class ExecutableFlow {
exFlow.flowProps.put(source, flowProps);
}
- // Success emails
- exFlow.setSuccessEmails((List<String>)flowObj.get("successEmails"));
- // Failure emails
- exFlow.setFailureEmails((List<String>)flowObj.get("failureEmails"));
-
if(flowObj.containsKey("proxyUsers")) {
ArrayList<String> proxyUserList = (ArrayList<String>) flowObj.get("proxyUsers");
exFlow.setProxyUsers(new HashSet<String>(proxyUserList));
@@ -518,30 +451,6 @@ public class ExecutableFlow {
public void setSubmitUser(String submitUser) {
this.submitUser = submitUser;
}
-
- public void setPipelineLevel(Integer level) {
- pipelineLevel = level;
- }
-
- public void setPipelineExecutionId(Integer execId) {
- pipelineExecId = execId;
- }
-
- public void setNotifyOnFirstFailure(boolean notify) {
- this.notifyOnFirstFailure = notify;
- }
-
- public void setNotifyOnLastFailure(boolean notify) {
- this.notifyOnLastFailure = notify;
- }
-
- public boolean getNotifyOnFirstFailure() {
- return this.notifyOnFirstFailure;
- }
-
- public boolean getNotifyOnLastFailure() {
- return this.notifyOnLastFailure;
- }
public int getVersion() {
return version;
@@ -550,28 +459,4 @@ public class ExecutableFlow {
public void setVersion(int version) {
this.version = version;
}
-
- public Integer getPipelineLevel() {
- return pipelineLevel;
- }
-
- public Integer getPipelineExecutionId() {
- return pipelineExecId;
- }
-
- public Integer getQueueLevel() {
- return queueLevel;
- }
-
- public void setQueueLevel(int queue) {
- queueLevel = queue;
- }
-
- public String getConcurrentOption() {
- return this.concurrentOption;
- }
-
- public void setConcurrentOption(String concurrentOption) {
- this.concurrentOption = concurrentOption;
- }
}
src/java/azkaban/executor/ExecutionOptions.java 180(+180 -0)
diff --git a/src/java/azkaban/executor/ExecutionOptions.java b/src/java/azkaban/executor/ExecutionOptions.java
new file mode 100644
index 0000000..25b9575
--- /dev/null
+++ b/src/java/azkaban/executor/ExecutionOptions.java
@@ -0,0 +1,180 @@
+package azkaban.executor;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Execution options for submitted flows and scheduled flows
+ */
+public class ExecutionOptions {
+
+ private boolean notifyOnFirstFailure = true;
+ private boolean notifyOnLastFailure = false;
+ private ArrayList<String> failureEmails = new ArrayList<String>();
+ private ArrayList<String> successEmails = new ArrayList<String>();
+
+ private Integer pipelineLevel = null;
+ private Integer pipelineExecId = null;
+ private Integer queueLevel = null;
+ private String concurrentOption = null;
+ private Map<String, String> flowParameters = new HashMap<String, String>();
+
+ public enum FailureAction {
+ FINISH_CURRENTLY_RUNNING,
+ CANCEL_ALL,
+ FINISH_ALL_POSSIBLE
+ }
+
+ private FailureAction failureAction = FailureAction.FINISH_CURRENTLY_RUNNING;
+
+ private Set<String> initiallyDisabledJobs = new HashSet<String>();
+
+ public void setFlowParameters(Map<String,String> flowParam) {
+ flowParameters.get(flowParam);
+ }
+
+ public Map<String,String> getFlowParameters() {
+ return flowParameters;
+ }
+
+ public void setFailureEmails(Collection<String> emails) {
+ failureEmails.addAll(emails);
+ }
+
+ public List<String> getFailureEmails() {
+ return failureEmails;
+ }
+
+ public void setSuccessEmails(Collection<String> emails) {
+ successEmails.addAll(emails);
+ }
+
+ public List<String> getSuccessEmails() {
+ return successEmails;
+ }
+
+ public boolean getNotifyOnFirstFailure() {
+ return notifyOnFirstFailure;
+ }
+
+ public boolean getNotifyOnLastFailure() {
+ return notifyOnLastFailure;
+ }
+
+ public void setNotifyOnFirstFailure(boolean notify) {
+ this.notifyOnFirstFailure = notify;
+ }
+
+ public void setNotifyOnLastFailure(boolean notify) {
+ this.notifyOnLastFailure = notify;
+ }
+
+ public FailureAction getFailureAction() {
+ return failureAction;
+ }
+
+ public void setFailureAction(FailureAction action) {
+ failureAction = action;
+ }
+
+ public void setConcurrentOption(String concurrentOption) {
+ this.concurrentOption = concurrentOption;
+ }
+
+ public String getConcurrentOption() {
+ return concurrentOption;
+ }
+
+ public Integer getPipelineLevel() {
+ return pipelineLevel;
+ }
+
+ public Integer getPipelineExecutionId() {
+ return pipelineExecId;
+ }
+
+ public void setPipelineLevel(Integer level) {
+ pipelineLevel = level;
+ }
+
+ public void setPipelineExecutionId(Integer id) {
+ this.pipelineExecId = id;
+ }
+
+ public Integer getQueueLevel() {
+ return queueLevel;
+ }
+
+ public List<String> getDisabledJobs() {
+ return new ArrayList<String>(initiallyDisabledJobs);
+ }
+
+ public void setDisabledJobs(List<String> disabledJobs) {
+ initiallyDisabledJobs = new HashSet<String>(disabledJobs);
+ }
+
+ public Map<String,Object> toObject() {
+ HashMap<String,Object> flowOptionObj = new HashMap<String,Object>();
+
+ flowOptionObj.put("flowParameters", this.flowParameters);
+ flowOptionObj.put("notifyOnFirstFailure", this.notifyOnFirstFailure);
+ flowOptionObj.put("notifyOnLastFailure", this.notifyOnLastFailure);
+ flowOptionObj.put("successEmails", successEmails);
+ flowOptionObj.put("failureEmails", failureEmails);
+ flowOptionObj.put("failureAction", failureAction.toString());
+ flowOptionObj.put("pipelineLevel", pipelineLevel);
+ flowOptionObj.put("pipelineExecId", pipelineExecId);
+ flowOptionObj.put("queueLevel", queueLevel);
+ flowOptionObj.put("concurrentOption", concurrentOption);
+ flowOptionObj.put("disabled", initiallyDisabledJobs);
+
+ return flowOptionObj;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static ExecutionOptions createFromObject(Object obj) {
+ if (obj == null || !(obj instanceof Map)) {
+ return null;
+ }
+
+ Map<String,Object> optionsMap = new HashMap<String,Object>();
+
+ ExecutionOptions options = new ExecutionOptions();
+ if (optionsMap.containsKey("flowParameters")) {
+ options.flowParameters = new HashMap<String, String>((Map<String,String>)optionsMap.get("flowParameters"));
+ }
+ // Failure notification
+ if (optionsMap.containsKey("notifyOnFirstFailure")) {
+ options.notifyOnFirstFailure = (Boolean)optionsMap.get("notifyOnFirstFailure");
+ }
+ if (optionsMap.containsKey("notifyOnLastFailure")) {
+ options.notifyOnLastFailure = (Boolean)optionsMap.get("notifyOnLastFailure");
+ }
+ if (optionsMap.containsKey("concurrentOption")) {
+ options.concurrentOption = (String)optionsMap.get("concurrentOption");
+ }
+ if (optionsMap.containsKey("disabled")) {
+ options.initiallyDisabledJobs = new HashSet<String>((List<String>)optionsMap.get("disabled"));
+ }
+
+ // Failure action
+ if (optionsMap.containsKey("failureAction")) {
+ options.failureAction = FailureAction.valueOf((String)optionsMap.get("failureAction"));
+ }
+ options.pipelineLevel = (Integer)optionsMap.get("pipelineLevel");
+ options.pipelineExecId = (Integer)optionsMap.get("pipelineExecId");
+ options.queueLevel = (Integer)optionsMap.get("queueLevel");
+
+ // Success emails
+ options.setSuccessEmails((List<String>)optionsMap.get("successEmails"));
+ // Failure emails
+ options.setFailureEmails((List<String>)optionsMap.get("failureEmails"));
+
+ return options;
+ }
+}
diff --git a/src/java/azkaban/executor/ExecutorMailer.java b/src/java/azkaban/executor/ExecutorMailer.java
index 59d7727..a49b4fb 100644
--- a/src/java/azkaban/executor/ExecutorMailer.java
+++ b/src/java/azkaban/executor/ExecutorMailer.java
@@ -7,7 +7,7 @@ import javax.mail.MessagingException;
import org.apache.log4j.Logger;
-import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.utils.EmailMessage;
import azkaban.utils.Props;
import azkaban.utils.Utils;
@@ -39,7 +39,8 @@ public class ExecutorMailer {
}
public void sendFirstErrorMessage(ExecutableFlow flow) {
- List<String> emailList = flow.getFailureEmails();
+ ExecutionOptions option = flow.getExecutionOptions();
+ List<String> emailList = option.getDisabledJobs();
int execId = flow.getExecutionId();
if (emailList != null && !emailList.isEmpty()) {
@@ -51,10 +52,10 @@ public class ExecutorMailer {
message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + azkabanName + "</h2>");
- if (flow.getFailureAction() == FailureAction.CANCEL_ALL) {
+ if (option.getFailureAction() == FailureAction.CANCEL_ALL) {
message.println("This flow is set to cancel all currently running jobs.");
}
- else if (flow.getFailureAction() == FailureAction.FINISH_ALL_POSSIBLE){
+ else if (option.getFailureAction() == FailureAction.FINISH_ALL_POSSIBLE){
message.println("This flow is set to complete all jobs that aren't blocked by the failure.");
}
else {
@@ -91,7 +92,9 @@ public class ExecutorMailer {
}
public void sendErrorEmail(ExecutableFlow flow, String ... extraReasons) {
- List<String> emailList = flow.getFailureEmails();
+ ExecutionOptions option = flow.getExecutionOptions();
+
+ List<String> emailList = option.getFailureEmails();
int execId = flow.getExecutionId();
if (emailList != null && !emailList.isEmpty()) {
@@ -101,7 +104,7 @@ public class ExecutorMailer {
message.setMimeType("text/html");
message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
- message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has failed on " + azkabanName + "</h2>");
+ message.println("<h2 style=\"color:#FF0000\"> Execution '" + execId + "' of flow '" + flow.getFlowId() + "' has failed on " + azkabanName + "</h2>");
message.println("<table>");
message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
@@ -137,7 +140,8 @@ public class ExecutorMailer {
}
public void sendSuccessEmail(ExecutableFlow flow) {
- List<String> emailList = flow.getSuccessEmails();
+ ExecutionOptions option = flow.getExecutionOptions();
+ List<String> emailList = option.getSuccessEmails();
int execId = flow.getExecutionId();
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index ff5e2ff..b18bc6c 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -299,16 +299,24 @@ public class ExecutorManager {
List<Integer> running = getRunningFlows(projectId, flowId);
+ ExecutionOptions options = exflow.getExecutionOptions();
+
+ // Disable jobs
+ for(String disabledId : options.getDisabledJobs()) {
+ ExecutableNode node = exflow.getExecutableNode(disabledId);
+ node.setStatus(Status.DISABLED);
+ }
+
String message = "";
if (!running.isEmpty()) {
- if (exflow.getConcurrentOption().equals("pipeline")) {
+ if (options.getConcurrentOption().equals("pipeline")) {
Collections.sort(running);
Integer runningExecId = running.get(running.size() - 1);
- exflow.setPipelineExecutionId(runningExecId);
- message = "Flow " + flowId + " is already running with exec id " + runningExecId +". Pipelining level " + exflow.getPipelineLevel() + ". ";
+ options.setPipelineExecutionId(runningExecId);
+ message = "Flow " + flowId + " is already running with exec id " + runningExecId +". Pipelining level " + options.getPipelineLevel() + ". ";
}
- else if (exflow.getConcurrentOption().equals("skip")) {
+ else if (options.getConcurrentOption().equals("skip")) {
throw new ExecutorManagerException("Flow " + flowId + " is already running. Skipping execution.");
}
else {
@@ -582,10 +590,11 @@ public class ExecutorManager {
// TODO append to the flow log that we forced killed this flow because the target no longer had
// the reference.
+ ExecutionOptions options = flow.getExecutionOptions();
// But we can definitely email them.
if(flow.getStatus() == Status.FAILED || flow.getStatus() == Status.KILLED)
{
- if(flow.getFailureEmails() != null && !flow.getFailureEmails().isEmpty())
+ if(options.getFailureEmails() != null && !options.getFailureEmails().isEmpty())
{
try {
mailer.sendErrorEmail(flow, "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
@@ -596,7 +605,7 @@ public class ExecutorManager {
}
else
{
- if(flow.getSuccessEmails() != null && !flow.getSuccessEmails().isEmpty())
+ if(options.getSuccessEmails() != null && !options.getSuccessEmails().isEmpty())
{
try {
mailer.sendSuccessEmail(flow);
@@ -681,9 +690,10 @@ public class ExecutorManager {
flow.applyUpdateObject(updateData);
Status newStatus = flow.getStatus();
+ ExecutionOptions options = flow.getExecutionOptions();
if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING)) {
// We want to see if we should give an email status on first failure.
- if (flow.getNotifyOnFirstFailure()) {
+ if (options.getNotifyOnFirstFailure()) {
mailer.sendFirstErrorMessage(flow);
}
}
diff --git a/src/java/azkaban/jobtype/JobTypeManager.java b/src/java/azkaban/jobtype/JobTypeManager.java
index 65c747e..6464ca7 100644
--- a/src/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/java/azkaban/jobtype/JobTypeManager.java
@@ -25,7 +25,6 @@ import azkaban.jobExecutor.ScriptJob;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import azkaban.utils.Utils;
-import azkaban.jobExecutor.utils.InitErrorJob;
import azkaban.jobExecutor.utils.JobExecutionException;
import java.io.File;
import java.net.MalformedURLException;
diff --git a/src/java/azkaban/scheduler/JdbcScheduleLoader.java b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
index ee9c812..96130dd 100644
--- a/src/java/azkaban/scheduler/JdbcScheduleLoader.java
+++ b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
@@ -24,7 +24,6 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import javax.sql.DataSource;
@@ -35,14 +34,7 @@ import org.apache.log4j.Logger;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadablePeriod;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import azkaban.scheduler.Schedule.FlowOptions;
-import azkaban.scheduler.Schedule.SlaOptions;
-import azkaban.sla.SLA;
-import azkaban.sla.SLAManagerException;
-import azkaban.sla.JdbcSLALoader.EncodingType;
import azkaban.utils.DataSourceUtils;
import azkaban.utils.GZIPUtils;
import azkaban.utils.JSONUtils;
@@ -328,11 +320,10 @@ public class JdbcScheduleLoader implements ScheduleLoader {
int encodingType = rs.getInt(12);
byte[] data = rs.getBytes(13);
- FlowOptions flowOptions = null;
- SlaOptions slaOptions = null;
+ Object optsObj = null;
if (data != null) {
EncodingType encType = EncodingType.fromInteger(encodingType);
- Object optsObj;
+
try {
// Convoluted way to inflate strings. Should find common package or helper function.
if (encType == EncodingType.GZIP) {
@@ -343,15 +334,16 @@ public class JdbcScheduleLoader implements ScheduleLoader {
else {
String jsonString = new String(data, "UTF-8");
optsObj = JSONUtils.parseJSONFromString(jsonString);
- }
- flowOptions = Schedule.createFlowOptionFromObject(optsObj);
- slaOptions = Schedule.createSlaOptionFromObject(optsObj);
+ }
} catch (IOException e) {
throw new SQLException("Error reconstructing schedule options " + projectName + "." + flowName);
}
}
- Schedule s = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, flowOptions, slaOptions);
+ Schedule s = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser);
+ if (optsObj != null) {
+ s.createAndSetScheduleOptions(optsObj);
+ }
schedules.add(s);
} while (rs.next());
src/java/azkaban/scheduler/Schedule.java 211(+29 -182)
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index b3382d3..2fd6074 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -16,9 +16,7 @@
package azkaban.scheduler;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import org.joda.time.DateTime;
@@ -32,153 +30,12 @@ import org.joda.time.ReadablePeriod;
import org.joda.time.Seconds;
import org.joda.time.Weeks;
-import azkaban.executor.ExecutableFlow.FailureAction;
-import azkaban.scheduler.Schedule.FlowOptions;
-import azkaban.scheduler.Schedule.SlaOptions;
-import azkaban.sla.SLA.SlaSetting;
+import azkaban.executor.ExecutionOptions;
+import azkaban.sla.SlaOptions;
import azkaban.utils.Pair;
public class Schedule{
-
- public static class FlowOptions {
-
- public List<String> getFailureEmails() {
- return failureEmails;
- }
- public void setFailureEmails(List<String> failureEmails) {
- this.failureEmails = failureEmails;
- }
- public List<String> getSuccessEmails() {
- return successEmails;
- }
- public void setSuccessEmails(List<String> successEmails) {
- this.successEmails = successEmails;
- }
- public FailureAction getFailureAction() {
- return failureAction;
- }
- public void setFailureAction(FailureAction failureAction) {
- this.failureAction = failureAction;
- }
- public boolean isnotifyOnFirstFailure() {
- return notifyOnFirstFailure;
- }
- public void setNotifyOnFirstFailure(boolean notifyOnFirstFailure) {
- this.notifyOnFirstFailure = notifyOnFirstFailure;
- }
- public boolean isnotifyOnLastFailure() {
- return notifyOnLastFailure;
- }
- public void setNotifyOnLastFailure(boolean notifyOnLastFailure) {
- this.notifyOnLastFailure = notifyOnLastFailure;
- }
- public Map<String, String> getFlowOverride() {
- return flowOverride;
- }
- public void setFlowOverride(Map<String, String> flowOverride) {
- this.flowOverride = flowOverride;
- }
- public List<String> getDisabledJobs() {
- return disabledJobs;
- }
- public void setDisabledJobs(List<String> disabledJobs) {
- this.disabledJobs = disabledJobs;
- }
- private List<String> failureEmails;
- private List<String> successEmails;
- private FailureAction failureAction = FailureAction.FINISH_CURRENTLY_RUNNING;
- private boolean notifyOnFirstFailure;
- private boolean notifyOnLastFailure;
- Map<String, String> flowOverride;
- private List<String> disabledJobs;
- public Object toObject() {
- Map<String, Object> obj = new HashMap<String, Object>();
- obj.put("failureEmails", failureEmails);
- obj.put("successEmails", successEmails);
- obj.put("failureAction", failureAction.toString());
- obj.put("notifyOnFirstFailure", notifyOnFirstFailure);
- obj.put("notifyOnLastFailure", notifyOnLastFailure);
- obj.put("flowOverride", flowOverride);
- obj.put("disabledJobs", disabledJobs);
- return obj;
- }
- @SuppressWarnings("unchecked")
- public static FlowOptions fromObject(Object object) {
- if(object != null) {
- FlowOptions flowOptions = new FlowOptions();
- Map<String, Object> obj = (HashMap<String, Object>) object;
- if(obj.containsKey("failureEmails")) {
- flowOptions.setFailureEmails((List<String>) obj.get("failureEmails"));
- }
- if(obj.containsKey("successEmails")) {
- flowOptions.setSuccessEmails((List<String>) obj.get("SuccessEmails"));
- }
- if(obj.containsKey("failureAction")) {
- flowOptions.setFailureAction(FailureAction.valueOf((String)obj.get("failureAction")));
- }
- if(obj.containsKey("notifyOnFirstFailure")) {
- flowOptions.setNotifyOnFirstFailure((Boolean)obj.get("notifyOnFirstFailure"));
- }
- if(obj.containsKey("notifyOnLastFailure")) {
- flowOptions.setNotifyOnFirstFailure((Boolean)obj.get("notifyOnLastFailure"));
- }
- if(obj.containsKey("flowOverride")) {
- flowOptions.setFlowOverride((Map<String, String>) obj.get("flowOverride"));
- }
- if(obj.containsKey("disabledJobs")) {
- flowOptions.setDisabledJobs((List<String>) obj.get("disabledJobs"));
- }
- return flowOptions;
- }
- return null;
- }
- }
-
- public static class SlaOptions {
-
- public List<String> getSlaEmails() {
- return slaEmails;
- }
- public void setSlaEmails(List<String> slaEmails) {
- this.slaEmails = slaEmails;
- }
- public List<SlaSetting> getSettings() {
- return settings;
- }
- public void setSettings(List<SlaSetting> settings) {
- this.settings = settings;
- }
- private List<String> slaEmails;
- private List<SlaSetting> settings;
- public Object toObject() {
- Map<String, Object> obj = new HashMap<String, Object>();
- obj.put("slaEmails", slaEmails);
- List<Object> slaSettings = new ArrayList<Object>();
- for(SlaSetting s : settings) {
- slaSettings.add(s.toObject());
- }
- obj.put("settings", slaSettings);
- return obj;
- }
- @SuppressWarnings("unchecked")
- public static SlaOptions fromObject(Object object) {
- if(object != null) {
- SlaOptions slaOptions = new SlaOptions();
- Map<String, Object> obj = (HashMap<String, Object>) object;
- slaOptions.setSlaEmails((List<String>) obj.get("slaEmails"));
- List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
- for(Object set: (List<Object>)obj.get("settings")) {
- slaSets.add(SlaSetting.fromObject(set));
- }
- slaOptions.setSettings(slaSets);
- return slaOptions;
- }
- return null;
- }
-
- }
-
// private long projectGuid;
// private long flowGuid;
@@ -196,7 +53,7 @@ public class Schedule{
private String status;
private long submitTime;
- private FlowOptions flowOptions;
+ private ExecutionOptions executionOptions;
private SlaOptions slaOptions;
public Schedule(
@@ -223,7 +80,7 @@ public class Schedule{
this.submitUser = submitUser;
this.status = status;
this.submitTime = submitTime;
- this.flowOptions = null;
+ this.executionOptions = null;
this.slaOptions = null;
}
@@ -239,7 +96,7 @@ public class Schedule{
long nextExecTime,
long submitTime,
String submitUser,
- FlowOptions flowOptions,
+ ExecutionOptions executionOptions,
SlaOptions slaOptions
) {
this.projectId = projectId;
@@ -253,7 +110,7 @@ public class Schedule{
this.submitUser = submitUser;
this.status = status;
this.submitTime = submitTime;
- this.flowOptions = flowOptions;
+ this.executionOptions = executionOptions;
this.slaOptions = slaOptions;
}
@@ -265,11 +122,11 @@ public class Schedule{
long firstSchedTime,
DateTimeZone timezone,
ReadablePeriod period,
- long lastModifyTime,
- long nextExecTime,
+ long lastModifyTime,
+ long nextExecTime,
long submitTime,
String submitUser,
- FlowOptions flowOptions,
+ ExecutionOptions executionOptions,
SlaOptions slaOptions
) {
this.projectId = projectId;
@@ -283,16 +140,16 @@ public class Schedule{
this.submitUser = submitUser;
this.status = status;
this.submitTime = submitTime;
- this.flowOptions = flowOptions;
+ this.executionOptions = executionOptions;
this.slaOptions = slaOptions;
}
- public FlowOptions getFlowOptions() {
- return flowOptions;
+ public ExecutionOptions getExecutionOptions() {
+ return executionOptions;
}
- public void setFlowOptions(FlowOptions flowOptions) {
- this.flowOptions = flowOptions;
+ public void setFlowOptions(ExecutionOptions executionOptions) {
+ this.executionOptions = executionOptions;
}
public SlaOptions getSlaOptions() {
@@ -463,13 +320,13 @@ public class Schedule{
return periodStr;
}
-
+
public Map<String,Object> optionsToObject() {
- if(flowOptions != null || slaOptions != null) {
+ if(executionOptions != null || slaOptions != null) {
HashMap<String, Object> schedObj = new HashMap<String, Object>();
- if(flowOptions != null) {
- schedObj.put("flowOptions", flowOptions.toObject());
+ if(executionOptions != null) {
+ schedObj.put("executionOptions", executionOptions.toObject());
}
if(slaOptions != null) {
schedObj.put("slaOptions", slaOptions.toObject());
@@ -480,26 +337,16 @@ public class Schedule{
return null;
}
- @SuppressWarnings("unchecked")
- public static FlowOptions createFlowOptionFromObject(Object obj) {
- if(obj != null) {
- Map<String, Object> options = (HashMap<String, Object>) obj;
- if(options.containsKey("flowOptions")) {
- return FlowOptions.fromObject(options.get("flowOptions"));
- }
- }
- return null;
- }
-
- @SuppressWarnings("unchecked")
- public static SlaOptions createSlaOptionFromObject(Object obj) {
- if(obj != null) {
- Map<String, Object> options = (HashMap<String, Object>) obj;
- if(options.containsKey("slaOptions")) {
- return SlaOptions.fromObject(options.get("slaOptions"));
- }
- }
- return null;
+ public void createAndSetScheduleOptions(Object obj) {
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> schedObj = (HashMap<String, Object>)obj;
+ if (schedObj.containsKey("executionOptions")) {
+ ExecutionOptions execOptions = ExecutionOptions.createFromObject(schedObj.get("executionOptions"));
+ this.executionOptions = execOptions;
+ }
+ if (schedObj.containsKey("slaOptions")) {
+ SlaOptions slaOptions = SlaOptions.fromObject(schedObj.get("slaOptions"));
+ this.slaOptions = slaOptions;
+ }
}
-
}
\ No newline at end of file
src/java/azkaban/scheduler/ScheduleManager.java 89(+29 -60)
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 7fad4e2..e298356 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -33,23 +33,20 @@ import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorManager;
-import azkaban.executor.ExecutorManagerException;
-import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
-import azkaban.scheduler.Schedule.FlowOptions;
-import azkaban.scheduler.Schedule.SlaOptions;
import azkaban.sla.SLA.SlaAction;
import azkaban.sla.SLA.SlaRule;
import azkaban.sla.SLAManager;
import azkaban.sla.SLA.SlaSetting;
+import azkaban.sla.SlaOptions;
import azkaban.utils.Pair;
-
/**
* The ScheduleManager stores and executes the schedule. It uses a single thread
* instead and waits until correct loading time for the flow. It will not remove
@@ -127,8 +124,8 @@ public class ScheduleManager {
* @param id
* @return
*/
- public Schedule getSchedule(Pair<Integer, String> scheduleId) {
- return scheduleIDMap.get(scheduleId);
+ public Schedule getSchedule(int projectId, String flowId) {
+ return scheduleIDMap.get(new Pair<Integer,String>(projectId, flowId));
}
/**
@@ -136,7 +133,9 @@ public class ScheduleManager {
*
* @param id
*/
- public synchronized void removeSchedule(Pair<Integer, String> scheduleId) {
+ public synchronized void removeSchedule(int projectId, String flowId) {
+ Pair<Integer,String> scheduleId = new Pair<Integer,String>(projectId, flowId);
+
Schedule sched = scheduleIDMap.get(scheduleId);
scheduleIDMap.remove(scheduleId);
@@ -182,11 +181,27 @@ public class ScheduleManager {
final long lastModifyTime,
final long nextExecTime,
final long submitTime,
+ final String submitUser
+ ) {
+ return scheduleFlow(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null, null);
+ }
+
+ public Schedule scheduleFlow(
+ final int projectId,
+ final String projectName,
+ final String flowName,
+ final String status,
+ final long firstSchedTime,
+ final DateTimeZone timezone,
+ final ReadablePeriod period,
+ final long lastModifyTime,
+ final long nextExecTime,
+ final long submitTime,
final String submitUser,
- final FlowOptions flowOptions,
- final SlaOptions slaOptions
+ ExecutionOptions execOptions,
+ SlaOptions slaOptions
) {
- Schedule sched = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, flowOptions, slaOptions);
+ Schedule sched = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions, slaOptions);
logger.info("Scheduling flow '" + sched.getScheduleName() + "' for "
+ _dateFormat.print(firstSchedTime) + " with a period of "
+ period == null ? "(non-recurring)" : period);
@@ -195,28 +210,6 @@ public class ScheduleManager {
return sched;
}
-// /**
-// * Schedule the flow
-// *
-// * @param flowId
-// * @param date
-// * @param ignoreDep
-// */
-// public Schedule schedule(
-// String scheduleId,
-// String projectId,
-// String flowId,
-// String user,
-// String userSubmit,
-// DateTime submitTime,
-// DateTime firstSchedTime)
-// {
-// logger.info("Scheduling flow '" + scheduleId + "' for " + _dateFormat.print(firstSchedTime));
-// ScheduledFlow scheduleFlow = new ScheduledFlow(scheduleId, projectId, flowId, user, userSubmit, submitTime, firstSchedTime);
-// schedule(scheduleFlow);
-// return scheduleFlow;
-// }
-
/**
* Schedules the flow, but doesn't save the schedule afterwards.
*
@@ -381,31 +374,8 @@ public class ScheduleManager {
exflow.setSubmitUser(runningSched.getSubmitUser());
exflow.setProxyUsers(project.getProxyUsers());
- FlowOptions flowOptions = runningSched.getFlowOptions();
-
- if(flowOptions != null) {
- if (flowOptions.getFailureAction() != null) {
- exflow.setFailureAction(flowOptions.getFailureAction());
- }
- if (flowOptions.getFailureEmails() != null) {
- exflow.setFailureEmails(flowOptions.getFailureEmails());
- }
- if (flowOptions.getSuccessEmails() != null) {
- exflow.setSuccessEmails(flowOptions.getSuccessEmails());
- }
- exflow.setNotifyOnFirstFailure(flowOptions.isnotifyOnFirstFailure());
- exflow.setNotifyOnLastFailure(flowOptions.isnotifyOnLastFailure());
-
- exflow.addFlowParameters(flowOptions.getFlowOverride());
-
- List<String> disabled = flowOptions.getDisabledJobs();
- // Setup disabled
- if(disabled != null) {
- for (String job : disabled) {
- exflow.setNodeStatus(job, Status.DISABLED);
- }
- }
- }
+ ExecutionOptions flowOptions = runningSched.getExecutionOptions();
+ exflow.setExecutionOptions(flowOptions);
try {
executorManager.submitExecutableFlow(exflow);
@@ -417,7 +387,6 @@ public class ScheduleManager {
}
SlaOptions slaOptions = runningSched.getSlaOptions();
-
if(slaOptions != null) {
// submit flow slas
List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
@@ -452,7 +421,7 @@ public class ScheduleManager {
loader.updateSchedule(runningSched);
}
else {
- removeSchedule(runningSched.getScheduleId());
+ removeSchedule(runningSched.getProjectId(), runningSched.getFlowName());
}
} else {
// wait until flow run
diff --git a/src/java/azkaban/sla/JdbcSLALoader.java b/src/java/azkaban/sla/JdbcSLALoader.java
index cf6cd2f..0ab7b3b 100644
--- a/src/java/azkaban/sla/JdbcSLALoader.java
+++ b/src/java/azkaban/sla/JdbcSLALoader.java
@@ -7,7 +7,6 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import javax.sql.DataSource;
@@ -16,10 +15,7 @@ import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.ReadablePeriod;
-import azkaban.scheduler.ScheduleManagerException;
import azkaban.sla.SLA.SlaRule;
import azkaban.utils.DataSourceUtils;
import azkaban.utils.GZIPUtils;
src/java/azkaban/sla/SlaMailer.java 146(+0 -146)
diff --git a/src/java/azkaban/sla/SlaMailer.java b/src/java/azkaban/sla/SlaMailer.java
index 9b19da8..db9e24d 100644
--- a/src/java/azkaban/sla/SlaMailer.java
+++ b/src/java/azkaban/sla/SlaMailer.java
@@ -1,16 +1,11 @@
package azkaban.sla;
-import java.util.ArrayList;
import java.util.List;
import javax.mail.MessagingException;
import org.apache.log4j.Logger;
-import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutableFlow.FailureAction;
-import azkaban.executor.ExecutableNode;
-import azkaban.executor.Status;
import azkaban.sla.SLA;
import azkaban.utils.EmailMessage;
import azkaban.utils.Props;
@@ -42,147 +37,6 @@ public class SlaMailer {
testMode = props.getBoolean("test.mode", false);
}
- public void sendFirstErrorMessage(ExecutableFlow flow) {
- List<String> emailList = flow.getFailureEmails();
- int execId = flow.getExecutionId();
-
- if (emailList != null && !emailList.isEmpty()) {
- EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
- message.setFromAddress(mailSender);
- message.addAllToAddress(emailList);
- message.setMimeType("text/html");
- message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
-
- message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + azkabanName + "</h2>");
-
- if (flow.getFailureAction() == FailureAction.CANCEL_ALL) {
- message.println("This flow is set to cancel all currently running jobs.");
- }
- else if (flow.getFailureAction() == FailureAction.FINISH_ALL_POSSIBLE){
- message.println("This flow is set to complete all jobs that aren't blocked by the failure.");
- }
- else {
- message.println("This flow is set to complete all currently running jobs before stopping.");
- }
-
- message.println("<table>");
- message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
- message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
- message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
- message.println("</table>");
- message.println("");
- String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
- message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
-
- message.println("");
- message.println("<h3>Reason</h3>");
- List<String> failedJobs = findFailedJobs(flow);
- message.println("<ul>");
- for (String jobId : failedJobs) {
- message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
- }
-
- message.println("</ul>");
-
- if (!testMode) {
- try {
- message.sendEmail();
- } catch (MessagingException e) {
- logger.error("Email message send failed" , e);
- }
- }
- }
- }
-
- public void sendErrorEmail(ExecutableFlow flow, String ... extraReasons) {
- List<String> emailList = flow.getFailureEmails();
- int execId = flow.getExecutionId();
-
- if (emailList != null && !emailList.isEmpty()) {
- EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
- message.setFromAddress(mailSender);
- message.addAllToAddress(emailList);
- message.setMimeType("text/html");
- message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
-
- message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has failed on " + azkabanName + "</h2>");
- message.println("<table>");
- message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
- message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
- message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
- message.println("</table>");
- message.println("");
- String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
- message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
-
- message.println("");
- message.println("<h3>Reason</h3>");
- List<String> failedJobs = findFailedJobs(flow);
- message.println("<ul>");
- for (String jobId : failedJobs) {
- message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
- }
- for (String reasons: extraReasons) {
- message.println("<li>" + reasons + "</li>");
- }
-
- message.println("</ul>");
-
-
-
- if (!testMode) {
- try {
- message.sendEmail();
- } catch (MessagingException e) {
- logger.error("Email message send failed" , e);
- }
- }
- }
- }
-
- public void sendSuccessEmail(ExecutableFlow flow) {
- List<String> emailList = flow.getSuccessEmails();
-
- int execId = flow.getExecutionId();
-
- if (emailList != null && !emailList.isEmpty()) {
- EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
- message.setFromAddress(mailSender);
- message.addAllToAddress(emailList);
- message.setMimeType("text/html");
- message.setSubject("Flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName);
-
- message.println("<h2> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName + "</h2>");
- message.println("<table>");
- message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
- message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
- message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
- message.println("</table>");
- message.println("");
- String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
- message.println("<a href=\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
-
- if (!testMode) {
- try {
- message.sendEmail();
- } catch (MessagingException e) {
- logger.error("Email message send failed" , e);
- }
- }
- }
- }
-
- private List<String> findFailedJobs(ExecutableFlow flow) {
- ArrayList<String> failedJobs = new ArrayList<String>();
- for (ExecutableNode node: flow.getExecutableNodes()) {
- if (node.getStatus() == Status.FAILED) {
- failedJobs.add(node.getJobId());
- }
- }
-
- return failedJobs;
- }
-
public void sendSlaEmail(SLA s, String ... extraReasons) {
List<String> emailList = s.getEmails();
src/java/azkaban/sla/SlaOptions.java 51(+51 -0)
diff --git a/src/java/azkaban/sla/SlaOptions.java b/src/java/azkaban/sla/SlaOptions.java
new file mode 100644
index 0000000..5fb2ff6
--- /dev/null
+++ b/src/java/azkaban/sla/SlaOptions.java
@@ -0,0 +1,51 @@
+package azkaban.sla;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import azkaban.sla.SLA.SlaSetting;
+
+public class SlaOptions {
+
+ public List<String> getSlaEmails() {
+ return slaEmails;
+ }
+ public void setSlaEmails(List<String> slaEmails) {
+ this.slaEmails = slaEmails;
+ }
+ public List<SlaSetting> getSettings() {
+ return settings;
+ }
+ public void setSettings(List<SlaSetting> settings) {
+ this.settings = settings;
+ }
+ private List<String> slaEmails;
+ private List<SlaSetting> settings;
+ public Object toObject() {
+ Map<String, Object> obj = new HashMap<String, Object>();
+ obj.put("slaEmails", slaEmails);
+ List<Object> slaSettings = new ArrayList<Object>();
+ for(SlaSetting s : settings) {
+ slaSettings.add(s.toObject());
+ }
+ obj.put("settings", slaSettings);
+ return obj;
+ }
+ @SuppressWarnings("unchecked")
+ public static SlaOptions fromObject(Object object) {
+ if(object != null) {
+ SlaOptions slaOptions = new SlaOptions();
+ Map<String, Object> obj = (HashMap<String, Object>) object;
+ slaOptions.setSlaEmails((List<String>) obj.get("slaEmails"));
+ List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
+ for(Object set: (List<Object>)obj.get("settings")) {
+ slaSets.add(SlaSetting.fromObject(set));
+ }
+ slaOptions.setSettings(slaSets);
+ return slaOptions;
+ }
+ return null;
+ }
+}
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
index 253c7e9..e257bc5 100644
--- a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
@@ -18,7 +18,6 @@ package azkaban.webapp.servlet;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -102,7 +101,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
* @return
*/
public boolean hasParam(HttpServletRequest request, String param) {
- return request.getParameter(param) != null;
+ return HttpRequestUtils.hasParam(request, param);
}
/**
@@ -115,13 +114,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
* @throws ServletException
*/
public String getParam(HttpServletRequest request, String name) throws ServletException {
- String p = request.getParameter(name);
- if (p == null) {
- throw new ServletException("Missing required parameter '" + name + "'.");
- }
- else {
- return p;
- }
+ return HttpRequestUtils.getParam(request, name);
}
/**
@@ -134,11 +127,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
* @return
*/
public String getParam(HttpServletRequest request, String name, String defaultVal){
- String p = request.getParameter(name);
- if (p == null) {
- return defaultVal;
- }
- return p;
+ return HttpRequestUtils.getParam(request, name, defaultVal);
}
@@ -152,54 +141,24 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
* @throws ServletException
*/
public int getIntParam(HttpServletRequest request, String name) throws ServletException {
- String p = getParam(request, name);
- return Integer.parseInt(p);
+ return HttpRequestUtils.getIntParam(request, name);
}
public int getIntParam(HttpServletRequest request, String name, int defaultVal) {
- if (hasParam(request, name)) {
- try {
- return getIntParam(request, name);
- } catch (Exception e) {
- return defaultVal;
- }
- }
-
- return defaultVal;
+ return HttpRequestUtils.getIntParam(request, name, defaultVal);
}
public long getLongParam(HttpServletRequest request, String name) throws ServletException {
- String p = getParam(request, name);
- return Long.valueOf(p);
+ return HttpRequestUtils.getLongParam(request, name);
}
public long getLongParam(HttpServletRequest request, String name, long defaultVal) {
- if (hasParam(request, name)) {
- try {
- return getLongParam(request, name);
- } catch (Exception e) {
- return defaultVal;
- }
- }
-
- return defaultVal;
+ return HttpRequestUtils.getLongParam(request, name, defaultVal);
}
public Map<String, String> getParamGroup(HttpServletRequest request, String groupName) throws ServletException {
- @SuppressWarnings("unchecked")
- Enumeration<Object> enumerate = (Enumeration<Object>)request.getParameterNames();
- String matchString = groupName + "[";
-
- HashMap<String, String> groupParam = new HashMap<String, String>();
- while( enumerate.hasMoreElements() ) {
- String str = (String)enumerate.nextElement();
- if (str.startsWith(matchString)) {
- groupParam.put(str.substring(matchString.length(), str.length() - 1), request.getParameter(str));
- }
-
- }
- return groupParam;
+ return HttpRequestUtils.getParamGroup(request, groupName);
}
/**
src/java/azkaban/webapp/servlet/ExecutorServlet.java 128(+20 -108)
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 524c223..f5f8c51 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -18,22 +18,19 @@ package azkaban.webapp.servlet;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import org.apache.commons.lang.StringUtils;
-
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
@@ -158,6 +155,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
return;
}
+ page.add("projectId", project.getId());
page.add("projectName", project.getName());
page.add("flowid", flow.getFlowId());
@@ -485,11 +483,13 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
return;
}
- ret.put("successEmails", exflow.getSuccessEmails());
- ret.put("failureEmails", exflow.getFailureEmails());
- ret.put("flowParam", exflow.getFlowParameters());
+ ExecutionOptions options = exflow.getExecutionOptions();
+
+ ret.put("successEmails", options.getSuccessEmails());
+ ret.put("failureEmails", options.getFailureEmails());
+ ret.put("flowParam", options.getFlowParameters());
- FailureAction action = exflow.getFailureAction();
+ FailureAction action = options.getFailureAction();
String failureAction = null;
switch (action) {
case FINISH_CURRENTLY_RUNNING:
@@ -504,21 +504,23 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
ret.put("failureAction", failureAction);
- ret.put("notifyFailureFirst", exflow.getNotifyOnFirstFailure());
- ret.put("notifyFailureLast", exflow.getNotifyOnLastFailure());
+ ret.put("notifyFailureFirst", options.getNotifyOnFirstFailure());
+ ret.put("notifyFailureLast", options.getNotifyOnLastFailure());
- ret.put("concurrentOptions", exflow.getConcurrentOption());
- ret.put("pipelineLevel", exflow.getPipelineLevel());
- ret.put("pipelineExecution", exflow.getPipelineExecutionId());
- ret.put("queueLevel", exflow.getQueueLevel());
+ ret.put("concurrentOptions", options.getConcurrentOption());
+ ret.put("pipelineLevel", options.getPipelineLevel());
+ ret.put("pipelineExecution", options.getPipelineExecutionId());
+ ret.put("queueLevel", options.getQueueLevel());
HashMap<String, String> nodeStatus = new HashMap<String,String>();
for(ExecutableNode node : exflow.getExecutableNodes()) {
nodeStatus.put(node.getJobId(), node.getStatus().toString());
}
ret.put("nodeStatus", nodeStatus);
+ ret.put("disabled", options.getDisabledJobs());
+
+ Schedule sflow = scheduleManager.getSchedule(project.getId(), exflow.getFlowId());
- Schedule sflow = null;
for (Schedule sched: scheduleManager.getSchedules()) {
if (sched.getProjectId() == project.getId() && sched.getFlowName().equals(exflow.getFlowId())) {
sflow = sched;
@@ -715,85 +717,8 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
exflow.setSubmitUser(user.getUserId());
exflow.setProxyUsers(project.getProxyUsers());
- if (hasParam(req, "failureAction")) {
- String option = getParam(req, "failureAction");
- if (option.equals("finishCurrent") ) {
- exflow.setFailureAction(FailureAction.FINISH_CURRENTLY_RUNNING);
- }
- else if (option.equals("cancelImmediately")) {
- exflow.setFailureAction(FailureAction.CANCEL_ALL);
- }
- else if (option.equals("finishPossible")) {
- exflow.setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
- }
- }
-
- String concurrentOption = "skip";
- if (hasParam(req, "failureEmails")) {
- String emails = getParam(req, "failureEmails");
- String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
- exflow.setFailureEmails(Arrays.asList(emailSplit));
- }
- if (hasParam(req, "successEmails")) {
- String emails = getParam(req, "successEmails");
- String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
- exflow.setSuccessEmails(Arrays.asList(emailSplit));
- }
- if (hasParam(req, "notifyFailureFirst")) {
- exflow.setNotifyOnFirstFailure(Boolean.parseBoolean(getParam(req, "notifyFailureFirst")));
- }
- if (hasParam(req, "notifyFailureLast")) {
- exflow.setNotifyOnLastFailure(Boolean.parseBoolean(getParam(req, "notifyFailureLast")));
- }
- if (hasParam(req, "concurrentOption")) {
- concurrentOption = getParam(req, "concurrentOption");
- exflow.setConcurrentOption(concurrentOption);
- if (concurrentOption.equals("pipeline")) {
- int pipelineLevel = getIntParam(req, "pipelineLevel");
- exflow.setPipelineLevel(pipelineLevel);
- }
- else if (concurrentOption.equals("queue")) {
- // Not yet implemented
- int queueLevel = getIntParam(req, "queueLevel", 1);
- exflow.setPipelineLevel(queueLevel);
- }
- }
-
- Map<String, String> flowParamGroup = this.getParamGroup(req, "flowOverride");
- exflow.addFlowParameters(flowParamGroup);
-
- if (hasParam(req, "job")) {
- // Disable everything.
- for(ExecutableNode node : exflow.getExecutableNodes()) {
- node.setStatus(Status.DISABLED);
- }
-
- String jobId = getParam(req, "job");
- ExecutableNode job = exflow.getExecutableNode(jobId);
- if (job == null) {
- ret.put("error", "Job " + jobId + " doesn't exist in flow.");
- return;
- }
-
- job.setStatus(Status.READY);
-
- if (hasParam(req, "withDep")) {
- boolean withDep = "true".equals(getParam(req, "withDep"));
- if (withDep) {
- enableAllAncestors(job, exflow);
- }
- }
- }
- else if (hasParam(req, "disabled")) {
- String disabled = getParam(req, "disabled");
- String[] disabledNodes = disabled.split("\\s*,\\s*");
-
- for (String node: disabledNodes) {
- if (!node.isEmpty()) {
- exflow.setNodeStatus(node, Status.DISABLED);
- }
- }
- }
+ ExecutionOptions options = HttpRequestUtils.parseFlowOptions(req);
+ exflow.setExecutionOptions(options);
try {
String message = executorManager.submitExecutableFlow(exflow);
@@ -807,19 +732,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ret.put("execid", exflow.getExecutionId());
}
- private void enableAllAncestors(ExecutableNode node, ExecutableFlow flow) {
- Set<String> inNodes = node.getInNodes();
- if (inNodes != null) {
- for (String inNode: inNodes) {
- ExecutableNode job = flow.getExecutableNode(inNode);
- if (job != null) {
- job.setStatus(Status.READY);
- enableAllAncestors(job, flow);
- }
- }
- }
- }
-
public class ExecutorVelocityHelper {
public String getProjectName(int id) {
Project project = projectManager.getProject(id);
diff --git a/src/java/azkaban/webapp/servlet/HttpRequestUtils.java b/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
new file mode 100644
index 0000000..d1e55da
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
@@ -0,0 +1,183 @@
+package azkaban.webapp.servlet;
+
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutionOptions.FailureAction;
+
+public class HttpRequestUtils {
+ public static ExecutionOptions parseFlowOptions(HttpServletRequest req) throws ServletException {
+ ExecutionOptions execOptions = new ExecutionOptions();
+
+ if (hasParam(req, "failureAction")) {
+ String option = getParam(req, "failureAction");
+ if (option.equals("finishCurrent") ) {
+ execOptions.setFailureAction(FailureAction.FINISH_CURRENTLY_RUNNING);
+ }
+ else if (option.equals("cancelImmediately")) {
+ execOptions.setFailureAction(FailureAction.CANCEL_ALL);
+ }
+ else if (option.equals("finishPossible")) {
+ execOptions.setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
+ }
+ }
+
+ if (hasParam(req, "failureEmails")) {
+ String emails = getParam(req, "failureEmails");
+ String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+ execOptions.setFailureEmails(Arrays.asList(emailSplit));
+ }
+ if (hasParam(req, "successEmails")) {
+ String emails = getParam(req, "successEmails");
+ String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+ execOptions.setSuccessEmails(Arrays.asList(emailSplit));
+ }
+ if (hasParam(req, "notifyFailureFirst")) {
+ execOptions.setNotifyOnFirstFailure(Boolean.parseBoolean(getParam(req, "notifyFailureFirst")));
+ }
+ if (hasParam(req, "notifyFailureLast")) {
+ execOptions.setNotifyOnLastFailure(Boolean.parseBoolean(getParam(req, "notifyFailureLast")));
+ }
+
+ String concurrentOption = "skip";
+ if (hasParam(req, "concurrentOption")) {
+ concurrentOption = getParam(req, "concurrentOption");
+ execOptions.setConcurrentOption(concurrentOption);
+ if (concurrentOption.equals("pipeline")) {
+ int pipelineLevel = getIntParam(req, "pipelineLevel");
+ execOptions.setPipelineLevel(pipelineLevel);
+ }
+ else if (concurrentOption.equals("queue")) {
+ // Not yet implemented
+ int queueLevel = getIntParam(req, "queueLevel", 1);
+ execOptions.setPipelineLevel(queueLevel);
+ }
+ }
+
+ Map<String, String> flowParamGroup = getParamGroup(req, "flowOverride");
+ execOptions.setFlowParameters(flowParamGroup);
+
+ if (hasParam(req, "disabled")) {
+ String disabled = getParam(req, "disabled");
+ String[] disabledNodes = disabled.split("\\s*,\\s*");
+
+ execOptions.setDisabledJobs(Arrays.asList(disabledNodes));
+ }
+ return execOptions;
+ }
+
+ /**
+ * Checks for the existance of the parameter in the request
+ *
+ * @param request
+ * @param param
+ * @return
+ */
+ public static boolean hasParam(HttpServletRequest request, String param) {
+ return request.getParameter(param) != null;
+ }
+
+ /**
+ * Retrieves the param from the http servlet request. Will throw an
+ * exception if not found
+ *
+ * @param request
+ * @param name
+ * @return
+ * @throws ServletException
+ */
+ public static String getParam(HttpServletRequest request, String name) throws ServletException {
+ String p = request.getParameter(name);
+ if (p == null) {
+ throw new ServletException("Missing required parameter '" + name + "'.");
+ }
+ else {
+ return p;
+ }
+ }
+
+ /**
+ * Retrieves the param from the http servlet request.
+ *
+ * @param request
+ * @param name
+ * @param default
+ *
+ * @return
+ */
+ public static String getParam(HttpServletRequest request, String name, String defaultVal){
+ String p = request.getParameter(name);
+ if (p == null) {
+ return defaultVal;
+ }
+ return p;
+ }
+
+
+ /**
+ * Returns the param and parses it into an int. Will throw an exception if
+ * not found, or a parse error if the type is incorrect.
+ *
+ * @param request
+ * @param name
+ * @return
+ * @throws ServletException
+ */
+ public static int getIntParam(HttpServletRequest request, String name) throws ServletException {
+ String p = getParam(request, name);
+ return Integer.parseInt(p);
+ }
+
+ public static int getIntParam(HttpServletRequest request, String name, int defaultVal) {
+ if (hasParam(request, name)) {
+ try {
+ return getIntParam(request, name);
+ } catch (Exception e) {
+ return defaultVal;
+ }
+ }
+
+ return defaultVal;
+ }
+
+ public static long getLongParam(HttpServletRequest request, String name) throws ServletException {
+ String p = getParam(request, name);
+ return Long.valueOf(p);
+ }
+
+ public static long getLongParam(HttpServletRequest request, String name, long defaultVal) {
+ if (hasParam(request, name)) {
+ try {
+ return getLongParam(request, name);
+ } catch (Exception e) {
+ return defaultVal;
+ }
+ }
+
+ return defaultVal;
+ }
+
+
+ public static Map<String, String> getParamGroup(HttpServletRequest request, String groupName) throws ServletException {
+ @SuppressWarnings("unchecked")
+ Enumeration<Object> enumerate = (Enumeration<Object>)request.getParameterNames();
+ String matchString = groupName + "[";
+
+ HashMap<String, String> groupParam = new HashMap<String, String>();
+ while( enumerate.hasMoreElements() ) {
+ String str = (String)enumerate.nextElement();
+ if (str.startsWith(matchString)) {
+ groupParam.put(str.substring(matchString.length(), str.length() - 1), request.getParameter(str));
+ }
+
+ }
+ return groupParam;
+ }
+
+}
src/java/azkaban/webapp/servlet/ScheduleServlet.java 174(+15 -159)
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 13de687..c382bbe 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -31,46 +31,35 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
-import org.joda.time.Hours;
import org.joda.time.LocalDateTime;
import org.joda.time.Minutes;
import org.joda.time.ReadablePeriod;
import org.joda.time.format.DateTimeFormat;
-import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutorManagerException;
-import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.executor.ExecutionOptions;
import azkaban.flow.Flow;
import azkaban.flow.Node;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
import azkaban.project.ProjectLogEvent.EventType;
import azkaban.user.Permission;
-import azkaban.user.Role;
import azkaban.user.User;
import azkaban.user.Permission.Type;
-import azkaban.user.UserManager;
-import azkaban.utils.Pair;
import azkaban.webapp.AzkabanWebServer;
import azkaban.webapp.session.Session;
import azkaban.scheduler.Schedule;
-import azkaban.scheduler.Schedule.FlowOptions;
-import azkaban.scheduler.Schedule.SlaOptions;
import azkaban.scheduler.ScheduleManager;
import azkaban.sla.SLA;
import azkaban.sla.SLA.SlaRule;
-import azkaban.sla.SLAManager;
import azkaban.sla.SLA.SlaAction;
import azkaban.sla.SLA.SlaSetting;
+import azkaban.sla.SlaOptions;
public class ScheduleServlet extends LoginAbstractAzkabanServlet {
private static final long serialVersionUID = 1L;
private static final Logger logger = Logger.getLogger(ScheduleServlet.class);
private ProjectManager projectManager;
private ScheduleManager scheduleManager;
- private SLAManager slaManager;
- private UserManager userManager;
@Override
public void init(ServletConfig config) throws ServletException {
@@ -78,8 +67,6 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
AzkabanWebServer server = (AzkabanWebServer)getApplication();
projectManager = server.getProjectManager();
scheduleManager = server.getScheduleManager();
- userManager = server.getUserManager();
- slaManager = server.getSLAManager();
}
@Override
@@ -103,8 +90,8 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
else if(ajaxName.equals("setSla")) {
ajaxSetSla(req, ret, session.getUser());
}
- else if(ajaxName.equals("advSchedule")) {
- ajaxAdvSchedule(req, ret, session.getUser());
+ else if(ajaxName.equals("scheduleFlow")) {
+ ajaxScheduleFlow(req, ret, session.getUser());
}
if (ret != null) {
@@ -124,7 +111,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return;
}
- Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projectId, flowName));
+ Schedule sched = scheduleManager.getSchedule(projectId, flowName);
SlaOptions slaOptions= new SlaOptions();
@@ -167,7 +154,6 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
}
-
private SlaSetting parseSlaSetting(String set) {
// "" + Duration + EmailAction + KillAction
@@ -202,7 +188,6 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return Minutes.minutes(min+hour*60).toPeriod();
}
- @SuppressWarnings("unchecked")
private void ajaxSlaInfo(HttpServletRequest req, HashMap<String, Object> ret, User user) {
int projId;
String flowName;
@@ -222,10 +207,10 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return;
}
- Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projId, flowName));
+ Schedule sched = scheduleManager.getSchedule(projId, flowName);
SlaOptions slaOptions = sched.getSlaOptions();
- FlowOptions flowOptions = sched.getFlowOptions();
+ ExecutionOptions flowOptions = sched.getExecutionOptions();
if(slaOptions != null) {
ret.put("slaEmails", slaOptions.getSlaEmails());
@@ -333,8 +318,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
private void ajaxRemoveSched(HttpServletRequest req, Map<String, Object> ret, User user) throws ServletException{
int projectId = getIntParam(req, "projectId");
String flowName = getParam(req, "flowName");
- Pair<Integer, String> scheduleId = new Pair<Integer, String>(projectId, flowName);
- Schedule sched = scheduleManager.getSchedule(scheduleId);
+ Schedule sched = scheduleManager.getSchedule(projectId, flowName);
// int projectId = sched.getProjectId();
@@ -352,98 +336,18 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return;
}
- scheduleManager.removeSchedule(scheduleId);
+ scheduleManager.removeSchedule(projectId, flowName);
logger.info("User '" + user.getUserId() + " has removed schedule " + sched.getScheduleName());
projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + sched.toString() + " has been removed.");
ret.put("status", "success");
- ret.put("message", "flow " + scheduleId.getSecond() + " removed from Schedules.");
+ ret.put("message", "flow " + flowName + " removed from Schedules.");
return;
}
-
- private void ajaxScheduleFlow(HttpServletRequest req, Map<String, Object> ret, User user) throws ServletException {
- String projectName = getParam(req, "projectName");
- String flowName = getParam(req, "flowName");
- int projectId = getIntParam(req, "projectId");
-
- Project project = projectManager.getProject(projectId);
-
- if (project == null) {
- ret.put("message", "Project " + projectName + " does not exist");
- ret.put("status", "error");
- return;
- }
-
- if (!hasPermission(project, user, Type.SCHEDULE)) {
- ret.put("status", "error");
- ret.put("message", "Permission denied. Cannot execute " + flowName);
- return;
- }
-
- Flow flow = project.getFlow(flowName);
- if (flow == null) {
- ret.put("status", "error");
- ret.put("message", "Flow " + flowName + " cannot be found in project " + project);
- return;
- }
-
- int hour = getIntParam(req, "hour");
- int minutes = getIntParam(req, "minutes");
- boolean isPm = getParam(req, "am_pm").equalsIgnoreCase("pm");
-
- DateTimeZone timezone = getParam(req, "timezone").equals("UTC") ? DateTimeZone.UTC : DateTimeZone.forID("America/Los_Angeles");
-
- String scheduledDate = req.getParameter("date");
- DateTime day = null;
- if(scheduledDate == null || scheduledDate.trim().length() == 0) {
- day = new LocalDateTime().toDateTime();
- } else {
- try {
- day = DateTimeFormat.forPattern("MM/dd/yyyy").withZone(timezone).parseDateTime(scheduledDate);
- } catch(IllegalArgumentException e) {
- ret.put("error", "Invalid date: '" + scheduledDate + "'");
- return;
- }
- }
-
- ReadablePeriod thePeriod = null;
- try {
- if(hasParam(req, "is_recurring"))
- thePeriod = Schedule.parsePeriodString(getParam(req, "period")+getParam(req,"period_units"));
- }
- catch(Exception e){
- ret.put("error", e.getMessage());
- }
-
- if(isPm && hour < 12)
- hour += 12;
- hour %= 24;
-
- DateTime submitTime = new DateTime();
- DateTime firstSchedTime = day.withHourOfDay(hour).withMinuteOfHour(minutes).withSecondOfMinute(0);
-
- Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projectId, flowName));
- FlowOptions flowOptions = null;
- SlaOptions slaOptions = null;
- if(sched != null) {
- if(sched.getFlowOptions() != null) {
- flowOptions = sched.getFlowOptions();
- }
- if(sched.getSlaOptions() != null) {
- slaOptions = sched.getSlaOptions();
- }
- }
- Schedule schedule = scheduleManager.scheduleFlow(projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), timezone, thePeriod, submitTime.getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);
- logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName + " (" + projectId +")" + "].");
- projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.toString() + " has been added.");
-
- ret.put("status", "success");
- ret.put("message", projectName + "." + flowName + " scheduled.");
- }
- private void ajaxAdvSchedule(HttpServletRequest req, HashMap<String, Object> ret, User user) throws ServletException {
+ private void ajaxScheduleFlow(HttpServletRequest req, HashMap<String, Object> ret, User user) throws ServletException {
String projectName = getParam(req, "projectName");
- String flowName = getParam(req, "flowName");
+ String flowName = getParam(req, "flow");
int projectId = getIntParam(req, "projectId");
Project project = projectManager.getProject(projectId);
@@ -488,10 +392,10 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
ret.put("error", e.getMessage());
}
- Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projectId, flowName));
- FlowOptions flowOptions = null;
+ Schedule sched = scheduleManager.getSchedule(projectId, flowName);
+ ExecutionOptions flowOptions = null;
try {
- flowOptions = parseFlowOptions(req);
+ flowOptions = HttpRequestUtils.parseFlowOptions(req);
}
catch (Exception e) {
ret.put("error", e.getMessage());
@@ -510,54 +414,6 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
ret.put("message", projectName + "." + flowName + " scheduled.");
}
- private FlowOptions parseFlowOptions(HttpServletRequest req) throws ServletException {
- FlowOptions flowOptions = new FlowOptions();
- if (hasParam(req, "failureAction")) {
- String option = getParam(req, "failureAction");
- if (option.equals("finishCurrent") ) {
- flowOptions.setFailureAction(FailureAction.FINISH_CURRENTLY_RUNNING);
- }
- else if (option.equals("cancelImmediately")) {
- flowOptions.setFailureAction(FailureAction.CANCEL_ALL);
- }
- else if (option.equals("finishPossible")) {
- flowOptions.setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
- }
- }
-
- if (hasParam(req, "failureEmails")) {
- String emails = getParam(req, "failureEmails");
- String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
- flowOptions.setFailureEmails(Arrays.asList(emailSplit));
- }
- if (hasParam(req, "successEmails")) {
- String emails = getParam(req, "successEmails");
- String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
- flowOptions.setSuccessEmails(Arrays.asList(emailSplit));
- }
- if (hasParam(req, "notifyFailureFirst")) {
- flowOptions.setNotifyOnFirstFailure(Boolean.parseBoolean(getParam(req, "notifyFailureFirst")));
- }
- if (hasParam(req, "notifyFailureLast")) {
- flowOptions.setNotifyOnLastFailure(Boolean.parseBoolean(getParam(req, "notifyFailureLast")));
- }
- if (hasParam(req, "executingJobOption")) {
- //String option = getParam(req, "jobOption");
- // Not set yet
- }
-
- Map<String, String> flowParamGroup = this.getParamGroup(req, "flowOverride");
- flowOptions.setFlowOverride(flowParamGroup);
-
- if (hasParam(req, "disabledJobs")) {
- String disable = getParam(req, "disabledJobs");
- String[] disableSplit = disable.split("\\s*,\\s*|\\s*;\\s*|\\s+");
- List<String> jobs = (List<String>) Arrays.asList(disableSplit);
- flowOptions.setDisabledJobs(jobs.subList(1, jobs.size()));
- }
- return flowOptions;
- }
-
private DateTime parseDateTime(String scheduleDate, String scheduleTime) {
// scheduleTime: 12,00,pm,PDT
String[] parts = scheduleTime.split(",", -1);
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
index fea2b3a..de86051 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -41,7 +41,7 @@
var timezone = "${timezone}";
var errorMessage = null;
var successMessage = null;;
-
+ var projectId = "${projectId}";
var projectName = "${projectName}";
var flowId = "${flowid}";
var execId = "${execid}";
@@ -50,6 +50,8 @@
</head>
<body>
#set($current_page="all")
+#set($show_schedule="false")
+
#parse( "azkaban/webapp/servlet/velocity/nav.vm" )
<div class="messaging"><p id="messageClose">X</p><p id="message"></p></div>
<div class="content">
diff --git a/src/java/azkaban/webapp/servlet/velocity/flowexecutionpanel.vm b/src/java/azkaban/webapp/servlet/velocity/flowexecutionpanel.vm
index 16f8df2..806465f 100644
--- a/src/java/azkaban/webapp/servlet/velocity/flowexecutionpanel.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/flowexecutionpanel.vm
@@ -123,14 +123,19 @@
</div>
<div class="actions">
+ #if(!$show_schedule || $show_schedule == 'true')
<a class="btn2" id="schedule-btn">Schedule</a>
+ #end
+
<a class="yes btn1" id="execute-btn">Execute</a>
<a class="no simplemodal-close btn3 closeExecPanel">Cancel</a>
</div>
</div>
</div>
+#if(!$show_schedule || $show_schedule == 'true')
#parse( "azkaban/webapp/servlet/velocity/schedulepanel.vm" )
+#end
<div id="contextMenu">
diff --git a/src/java/azkaban/webapp/servlet/velocity/projectpage.vm b/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
index 32fc700..6c82bfb 100644
--- a/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
@@ -38,6 +38,8 @@
var errorMessage = null;
var successMessage = null;
+ var projectId = ${project.id};
+
var execAccess = ${exec};
var projectName = "$project.name";
</script>
diff --git a/src/java/azkaban/webapp/servlet/velocity/schedulepanel.vm b/src/java/azkaban/webapp/servlet/velocity/schedulepanel.vm
index ea46998..002b188 100644
--- a/src/java/azkaban/webapp/servlet/velocity/schedulepanel.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/schedulepanel.vm
@@ -1,4 +1,3 @@
-
<script type="text/javascript" src="${context}/js/azkaban.date.utils.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.schedule.panel.view.js"></script>
@@ -12,24 +11,24 @@
<dl>
<dt>Schedule Time</dt>
<dd>
- <input id="advhour" type="text" size="2" value="12"/>
- <input id="advminutes" type="text" size="2" value="00"/>
- <select id="advam_pm">
+ <input id="hour" type="text" size="2" value="12"/>
+ <input id="minutes" type="text" size="2" value="00"/>
+ <select id="am_pm">
<option>pm</option>
<option>am</option>
</select>
- <select id="advtimezone">
+ <select id="timezone">
<option>PDT</option>
<option>UTC</option>
</select>
</dd>
- <dt>Schedule Date</dt><dd><input type="text" id="advdatepicker" /></dd>
+ <dt>Schedule Date</dt><dd><input type="text" id="datepicker" /></dd>
<dt>Recurrence</dt>
<dd>
- <input id="advis_recurring" type="checkbox" checked />
+ <input id="is_recurring" type="checkbox" checked />
<span>repeat every</span>
- <input id="advperiod" type="text" size="2" value="1"/>
- <select id="advperiod_units">
+ <input id="period" type="text" size="2" value="1"/>
+ <select id="period_units">
<option value="d">Days</option>
<option value="h">Hours</option>
<option value="m">Minutes</option>
src/web/js/azkaban.flow.execute.view.js 109(+57 -52)
diff --git a/src/web/js/azkaban.flow.execute.view.js b/src/web/js/azkaban.flow.execute.view.js
index a59714c..82fc060 100644
--- a/src/web/js/azkaban.flow.execute.view.js
+++ b/src/web/js/azkaban.flow.execute.view.js
@@ -34,6 +34,62 @@ azkaban.FlowExecuteDialogView= Backbone.View.extend({
},
render: function() {
},
+ getExecutionOptionData: function() {
+ var failureAction = $('#failureAction').val();
+ var failureEmails = $('#failureEmails').val();
+ var successEmails = $('#successEmails').val();
+ var notifyFailureFirst = $('#notifyFailureFirst').is(':checked');
+ var notifyFailureLast = $('#notifyFailureLast').is(':checked');
+
+ var flowOverride = {};
+ var editRows = $(".editRow");
+ for (var i = 0; i < editRows.length; ++i) {
+ var row = editRows[i];
+ var td = $(row).find('td');
+ var key = $(td[0]).text();
+ var val = $(td[1]).text();
+
+ if (key && key.length > 0) {
+ flowOverride[key] = val;
+ }
+ }
+
+ var disabled = "";
+ var disabledMap = this.model.get('disabled');
+ for (var dis in disabledMap) {
+ if (disabledMap[dis]) {
+ disabled += dis + ",";
+ }
+ }
+
+ var executingData = {
+ projectId: projectId,
+ project: this.projectName,
+ ajax: "executeFlow",
+ flow: this.flowId,
+ disabled: disabled,
+ failureAction: failureAction,
+ failureEmails: failureEmails,
+ successEmails: successEmails,
+ notifyFailureFirst: notifyFailureFirst,
+ notifyFailureLast: notifyFailureLast,
+ flowOverride: flowOverride
+ };
+
+ // Set concurrency option, default is skip
+
+ var concurrentOption = $('input[name=concurrent]:checked').val();
+ executingData.concurrentOption = concurrentOption;
+ if (concurrentOption == "pipeline") {
+ var pipelineLevel = $("#pipelineLevel").val();
+ executingData.pipelineLevel = pipelineLevel;
+ }
+ else if (concurrentOption == "queue") {
+ executingData.queueLevel = $("#queueLevel").val();
+ }
+
+ return executingData;
+ },
changeFlowInfo: function() {
var successEmails = this.model.get("successEmails");
var failureEmails = this.model.get("failureEmails");
@@ -176,58 +232,7 @@ azkaban.FlowExecuteDialogView= Backbone.View.extend({
},
handleExecuteFlow: function(evt) {
var executeURL = contextURL + "/executor";
- var failureAction = $('#failureAction').val();
- var failureEmails = $('#failureEmails').val();
- var successEmails = $('#successEmails').val();
- var notifyFailureFirst = $('#notifyFailureFirst').is(':checked');
- var notifyFailureLast = $('#notifyFailureLast').is(':checked');
-
- var flowOverride = {};
- var editRows = $(".editRow");
- for (var i = 0; i < editRows.length; ++i) {
- var row = editRows[i];
- var td = $(row).find('td');
- var key = $(td[0]).text();
- var val = $(td[1]).text();
-
- if (key && key.length > 0) {
- flowOverride[key] = val;
- }
- }
-
- var disabled = "";
- var disabledMap = this.model.get('disabled');
- for (var dis in disabledMap) {
- if (disabledMap[dis]) {
- disabled += dis + ",";
- }
- }
-
- var executingData = {
- project: this.projectName,
- ajax: "executeFlow",
- flow: this.flowId,
- disabled: disabled,
- failureAction: failureAction,
- failureEmails: failureEmails,
- successEmails: successEmails,
- notifyFailureFirst: notifyFailureFirst,
- notifyFailureLast: notifyFailureLast,
- flowOverride: flowOverride
- };
-
- // Set concurrency option, default is skip
-
- var concurrentOption = $('input[name=concurrent]:checked').val();
- executingData.concurrentOption = concurrentOption;
- if (concurrentOption == "pipeline") {
- var pipelineLevel = $("#pipelineLevel").val();
- executingData.pipelineLevel = pipelineLevel;
- }
- else if (concurrentOption == "queue") {
- executingData.queueLevel = $("#queueLevel").val();
- }
-
+ var executingData = this.getExecutionOptionData();
executeFlow(executingData);
}
});
src/web/js/azkaban.schedule.panel.view.js 55(+50 -5)
diff --git a/src/web/js/azkaban.schedule.panel.view.js b/src/web/js/azkaban.schedule.panel.view.js
index 4fc10ad..e445a5e 100644
--- a/src/web/js/azkaban.schedule.panel.view.js
+++ b/src/web/js/azkaban.schedule.panel.view.js
@@ -19,13 +19,14 @@ $.namespace('azkaban');
var schedulePanelView;
azkaban.SchedulePanelView= Backbone.View.extend({
events : {
- "click .closeSchedule": "hideSchedulePanel"
+ "click .closeSchedule": "hideSchedulePanel",
+ "click #schedule-button": "scheduleFlow"
},
initialize : function(settings) {
- $("#advdatepicker").css("backgroundColor", "transparent");
- $( "#advdatepicker" ).datepicker();
- $( "#advdatepicker" ).datepicker('setDate', new Date());
- $( "#advdatepicker" ).datepicker("hide");
+ $("#datepicker").css("backgroundColor", "transparent");
+ $( "#datepicker" ).datepicker();
+ $( "#datepicker" ).datepicker('setDate', new Date());
+ $( "#datepicker" ).datepicker("hide");
},
render: function() {
},
@@ -36,6 +37,50 @@ azkaban.SchedulePanelView= Backbone.View.extend({
hideSchedulePanel: function() {
$('#scheduleModalBackground').hide();
$('#schedule-panel').hide();
+ },
+ scheduleFlow: function() {
+ var hourVal = $('#hour').val();
+ var minutesVal = $('#minutes').val();
+ var ampmVal = $('#am_pm').val();
+ var timezoneVal = $('#timezone').val();
+ var dateVal = $('#datepicker').val();
+ var is_recurringVal = $('#is_recurring').val();
+ var periodVal = $('#period').val();
+ var periodUnits = $('#period_units').val();
+
+ var scheduleURL = contextURL + "/schedule"
+
+ var scheduleData = flowExecuteDialogView.getExecutionOptionData();
+ console.log("Creating schedule for "+projectName+"."+scheduleData.flow);
+ var scheduleTime = $('#hour').val() + "," + $('#minutes').val() + "," + $('#am_pm').val() + "," + $('#timezone').val();
+ var scheduleDate = $('#datepicker').val();
+ var is_recurring = $('#is_recurring').val();
+ var period = $('#period').val() + $('#period_units').val();
+
+ scheduleData.ajax = "scheduleFlow";
+ scheduleData.projectName = projectName;
+ scheduleData.scheduleTime = scheduleTime;
+ scheduleData.scheduleDate = scheduleDate;
+ scheduleData.is_recurring = is_recurring;
+ scheduleData.period = period;
+
+ $.post(
+ scheduleURL,
+ scheduleData,
+ function(data) {
+ if (data.error) {
+ messageDialogView.show("Error Scheduling Flow", data.message);
+ }
+ else {
+ messageDialogView.show("Flow Scheduled", data.message,
+ function() {
+ window.location.href = scheduleURL;
+ }
+ );
+ }
+ },
+ "json"
+ );
}
});
diff --git a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
index 4a9491c..91a5e6e 100644
--- a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
+++ b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
@@ -26,13 +26,11 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-
-import azkaban.executor.ExecutableFlow.FailureAction;
-import azkaban.scheduler.Schedule.FlowOptions;
-import azkaban.scheduler.Schedule.SlaOptions;
+import azkaban.executor.ExecutionOptions;
import azkaban.sla.SLA.SlaAction;
import azkaban.sla.SLA.SlaRule;
import azkaban.sla.SLA.SlaSetting;
+import azkaban.sla.SlaOptions;
import azkaban.utils.DataSourceUtils;
import azkaban.utils.Props;
@@ -131,7 +129,7 @@ public class JdbcScheduleLoaderTest {
set1.setDuration(Schedule.parsePeriodString("1h"));
set1.setRule(SlaRule.FINISH);
slaSets.add(set1);
- FlowOptions flowOptions = new FlowOptions();
+ ExecutionOptions flowOptions = new ExecutionOptions();
flowOptions.setFailureEmails(emails);
flowOptions.setDisabledJobs(disabledJobs);
SlaOptions slaOptions = new SlaOptions();
@@ -159,7 +157,7 @@ public class JdbcScheduleLoaderTest {
Assert.assertEquals("America/Los_Angeles", sched.getTimezone().getID());
Assert.assertEquals(44444, sched.getSubmitTime());
Assert.assertEquals("1d", Schedule.createPeriodString(sched.getPeriod()));
- FlowOptions fOpt = sched.getFlowOptions();
+ ExecutionOptions fOpt = sched.getExecutionOptions();
SlaOptions sOpt = sched.getSlaOptions();
Assert.assertEquals(SlaAction.EMAIL, sOpt.getSettings().get(0).getActions().get(0));
Assert.assertEquals("", sOpt.getSettings().get(0).getId());
@@ -168,8 +166,8 @@ public class JdbcScheduleLoaderTest {
Assert.assertEquals(2, fOpt.getFailureEmails().size());
Assert.assertEquals(null, fOpt.getSuccessEmails());
Assert.assertEquals(2, fOpt.getDisabledJobs().size());
- Assert.assertEquals(FailureAction.FINISH_CURRENTLY_RUNNING, fOpt.getFailureAction());
- Assert.assertEquals(null, fOpt.getFlowOverride());
+ Assert.assertEquals(ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING, fOpt.getFailureAction());
+ Assert.assertEquals(null, fOpt.getFlowParameters());
}
@Test
@@ -196,7 +194,7 @@ public class JdbcScheduleLoaderTest {
set1.setDuration(Schedule.parsePeriodString("1h"));
set1.setRule(SlaRule.FINISH);
slaSets.add(set1);
- FlowOptions flowOptions = new FlowOptions();
+ ExecutionOptions flowOptions = new ExecutionOptions();
flowOptions.setFailureEmails(emails);
flowOptions.setDisabledJobs(disabledJobs);
SlaOptions slaOptions = new SlaOptions();
@@ -257,7 +255,7 @@ public class JdbcScheduleLoaderTest {
set1.setDuration(Schedule.parsePeriodString("1h"));
set1.setRule(SlaRule.FINISH);
slaSets.add(set1);
- FlowOptions flowOptions = new FlowOptions();
+ ExecutionOptions flowOptions = new ExecutionOptions();
flowOptions.setFailureEmails(emails);
flowOptions.setDisabledJobs(disabledJobs);
SlaOptions slaOptions = new SlaOptions();
diff --git a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
index 452be05..2bdfc7a 100644
--- a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
@@ -15,6 +15,7 @@ import azkaban.execapp.event.FlowWatcher;
import azkaban.execapp.event.LocalFlowWatcher;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.Status;
import azkaban.flow.Flow;
@@ -205,9 +206,10 @@ public class LocalFlowWatcherTest {
private FlowRunner createFlowRunner(File workingDir, ExecutorLoader loader, EventCollectorListener eventCollector, String flowName, int execId, FlowWatcher watcher, Integer pipeline) throws Exception {
File testDir = new File("unit/executions/exectest1");
ExecutableFlow exFlow = prepareExecDir(workingDir, testDir, flowName, execId);
+ ExecutionOptions option = exFlow.getExecutionOptions();
if (watcher != null) {
- exFlow.setPipelineLevel(pipeline);
- exFlow.setPipelineExecutionId(watcher.getExecId());
+ option.setPipelineLevel(pipeline);
+ option.setPipelineExecutionId(watcher.getExecId());
}
//MockProjectLoader projectLoader = new MockProjectLoader(new File(exFlow.getExecutionPath()));
diff --git a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
index 1fddfc7..edf520c 100644
--- a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
@@ -15,6 +15,7 @@ import azkaban.execapp.event.FlowWatcher;
import azkaban.execapp.event.RemoteFlowWatcher;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.Status;
import azkaban.flow.Flow;
@@ -205,9 +206,10 @@ public class RemoteFlowWatcherTest {
private FlowRunner createFlowRunner(File workingDir, ExecutorLoader loader, EventCollectorListener eventCollector, String flowName, int execId, FlowWatcher watcher, Integer pipeline) throws Exception {
File testDir = new File("unit/executions/exectest1");
ExecutableFlow exFlow = prepareExecDir(workingDir, testDir, flowName, execId);
+ ExecutionOptions options = exFlow.getExecutionOptions();
if (watcher != null) {
- exFlow.setPipelineLevel(pipeline);
- exFlow.setPipelineExecutionId(watcher.getExecId());
+ options.setPipelineLevel(pipeline);
+ options.setPipelineExecutionId(watcher.getExecId());
}
//MockProjectLoader projectLoader = new MockProjectLoader(new File(exFlow.getExecutionPath()));
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 54dc49c..9cdca2f 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -16,7 +16,7 @@ import azkaban.execapp.event.Event;
import azkaban.execapp.event.Event.Type;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.Status;
@@ -79,7 +79,6 @@ public class FlowRunnerTest {
testStatus(exFlow, "job6", Status.SUCCEEDED);
testStatus(exFlow, "job7", Status.SUCCEEDED);
testStatus(exFlow, "job8", Status.SUCCEEDED);
- testStatus(exFlow, "job9", Status.SUCCEEDED);
testStatus(exFlow, "job10", Status.SUCCEEDED);
try {
@@ -125,7 +124,6 @@ public class FlowRunnerTest {
testStatus(exFlow, "job6", Status.SKIPPED);
testStatus(exFlow, "job7", Status.SUCCEEDED);
testStatus(exFlow, "job8", Status.SUCCEEDED);
- testStatus(exFlow, "job9", Status.SUCCEEDED);
testStatus(exFlow, "job10", Status.SKIPPED);
try {
@@ -181,7 +179,8 @@ public class FlowRunnerTest {
eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
File testDir = new File("unit/executions/exectest1");
ExecutableFlow flow = prepareExecDir(testDir, "exec2", 1);
- flow.setFailureAction(FailureAction.CANCEL_ALL);
+ flow.getExecutionOptions().setFailureAction(FailureAction.CANCEL_ALL);
+
FlowRunner runner = createFlowRunner(flow, loader, eventCollector);
@@ -220,8 +219,7 @@ public class FlowRunnerTest {
eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
File testDir = new File("unit/executions/exectest1");
ExecutableFlow flow = prepareExecDir(testDir, "exec3", 1);
- flow.setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
-
+ flow.getExecutionOptions().setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
FlowRunner runner = createFlowRunner(flow, loader, eventCollector);
runner.run();
@@ -288,7 +286,6 @@ public class FlowRunnerTest {
testStatus(exFlow, "job5", Status.KILLED);
testStatus(exFlow, "job7", Status.KILLED);
testStatus(exFlow, "job8", Status.KILLED);
- testStatus(exFlow, "job9", Status.KILLED);
testStatus(exFlow, "job10", Status.KILLED);
testStatus(exFlow, "job3", Status.FAILED);
testStatus(exFlow, "job4", Status.FAILED);
diff --git a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
index 56ffcc4..1fcd9ee 100644
--- a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
+++ b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
@@ -185,7 +185,7 @@ public class JdbcExecutorLoaderTest {
Assert.assertEquals(flow.getFlowId(), fetchFlow.getFlowId());
Assert.assertEquals(flow.getProjectId(), fetchFlow.getProjectId());
Assert.assertEquals(flow.getVersion(), fetchFlow.getVersion());
- Assert.assertEquals(flow.getFailureAction(), fetchFlow.getFailureAction());
+ Assert.assertEquals(flow.getExecutionOptions().getFailureAction(), fetchFlow.getExecutionOptions().getFailureAction());
Assert.assertEquals(new HashSet<String>(flow.getEndNodes()), new HashSet<String>(fetchFlow.getEndNodes()));
}
@@ -217,7 +217,7 @@ public class JdbcExecutorLoaderTest {
Assert.assertEquals(flow.getFlowId(), fetchFlow.getFlowId());
Assert.assertEquals(flow.getProjectId(), fetchFlow.getProjectId());
Assert.assertEquals(flow.getVersion(), fetchFlow.getVersion());
- Assert.assertEquals(flow.getFailureAction(), fetchFlow.getFailureAction());
+ Assert.assertEquals(flow.getExecutionOptions().getFailureAction(), fetchFlow.getExecutionOptions().getFailureAction());
Assert.assertEquals(new HashSet<String>(flow.getEndNodes()), new HashSet<String>(fetchFlow.getEndNodes()));
}
@@ -298,7 +298,7 @@ public class JdbcExecutorLoaderTest {
Assert.assertEquals(flow1.getFlowId(), flow1Result.getFlowId());
Assert.assertEquals(flow1.getProjectId(), flow1Result.getProjectId());
Assert.assertEquals(flow1.getVersion(), flow1Result.getVersion());
- Assert.assertEquals(flow1.getFailureAction(), flow1Result.getFailureAction());
+ Assert.assertEquals(flow1.getExecutionOptions().getFailureAction(), flow1Result.getExecutionOptions().getFailureAction());
ExecutableFlow flow1Result2 = activeFlows1.get(flow2.getExecutionId()).getSecond();
Assert.assertNotNull(flow1Result2);
@@ -310,7 +310,7 @@ public class JdbcExecutorLoaderTest {
Assert.assertEquals(flow2.getFlowId(), flow1Result2.getFlowId());
Assert.assertEquals(flow2.getProjectId(), flow1Result2.getProjectId());
Assert.assertEquals(flow2.getVersion(), flow1Result2.getVersion());
- Assert.assertEquals(flow2.getFailureAction(), flow1Result2.getFailureAction());
+ Assert.assertEquals(flow2.getExecutionOptions().getFailureAction(), flow1Result2.getExecutionOptions().getFailureAction());
loader.removeActiveExecutableReference(flow2.getExecutionId());
Map<Integer, Pair<ExecutionReference,ExecutableFlow>> activeFlows2 = loader.fetchActiveFlows();