azkaban-developers
Changes
build.properties 2(+1 -1)
src/java/azkaban/execapp/FlowRunner.java 81(+47 -34)
src/java/azkaban/execapp/JobRunner.java 42(+21 -21)
src/java/azkaban/executor/Status.java 17(+16 -1)
src/less/azkaban-graph.less 11(+10 -1)
src/less/flow.less 20(+17 -3)
src/web/js/azkaban/util/flow-loader.js 14(+0 -14)
src/web/js/azkaban/view/exflow.js 4(+2 -2)
Details
build.properties 2(+1 -1)
diff --git a/build.properties b/build.properties
index aa6322d..745ac17 100644
--- a/build.properties
+++ b/build.properties
@@ -1,3 +1,3 @@
name=azkaban
-version=2.2
+version=2.5
spec.file=azkaban.spec
src/java/azkaban/execapp/FlowRunner.java 81(+47 -34)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index f5f07f4..77c9016 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -114,7 +114,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private boolean flowPaused = false;
private boolean flowFailed = false;
private boolean flowFinished = false;
- private boolean flowCancelled = false;
+ private boolean flowKilled = false;
// The following is state that will trigger a retry of all failed jobs
private boolean retryFailedJobs = false;
@@ -392,7 +392,7 @@ public class FlowRunner extends EventHandler implements Runnable {
logger.info("Restarting all failed jobs");
this.retryFailedJobs = false;
- this.flowCancelled = false;
+ this.flowKilled = false;
this.flowFailed = false;
this.flow.setStatus(Status.RUNNING);
@@ -400,7 +400,7 @@ public class FlowRunner extends EventHandler implements Runnable {
resetFailedState(this.flow, retryJobs);
for (ExecutableNode node: retryJobs) {
- if(node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
+ if (node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
runReadyJob(node);
}
else if (node.getStatus() == Status.SUCCEEDED){
@@ -415,7 +415,7 @@ public class FlowRunner extends EventHandler implements Runnable {
updateFlow();
}
-
+
private boolean progressGraph() throws IOException {
finishedNodes.swap();
@@ -433,7 +433,7 @@ public class FlowRunner extends EventHandler implements Runnable {
if (!retryJobIfPossible(node)) {
propagateStatus(node.getParentFlow(), Status.FAILED_FINISHING);
if (failureAction == FailureAction.CANCEL_ALL) {
- this.cancel();
+ this.kill();
}
this.flowFailed = true;
}
@@ -495,9 +495,9 @@ public class FlowRunner extends EventHandler implements Runnable {
return false;
}
- if (nextNodeStatus == Status.KILLED) {
- logger.info("Killing '" + node.getNestedId() + "' due to prior errors.");
- node.killNode(System.currentTimeMillis());
+ if (nextNodeStatus == Status.CANCELLED) {
+ logger.info("Cancelling '" + node.getNestedId() + "' due to prior errors.");
+ node.cancelNode(System.currentTimeMillis());
finishExecutableNode(node);
}
else if (nextNodeStatus == Status.SKIPPED) {
@@ -570,10 +570,12 @@ public class FlowRunner extends EventHandler implements Runnable {
boolean succeeded = true;
Props previousOutput = null;
- for(String end: flow.getEndNodes()) {
+ for (String end: flow.getEndNodes()) {
ExecutableNode node = flow.getExecutableNode(end);
- if (node.getStatus() == Status.KILLED || node.getStatus() == Status.FAILED) {
+ if (node.getStatus() == Status.KILLED ||
+ node.getStatus() == Status.FAILED ||
+ node.getStatus() == Status.CANCELLED) {
succeeded = false;
}
@@ -600,6 +602,7 @@ public class FlowRunner extends EventHandler implements Runnable {
break;
case FAILED:
case KILLED:
+ case CANCELLED:
case FAILED_SUCCEEDED:
logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString() + " in " + durationSec + " seconds");
break;
@@ -675,14 +678,14 @@ public class FlowRunner extends EventHandler implements Runnable {
// load the override props if any
try {
props = projectLoader.fetchProjectProperty(flow.getProjectId(), flow.getVersion(), node.getId()+".jor");
- }
+ }
catch(ProjectManagerException e) {
e.printStackTrace();
logger.error("Error loading job override property for job " + node.getId());
}
File path = new File(execDir, source);
- if(props == null) {
+ if (props == null) {
// if no override prop, load the original one on disk
try {
props = new Props(null, path);
@@ -741,7 +744,9 @@ public class FlowRunner extends EventHandler implements Runnable {
if (!Status.isStatusFinished(depStatus)) {
return null;
}
- else if (depStatus == Status.FAILED || depStatus == Status.KILLED) {
+ else if (depStatus == Status.FAILED ||
+ depStatus == Status.CANCELLED ||
+ depStatus == Status.KILLED) {
// We propagate failures as KILLED states.
shouldKill = true;
}
@@ -755,10 +760,10 @@ public class FlowRunner extends EventHandler implements Runnable {
// If the flow has failed, and we want to finish only the currently running jobs, we just
// kill everything else. We also kill, if the flow has been cancelled.
if (flowFailed && failureAction == ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING) {
- return Status.KILLED;
+ return Status.CANCELLED;
}
- else if (shouldKill || isCancelled()) {
- return Status.KILLED;
+ else if (shouldKill || isKilled()) {
+ return Status.CANCELLED;
}
// All good to go, ready to run.
@@ -827,7 +832,7 @@ public class FlowRunner extends EventHandler implements Runnable {
if (flowFailed) {
flow.setStatus(Status.FAILED_FINISHING);
}
- else if (flowCancelled) {
+ else if (flowKilled) {
flow.setStatus(Status.KILLED);
}
else {
@@ -839,20 +844,20 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
- public void cancel(String user) {
+ public void kill(String user) {
synchronized(mainSyncObj) {
- logger.info("Flow cancelled by " + user);
- cancel();
+ logger.info("Flow killed by " + user);
+ kill();
updateFlow();
}
interrupt();
}
- private void cancel() {
+ private void kill() {
synchronized(mainSyncObj) {
- logger.info("Cancel has been called on flow " + execId);
+ logger.info("Kill has been called on flow " + execId);
flowPaused = false;
- flowCancelled = true;
+ flowKilled = true;
if (watcher != null) {
logger.info("Watcher is attached. Stopping watcher.");
@@ -860,9 +865,9 @@ public class FlowRunner extends EventHandler implements Runnable {
logger.info("Watcher cancelled status is " + watcher.isWatchCancelled());
}
- logger.info("Cancelling " + activeJobRunners.size() + " jobs.");
+ logger.info("Killing " + activeJobRunners.size() + " jobs.");
for (JobRunner runner : activeJobRunners) {
- runner.cancel();
+ runner.kill();
}
}
}
@@ -907,25 +912,32 @@ public class FlowRunner extends EventHandler implements Runnable {
else if (node instanceof ExecutableFlowBase) {
ExecutableFlowBase base = (ExecutableFlowBase)node;
switch (base.getStatus()) {
+ case CANCELLED:
+ node.setStatus(Status.READY);
+ node.setEndTime(-1);
+ node.setStartTime(-1);
+ node.setUpdateTime(currentTime);
+ // Break out of the switch. We'll reset the flow just like a normal node
+ break;
case KILLED:
case FAILED:
case FAILED_FINISHING:
- resetFailedState(base, nodesToRetry);
+ resetFailedState(base, nodesToRetry);
+ continue;
default:
- }
-
- if (base.getStatus() != Status.KILLED) {
+ // Continue the while loop. If the job is in a finished state that's not
+ // a failure, we don't want to reset the job.
continue;
}
}
- else if (node.getStatus() == Status.KILLED) {
+ else if (node.getStatus() == Status.CANCELLED) {
// Not a flow, but killed
node.setStatus(Status.READY);
node.setStartTime(-1);
node.setEndTime(-1);
node.setUpdateTime(currentTime);
}
- else if(node.getStatus() == Status.FAILED) {
+ else if (node.getStatus() == Status.FAILED || node.getStatus() == Status.KILLED) {
node.resetForRetry();
nodesToRetry.add(node);
}
@@ -934,12 +946,13 @@ public class FlowRunner extends EventHandler implements Runnable {
logger.info("Resetting job '" + node.getNestedId() + "' from " + oldStatus + " to " + node.getStatus());
}
- for(String inId: node.getInNodes()) {
+ for (String inId: node.getInNodes()) {
ExecutableNode nodeUp = flow.getExecutableNode(inId);
queue.add(nodeUp);
}
}
+ // At this point, the following code will reset the flow
Status oldFlowState = flow.getStatus();
if (maxStartTime == -1) {
// Nothing has run inside the flow, so we assume the flow hasn't even started running yet.
@@ -992,8 +1005,8 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
- public boolean isCancelled() {
- return flowCancelled;
+ public boolean isKilled() {
+ return flowKilled;
}
public ExecutableFlow getExecutableFlow() {
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 3fbdb15..f886e64 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -463,7 +463,7 @@ public class FlowRunnerManager implements EventListener {
throw new ExecutorManagerException("Execution " + execId + " is not running.");
}
- runner.cancel(user);
+ runner.kill(user);
}
public void pauseFlow(int execId, String user) throws ExecutorManagerException {
src/java/azkaban/execapp/JobRunner.java 42(+21 -21)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 3de3d23..cbdcecc 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -87,7 +87,7 @@ public class JobRunner extends EventHandler implements Runnable {
private int jobLogBackupIndex;
private long delayStartMs = 0;
- private boolean cancelled = false;
+ private boolean killed = false;
private BlockingStatus currentBlockStatus = null;
public JobRunner(ExecutableNode node, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
@@ -259,15 +259,15 @@ public class JobRunner extends EventHandler implements Runnable {
boolean quickFinish = false;
long time = System.currentTimeMillis();
- if (this.isCancelled() || Status.isStatusFinished(nodeStatus)) {
+ if (this.isKilled() || Status.isStatusFinished(nodeStatus)) {
quickFinish = true;
}
else if (nodeStatus == Status.DISABLED) {
changeStatus(Status.SKIPPED, time);
quickFinish = true;
}
- else if (this.cancelled) {
- changeStatus(Status.FAILED, time);
+ else if (this.killed) {
+ changeStatus(Status.KILLED, time);
quickFinish = true;
}
@@ -286,7 +286,7 @@ public class JobRunner extends EventHandler implements Runnable {
* If pipelining is set, will block on another flow's jobs.
*/
private boolean blockOnPipeLine() {
- if (this.isCancelled()) {
+ if (this.isKilled()) {
return true;
}
@@ -309,8 +309,8 @@ public class JobRunner extends EventHandler implements Runnable {
logger.info("Waiting on pipelined job " + bStatus.getJobId());
currentBlockStatus = bStatus;
bStatus.blockOnFinishedStatus();
- if (this.isCancelled()) {
- logger.info("Job was cancelled while waiting on pipeline. Quiting.");
+ if (this.isKilled()) {
+ logger.info("Job was killed while waiting on pipeline. Quiting.");
return true;
}
else {
@@ -325,7 +325,7 @@ public class JobRunner extends EventHandler implements Runnable {
}
private boolean delayExecution() {
- if (this.isCancelled()) {
+ if (this.isKilled()) {
return true;
}
@@ -342,8 +342,8 @@ public class JobRunner extends EventHandler implements Runnable {
}
}
- if (this.isCancelled()) {
- logger.info("Job was cancelled while in delay. Quiting.");
+ if (this.isKilled()) {
+ logger.info("Job was killed while in delay. Quiting.");
return true;
}
}
@@ -420,7 +420,7 @@ public class JobRunner extends EventHandler implements Runnable {
// Start the node.
node.setStartTime(System.currentTimeMillis());
- if (!errorFound && !isCancelled()) {
+ if (!errorFound && !isKilled()) {
fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
try {
loader.uploadExecutableNode(node, props);
@@ -443,10 +443,10 @@ public class JobRunner extends EventHandler implements Runnable {
}
node.setEndTime(System.currentTimeMillis());
- if (isCancelled()) {
- changeStatus(Status.FAILED);
+ if (isKilled()) {
+ changeStatus(Status.KILLED);
}
- logInfo("Finishing job " + this.jobId + " at " + node.getEndTime());
+ logInfo("Finishing job " + this.jobId + " at " + node.getEndTime() + " with status " + node.getStatus());
fireEvent(Event.create(this, Type.JOB_FINISHED), false);
finalizeLogFile();
@@ -455,13 +455,13 @@ public class JobRunner extends EventHandler implements Runnable {
private boolean prepareJob() throws RuntimeException {
// Check pre conditions
- if (props == null || cancelled) {
+ if (props == null || killed) {
logError("Failing job. The job properties don't exist");
return false;
}
synchronized (syncObject) {
- if (node.getStatus() == Status.FAILED || cancelled) {
+ if (node.getStatus() == Status.FAILED || killed) {
return false;
}
@@ -557,10 +557,10 @@ public class JobRunner extends EventHandler implements Runnable {
this.fireEventListeners(event);
}
- public void cancel() {
+ public void kill() {
synchronized (syncObject) {
- logError("Cancel has been called.");
- this.cancelled = true;
+ logError("Kill has been called.");
+ this.killed = true;
BlockingStatus status = currentBlockStatus;
if (status != null) {
@@ -587,8 +587,8 @@ public class JobRunner extends EventHandler implements Runnable {
}
}
- public boolean isCancelled() {
- return cancelled;
+ public boolean isKilled() {
+ return killed;
}
public Status getStatus() {
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index bacad6c..d444ef6 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -389,15 +389,15 @@ public class ExecutableNode {
applyUpdateObject(wrapper);
}
- public void killNode(long killTime) {
+ public void cancelNode(long cancelTime) {
if (this.status == Status.DISABLED) {
- skipNode(killTime);
+ skipNode(cancelTime);
}
else {
- this.setStatus(Status.KILLED);
- this.setStartTime(killTime);
- this.setEndTime(killTime);
- this.setUpdateTime(killTime);
+ this.setStatus(Status.CANCELLED);
+ this.setStartTime(cancelTime);
+ this.setEndTime(cancelTime);
+ this.setUpdateTime(cancelTime);
}
}
src/java/azkaban/executor/Status.java 17(+16 -1)
diff --git a/src/java/azkaban/executor/Status.java b/src/java/azkaban/executor/Status.java
index fa45066..7643d2f 100644
--- a/src/java/azkaban/executor/Status.java
+++ b/src/java/azkaban/executor/Status.java
@@ -17,7 +17,19 @@
package azkaban.executor;
public enum Status {
- READY(10), PREPARING(20), RUNNING(30), PAUSED(40), SUCCEEDED(50), KILLED(60), FAILED(70), FAILED_FINISHING(80), SKIPPED(90), DISABLED(100), QUEUED(110), FAILED_SUCCEEDED(120);
+ READY(10),
+ PREPARING(20),
+ RUNNING(30),
+ PAUSED(40),
+ SUCCEEDED(50),
+ KILLED(60),
+ FAILED(70),
+ FAILED_FINISHING(80),
+ SKIPPED(90),
+ DISABLED(100),
+ QUEUED(110),
+ FAILED_SUCCEEDED(120),
+ CANCELLED(130);
private int numVal;
@@ -55,6 +67,8 @@ public enum Status {
return QUEUED;
case 120:
return FAILED_SUCCEEDED;
+ case 130:
+ return CANCELLED;
default:
return READY;
}
@@ -67,6 +81,7 @@ public enum Status {
case SUCCEEDED:
case SKIPPED:
case FAILED_SUCCEEDED:
+ case CANCELLED:
return true;
default:
return false;
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index 66bc178..7648a05 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -30,9 +30,6 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
public TriggerBasedScheduleLoader(TriggerManager triggerManager, String triggerSource) {
this.triggerManager = triggerManager;
this.triggerSource = triggerSource;
-// // need to init the action types and condition checker types
-// ExecuteFlowAction.setExecutorManager(executorManager);
-// ExecuteFlowAction.setProjectManager(projectManager);
}
private Trigger scheduleToTrigger(Schedule s) {
@@ -52,15 +49,6 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
List<TriggerAction> actions = new ArrayList<TriggerAction>();
ExecuteFlowAction executeAct = new ExecuteFlowAction("executeFlowAction", s.getProjectId(), s.getProjectName(), s.getFlowName(), s.getSubmitUser(), s.getExecutionOptions(), s.getSlaOptions());
actions.add(executeAct);
-// List<SlaOption> slaOptions = s.getSlaOptions();
-// if(slaOptions != null && slaOptions.size() > 0) {
-// // insert a trigger to keep watching that execution
-// for(SlaOption sla : slaOptions) {
-// // need to create triggers for each sla
-// SlaChecker slaChecker = new SlaChecker("slaChecker", sla, executeAct.getId());
-//
-// }
-// }
return actions;
}
@@ -178,9 +166,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
@Override
public void updateNextExecTime(Schedule s)
throws ScheduleManagerException {
-// Trigger t = triggersLocalCopy.get(s.getScheduleId());
-// BasicTimeChecker ck = (BasicTimeChecker) t.getTriggerCondition().getCheckers().values().toArray()[0];
-// s.setNextExecTime(ck.getNextCheckTime().getMillis());
+
}
@Override
diff --git a/src/java/azkaban/trigger/builtin/SlaAlertAction.java b/src/java/azkaban/trigger/builtin/SlaAlertAction.java
index 1f6eb00..f340497 100644
--- a/src/java/azkaban/trigger/builtin/SlaAlertAction.java
+++ b/src/java/azkaban/trigger/builtin/SlaAlertAction.java
@@ -166,7 +166,7 @@ public class SlaAlertAction implements TriggerAction{
@Override
public String getDescription() {
- return type + " with " + slaOption.toString();
+ return type + " for " + execId + " with " + slaOption.toString();
}
}
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index 594f759..6f88fd6 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -53,7 +53,7 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
private ExecutorManagerEventListener listener = new ExecutorManagerEventListener();
- private Object syncObj = new Object();
+ private final Object syncObj = new Object();
private String scannerStage = "";
@@ -85,7 +85,7 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
@Override
public void start() throws TriggerManagerException{
- try{
+ try {
// expect loader to return valid triggers
List<Trigger> triggers = triggerLoader.loadTriggers();
for(Trigger t : triggers) {
@@ -185,7 +185,7 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
triggers = new PriorityBlockingQueue<Trigger>(1, new TriggerComparator());
justFinishedFlows = new ConcurrentHashMap<Integer, ExecutableFlow>();
this.setName("TriggerRunnerManager-Trigger-Scanner-Thread");
- this.scannerInterval = scannerInterval;;
+ this.scannerInterval = scannerInterval;
}
public void shutdown() {
@@ -216,12 +216,12 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
//while(stillAlive.get()) {
while(!shutdown) {
synchronized (syncObj) {
- try{
+ try {
lastRunnerThreadCheckTime = System.currentTimeMillis();
scannerStage = "Ready to start a new scan cycle at " + lastRunnerThreadCheckTime;
- try{
+ try {
checkAllTriggers();
justFinishedFlows.clear();
} catch(Exception e) {
@@ -231,15 +231,15 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
t.printStackTrace();
logger.error(t.getMessage());
}
-
+
scannerStage = "Done flipping all triggers.";
runnerThreadIdleTime = scannerInterval - (System.currentTimeMillis() - lastRunnerThreadCheckTime);
-
+
if(runnerThreadIdleTime < 0) {
logger.error("Trigger manager thread " + this.getName() + " is too busy!");
} else {
- wait(runnerThreadIdleTime);
+ syncObj.wait(runnerThreadIdleTime);
}
} catch(InterruptedException e) {
logger.info("Interrupted. Probably to shut down.");
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
index 5454c99..5582114 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -106,7 +106,7 @@
<li id="statsViewLink"><a href="#stats">Stats</a></li>
<li class="nav-button pull-right"><button type="button" id="pausebtn" class="btn btn-primary">Pause</button></li>
<li class="nav-button pull-right"><button type="button" id="resumebtn" class="btn btn-primary">Resume</button></li>
- <li class="nav-button pull-right"><button type="button" id="cancelbtn" class="btn btn-danger">Cancel</button></li>
+ <li class="nav-button pull-right"><button type="button" id="cancelbtn" class="btn btn-danger">Kill</button></li>
<li class="nav-button pull-right"><button type="button" id="retrybtn" class="btn btn-success">Retry Failed</button></li>
<li class="nav-button pull-right"><button type="button" id="executebtn" class="btn btn-success">Prepare Execution</button></li>
</ul>
src/less/azkaban-graph.less 11(+10 -1)
diff --git a/src/less/azkaban-graph.less b/src/less/azkaban-graph.less
index 5a5d213..93fb36c 100644
--- a/src/less/azkaban-graph.less
+++ b/src/less/azkaban-graph.less
@@ -80,11 +80,20 @@
}
.KILLED > g > rect {
+ fill: #d2322d;
+ stroke: #d2322d;
+}
+
+.KILLED > g > text {
+ fill: #FFF;
+}
+
+.CANCELLED > g > rect {
fill: #FF9999;
stroke: #FF9999;
}
-.KILLED > g > text {
+.CANCELLED > g > text {
fill: #FFF;
}
src/less/flow.less 20(+17 -3)
diff --git a/src/less/flow.less b/src/less/flow.less
index 5e60909..b8dde8e 100644
--- a/src/less/flow.less
+++ b/src/less/flow.less
@@ -45,6 +45,10 @@
&.FAILED {
background-color: #d9534f;
}
+
+ &.KILLED {
+ background-color: #d9534f;
+ }
&.RUNNING {
background-color: #3398cc;
@@ -61,7 +65,7 @@
background-color: #009fc9;
}
- &.KILLED {
+ &.CANCELLED {
background-color: #ff9999;
}
}
@@ -91,6 +95,10 @@ td {
&.FAILED {
background-color: #d9534f;
}
+
+ &.KILLED {
+ background-color: #d9534f;
+ }
&.PAUSED {
background-color: #c82123;
@@ -114,7 +122,7 @@ td {
background-color: #aaa;
}
- &.KILLED {
+ &.CANCELLED {
background-color: #ff9999;
}
}
@@ -139,7 +147,8 @@ td {
&.FAILED,
&.FAILED_FINISHING,
- &.KILLED {
+ &.KILLED,
+ &.CANCELLED {
color: #cc0000;
}
}
@@ -220,6 +229,11 @@ li.tree-list-item {
background-position: 0px 0px;
}
+ &.CANCELLED .icon {
+ background-position: 0px 0px;
+ opacity: 0.5;
+ }
+
&.FAILED_FINISHING .icon {
background-position: 0px 0px;
}
src/web/js/azkaban/util/flow-loader.js 14(+0 -14)
diff --git a/src/web/js/azkaban/util/flow-loader.js b/src/web/js/azkaban/util/flow-loader.js
index ee5ed8b..b68dfb5 100644
--- a/src/web/js/azkaban/util/flow-loader.js
+++ b/src/web/js/azkaban/util/flow-loader.js
@@ -14,20 +14,6 @@
* the License.
*/
-var statusStringMap = {
- "FAILED": "Failed",
- "SUCCEEDED": "Success",
- "FAILED_FINISHING": "Running w/Failure",
- "RUNNING": "Running",
- "WAITING": "Waiting",
- "KILLED": "Killed",
- "DISABLED": "Disabled",
- "READY": "Ready",
- "UNKNOWN": "Unknown",
- "QUEUED": "Queued",
- "SKIPPED": "Skipped"
-};
-
var extendedViewPanels = {};
var extendedDataModels = {};
var openJobDisplayCallback = function(nodeId, flowId, evt) {
diff --git a/src/web/js/azkaban/util/job-status.js b/src/web/js/azkaban/util/job-status.js
index ee03ae6..deef88e 100644
--- a/src/web/js/azkaban/util/job-status.js
+++ b/src/web/js/azkaban/util/job-status.js
@@ -14,8 +14,9 @@
* the License.
*/
-var statusList = ["FAILED", "FAILED_FINISHING", "SUCCEEDED", "RUNNING", "WAITING", "KILLED", "DISABLED", "READY", "UNKNOWN", "PAUSED", "SKIPPED"];
+var statusList = ["FAILED", "FAILED_FINISHING", "SUCCEEDED", "RUNNING", "WAITING", "KILLED", "DISABLED", "READY", "CANCELLED", "UNKNOWN", "PAUSED", "SKIPPED", "QUEUED"];
var statusStringMap = {
+ "QUEUED": "Queued",
"SKIPPED": "Skipped",
"PREPARING": "Preparing",
"FAILED": "Failed",
@@ -24,6 +25,7 @@ var statusStringMap = {
"RUNNING": "Running",
"WAITING": "Waiting",
"KILLED": "Killed",
+ "CANCELLED": "Cancelled",
"DISABLED": "Disabled",
"READY": "Ready",
"UNKNOWN": "Unknown",
src/web/js/azkaban/view/exflow.js 4(+2 -2)
diff --git a/src/web/js/azkaban/view/exflow.js b/src/web/js/azkaban/view/exflow.js
index c495584..f6ecc38 100644
--- a/src/web/js/azkaban/view/exflow.js
+++ b/src/web/js/azkaban/view/exflow.js
@@ -439,8 +439,8 @@ var updaterFunction = function() {
var data = graphModel.get("data");
if (data.status == "UNKNOWN" ||
- data.status == "WAITING" ||
- data.status == "PREPARING") {
+ data.status == "WAITING" ||
+ data.status == "PREPARING") {
setTimeout(function() {updaterFunction();}, 1000);
}
else if (data.status != "SUCCEEDED" && data.status != "FAILED") {
diff --git a/src/web/js/azkaban/view/flow-execute-dialog.js b/src/web/js/azkaban/view/flow-execute-dialog.js
index e8db80c..6b93bac 100644
--- a/src/web/js/azkaban/view/flow-execute-dialog.js
+++ b/src/web/js/azkaban/view/flow-execute-dialog.js
@@ -495,14 +495,14 @@ var disableFinishedJobs = function(data) {
else if (node.status == "SUCCEEDED" || node.status=="RUNNING") {
node.disabled = true;
}
- else if (node.status == "KILLED") {
+ else if (node.status == "CANCELLED") {
node.disabled = false;
node.status="READY";
}
else {
node.disabled = false;
- if (node.flowData) {
- disableFinishedJobs(node.flowData);
+ if (node.type == "flow") {
+ disableFinishedJobs(node);
}
}
}
@@ -526,8 +526,8 @@ var recurseTree = function(data, disabled, recurse) {
var node = data.nodes[i];
node.disabled = disabled;
- if (node.flowData && recurse) {
- recurseTree(node.flowData, disabled);
+ if (node.type == "flow" && recurse) {
+ recurseTree(node, disabled);
}
}
}
@@ -583,8 +583,8 @@ var gatherDisabledNodes = function(data) {
disabled.push(node.id);
}
else {
- if (node.flowData) {
- var array = gatherDisabledNodes(node.flowData);
+ if (node.type == "flow") {
+ var array = gatherDisabledNodes(node);
if (array && array.length > 0) {
disabled.push({id: node.id, children: array});
}
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index c0f8025..21b43f1 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -73,7 +73,7 @@ public class FlowRunnerTest {
eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
FlowRunner runner = createFlowRunner(loader, eventCollector, "exec1");
- Assert.assertTrue(!runner.isCancelled());
+ Assert.assertTrue(!runner.isKilled());
runner.run();
ExecutableFlow exFlow = runner.getExecutableFlow();
Assert.assertTrue(exFlow.getStatus() == Status.SUCCEEDED);
@@ -115,7 +115,7 @@ public class FlowRunnerTest {
FlowRunner runner = createFlowRunner(exFlow, loader, eventCollector);
- Assert.assertTrue(!runner.isCancelled());
+ Assert.assertTrue(!runner.isKilled());
Assert.assertTrue(exFlow.getStatus() == Status.READY);
runner.run();
@@ -156,19 +156,19 @@ public class FlowRunnerTest {
runner.run();
ExecutableFlow exFlow = runner.getExecutableFlow();
- Assert.assertTrue(!runner.isCancelled());
+ Assert.assertTrue(!runner.isKilled());
Assert.assertTrue("Flow status " + exFlow.getStatus(), exFlow.getStatus() == Status.FAILED);
testStatus(exFlow, "job1", Status.SUCCEEDED);
testStatus(exFlow, "job2d", Status.FAILED);
- testStatus(exFlow, "job3", Status.KILLED);
- testStatus(exFlow, "job4", Status.KILLED);
- testStatus(exFlow, "job5", Status.KILLED);
+ testStatus(exFlow, "job3", Status.CANCELLED);
+ testStatus(exFlow, "job4", Status.CANCELLED);
+ testStatus(exFlow, "job5", Status.CANCELLED);
testStatus(exFlow, "job6", Status.SUCCEEDED);
- testStatus(exFlow, "job7", Status.KILLED);
- testStatus(exFlow, "job8", Status.KILLED);
- testStatus(exFlow, "job9", Status.KILLED);
- testStatus(exFlow, "job10", Status.KILLED);
+ testStatus(exFlow, "job7", Status.CANCELLED);
+ testStatus(exFlow, "job8", Status.CANCELLED);
+ testStatus(exFlow, "job9", Status.CANCELLED);
+ testStatus(exFlow, "job10", Status.CANCELLED);
try {
eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FINISHED});
@@ -195,7 +195,7 @@ public class FlowRunnerTest {
runner.run();
ExecutableFlow exFlow = runner.getExecutableFlow();
- Assert.assertTrue(runner.isCancelled());
+ Assert.assertTrue(runner.isKilled());
Assert.assertTrue("Expected flow " + Status.FAILED + " instead " + exFlow.getStatus(), exFlow.getStatus() == Status.FAILED);
@@ -209,14 +209,14 @@ public class FlowRunnerTest {
testStatus(exFlow, "job1", Status.SUCCEEDED);
testStatus(exFlow, "job2d", Status.FAILED);
- testStatus(exFlow, "job3", Status.KILLED);
- testStatus(exFlow, "job4", Status.KILLED);
- testStatus(exFlow, "job5", Status.KILLED);
- testStatus(exFlow, "job6", Status.FAILED);
- testStatus(exFlow, "job7", Status.KILLED);
- testStatus(exFlow, "job8", Status.KILLED);
- testStatus(exFlow, "job9", Status.KILLED);
- testStatus(exFlow, "job10", Status.KILLED);
+ testStatus(exFlow, "job3", Status.CANCELLED);
+ testStatus(exFlow, "job4", Status.CANCELLED);
+ testStatus(exFlow, "job5", Status.CANCELLED);
+ testStatus(exFlow, "job6", Status.KILLED);
+ testStatus(exFlow, "job7", Status.CANCELLED);
+ testStatus(exFlow, "job8", Status.CANCELLED);
+ testStatus(exFlow, "job9", Status.CANCELLED);
+ testStatus(exFlow, "job10", Status.CANCELLED);
try {
eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FINISHED});
@@ -246,20 +246,19 @@ public class FlowRunnerTest {
try {
wait(500);
} catch(InterruptedException e) {
-
}
}
testStatus(exFlow, "job1", Status.SUCCEEDED);
testStatus(exFlow, "job2d", Status.FAILED);
testStatus(exFlow, "job3", Status.SUCCEEDED);
- testStatus(exFlow, "job4", Status.KILLED);
- testStatus(exFlow, "job5", Status.KILLED);
- testStatus(exFlow, "job6", Status.KILLED);
+ testStatus(exFlow, "job4", Status.CANCELLED);
+ testStatus(exFlow, "job5", Status.CANCELLED);
+ testStatus(exFlow, "job6", Status.CANCELLED);
testStatus(exFlow, "job7", Status.SUCCEEDED);
testStatus(exFlow, "job8", Status.SUCCEEDED);
testStatus(exFlow, "job9", Status.SUCCEEDED);
- testStatus(exFlow, "job10", Status.KILLED);
+ testStatus(exFlow, "job10", Status.CANCELLED);
try {
eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FINISHED});
@@ -278,7 +277,7 @@ public class FlowRunnerTest {
eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
FlowRunner runner = createFlowRunner(loader, eventCollector, "exec1");
- Assert.assertTrue(!runner.isCancelled());
+ Assert.assertTrue(!runner.isKilled());
Thread thread = new Thread(runner);
thread.start();
@@ -290,8 +289,8 @@ public class FlowRunnerTest {
e.printStackTrace();
}
- runner.cancel("me");
- Assert.assertTrue(runner.isCancelled());
+ runner.kill("me");
+ Assert.assertTrue(runner.isKilled());
}
@@ -307,15 +306,15 @@ public class FlowRunnerTest {
ExecutableFlow exFlow = runner.getExecutableFlow();
testStatus(exFlow, "job1", Status.SUCCEEDED);
testStatus(exFlow, "job2", Status.SUCCEEDED);
- testStatus(exFlow, "job5", Status.KILLED);
- testStatus(exFlow, "job7", Status.KILLED);
- testStatus(exFlow, "job8", Status.KILLED);
- testStatus(exFlow, "job10", Status.KILLED);
- testStatus(exFlow, "job3", Status.FAILED);
- testStatus(exFlow, "job4", Status.FAILED);
- testStatus(exFlow, "job6", Status.FAILED);
+ testStatus(exFlow, "job5", Status.CANCELLED);
+ testStatus(exFlow, "job7", Status.CANCELLED);
+ testStatus(exFlow, "job8", Status.CANCELLED);
+ testStatus(exFlow, "job10", Status.CANCELLED);
+ testStatus(exFlow, "job3", Status.KILLED);
+ testStatus(exFlow, "job4", Status.KILLED);
+ testStatus(exFlow, "job6", Status.KILLED);
- Assert.assertTrue("Expected FAILED status instead got " + exFlow.getStatus(),exFlow.getStatus() == Status.FAILED);
+ Assert.assertTrue("Expected FAILED status instead got " + exFlow.getStatus(),exFlow.getStatus() == Status.KILLED);
try {
eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FINISHED});
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index ada74b4..e5fa39f 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -319,21 +319,21 @@ public class FlowRunnerTest2 {
Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
expectedStateMap.put("joba", Status.FAILED);
expectedStateMap.put("joba1", Status.RUNNING);
- expectedStateMap.put("jobb", Status.KILLED);
- expectedStateMap.put("jobc", Status.KILLED);
- expectedStateMap.put("jobd", Status.KILLED);
+ expectedStateMap.put("jobb", Status.CANCELLED);
+ expectedStateMap.put("jobc", Status.CANCELLED);
+ expectedStateMap.put("jobd", Status.CANCELLED);
expectedStateMap.put("jobd:innerJobA", Status.READY);
expectedStateMap.put("jobd:innerFlow2", Status.READY);
expectedStateMap.put("jobb:innerJobA", Status.READY);
expectedStateMap.put("jobb:innerFlow", Status.READY);
- expectedStateMap.put("jobe", Status.KILLED);
+ expectedStateMap.put("jobe", Status.CANCELLED);
compareStates(expectedStateMap, nodeMap);
// 3. jobb:Inner completes
/// innerJobA completes
InteractiveTestJob.getTestJob("joba1").succeedJob();
pause(250);
- expectedStateMap.put("jobf", Status.KILLED);
+ expectedStateMap.put("jobf", Status.CANCELLED);
Assert.assertEquals(Status.FAILED, flow.getStatus());
Assert.assertFalse(thread.isAlive());
}
@@ -379,10 +379,10 @@ public class FlowRunnerTest2 {
pause(250);
expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
- expectedStateMap.put("jobb:innerJobB", Status.KILLED);
- expectedStateMap.put("jobb:innerJobC", Status.KILLED);
- expectedStateMap.put("jobb:innerFlow", Status.KILLED);
- expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
+ expectedStateMap.put("jobb:innerJobB", Status.CANCELLED);
+ expectedStateMap.put("jobb:innerJobC", Status.CANCELLED);
+ expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
+ expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
expectedStateMap.put("jobb", Status.KILLED);
expectedStateMap.put("jobd", Status.KILLED);
compareStates(expectedStateMap, nodeMap);
@@ -391,8 +391,8 @@ public class FlowRunnerTest2 {
InteractiveTestJob.getTestJob("jobc").succeedJob();
pause(250);
expectedStateMap.put("jobc", Status.SUCCEEDED);
- expectedStateMap.put("jobe", Status.KILLED);
- expectedStateMap.put("jobf", Status.KILLED);
+ expectedStateMap.put("jobe", Status.CANCELLED);
+ expectedStateMap.put("jobf", Status.CANCELLED);
compareStates(expectedStateMap, nodeMap);
Assert.assertEquals(Status.FAILED, flow.getStatus());
@@ -401,7 +401,7 @@ public class FlowRunnerTest2 {
@Test
public void testNormalFailure3() throws Exception {
- // Test propagation of KILLED status to embedded flows different branch
+ // Test propagation of CANCELLED status to embedded flows different branch
EventCollectorListener eventCollector = new EventCollectorListener();
FlowRunner runner = createFlowRunner(eventCollector, "jobf");
ExecutableFlow flow = runner.getExecutableFlow();
@@ -449,10 +449,10 @@ public class FlowRunnerTest2 {
InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
pause(250);
expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
- expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
+ expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
expectedStateMap.put("jobd", Status.KILLED);
expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
- expectedStateMap.put("jobb:innerFlow", Status.KILLED);
+ expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
expectedStateMap.put("jobb", Status.FAILED);
compareStates(expectedStateMap, nodeMap);
@@ -460,8 +460,8 @@ public class FlowRunnerTest2 {
InteractiveTestJob.getTestJob("jobc").succeedJob();
pause(250);
expectedStateMap.put("jobc", Status.SUCCEEDED);
- expectedStateMap.put("jobe", Status.KILLED);
- expectedStateMap.put("jobf", Status.KILLED);
+ expectedStateMap.put("jobe", Status.CANCELLED);
+ expectedStateMap.put("jobf", Status.CANCELLED);
compareStates(expectedStateMap, nodeMap);
Assert.assertEquals(Status.FAILED, flow.getStatus());
@@ -521,7 +521,7 @@ public class FlowRunnerTest2 {
expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
expectedStateMap.put("jobd:innerFlow2", Status.RUNNING);
expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
- expectedStateMap.put("jobb:innerFlow", Status.KILLED);
+ expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
compareStates(expectedStateMap, nodeMap);
InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
@@ -533,8 +533,8 @@ public class FlowRunnerTest2 {
InteractiveTestJob.getTestJob("jobc").succeedJob();
pause(250);
expectedStateMap.put("jobc", Status.SUCCEEDED);
- expectedStateMap.put("jobe", Status.KILLED);
- expectedStateMap.put("jobf", Status.KILLED);
+ expectedStateMap.put("jobe", Status.CANCELLED);
+ expectedStateMap.put("jobf", Status.CANCELLED);
compareStates(expectedStateMap, nodeMap);
Assert.assertEquals(Status.FAILED, flow.getStatus());
Assert.assertFalse(thread.isAlive());
@@ -583,14 +583,14 @@ public class FlowRunnerTest2 {
pause(250);
expectedStateMap.put("jobb", Status.FAILED);
expectedStateMap.put("jobb:innerJobB", Status.FAILED);
- expectedStateMap.put("jobb:innerJobC", Status.FAILED);
- expectedStateMap.put("jobb:innerFlow", Status.KILLED);
- expectedStateMap.put("jobc", Status.FAILED);
- expectedStateMap.put("jobd", Status.FAILED);
- expectedStateMap.put("jobd:innerJobA", Status.FAILED);
- expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
- expectedStateMap.put("jobe", Status.KILLED);
- expectedStateMap.put("jobf", Status.KILLED);
+ expectedStateMap.put("jobb:innerJobC", Status.KILLED);
+ expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
+ expectedStateMap.put("jobc", Status.KILLED);
+ expectedStateMap.put("jobd", Status.KILLED);
+ expectedStateMap.put("jobd:innerJobA", Status.KILLED);
+ expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
+ expectedStateMap.put("jobe", Status.CANCELLED);
+ expectedStateMap.put("jobf", Status.CANCELLED);
compareStates(expectedStateMap, nodeMap);
Assert.assertFalse(thread.isAlive());
@@ -640,7 +640,7 @@ public class FlowRunnerTest2 {
expectedStateMap.put("jobb:innerJobB", Status.FAILED);
expectedStateMap.put("jobb:innerJobC", Status.FAILED);
expectedStateMap.put("jobb:innerFlow", Status.SKIPPED);
- expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
+ expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
expectedStateMap.put("jobd", Status.KILLED);
Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
@@ -739,21 +739,21 @@ public class FlowRunnerTest2 {
expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
compareStates(expectedStateMap, nodeMap);
- runner.cancel("me");
+ runner.kill("me");
pause(250);
- expectedStateMap.put("jobb", Status.FAILED);
- expectedStateMap.put("jobb:innerJobB", Status.FAILED);
- expectedStateMap.put("jobb:innerJobC", Status.FAILED);
- expectedStateMap.put("jobb:innerFlow", Status.KILLED);
- expectedStateMap.put("jobc", Status.FAILED);
- expectedStateMap.put("jobd", Status.FAILED);
- expectedStateMap.put("jobd:innerJobA", Status.FAILED);
- expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
- expectedStateMap.put("jobe", Status.KILLED);
- expectedStateMap.put("jobf", Status.KILLED);
+ expectedStateMap.put("jobb", Status.KILLED);
+ expectedStateMap.put("jobb:innerJobB", Status.KILLED);
+ expectedStateMap.put("jobb:innerJobC", Status.KILLED);
+ expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
+ expectedStateMap.put("jobc", Status.KILLED);
+ expectedStateMap.put("jobd", Status.KILLED);
+ expectedStateMap.put("jobd:innerJobA", Status.KILLED);
+ expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
+ expectedStateMap.put("jobe", Status.CANCELLED);
+ expectedStateMap.put("jobf", Status.CANCELLED);
- Assert.assertEquals(Status.FAILED, flow.getStatus());
+ Assert.assertEquals(Status.KILLED, flow.getStatus());
compareStates(expectedStateMap, nodeMap);
Assert.assertFalse(thread.isAlive());
}
@@ -804,18 +804,18 @@ public class FlowRunnerTest2 {
Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
compareStates(expectedStateMap, nodeMap);
- runner.cancel("me");
+ runner.kill("me");
pause(1000);
expectedStateMap.put("jobb", Status.FAILED);
- expectedStateMap.put("jobb:innerJobC", Status.FAILED);
- expectedStateMap.put("jobb:innerFlow", Status.KILLED);
- expectedStateMap.put("jobc", Status.FAILED);
- expectedStateMap.put("jobd", Status.FAILED);
- expectedStateMap.put("jobd:innerJobA", Status.FAILED);
- expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
- expectedStateMap.put("jobe", Status.KILLED);
- expectedStateMap.put("jobf", Status.KILLED);
+ expectedStateMap.put("jobb:innerJobC", Status.KILLED);
+ expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
+ expectedStateMap.put("jobc", Status.KILLED);
+ expectedStateMap.put("jobd", Status.KILLED);
+ expectedStateMap.put("jobd:innerJobA", Status.KILLED);
+ expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
+ expectedStateMap.put("jobe", Status.CANCELLED);
+ expectedStateMap.put("jobf", Status.CANCELLED);
Assert.assertEquals(Status.FAILED, flow.getStatus());
compareStates(expectedStateMap, nodeMap);
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 3201767..69d6296 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -108,7 +108,7 @@ public class JobRunnerTest {
Assert.assertTrue(outputProps == null);
Assert.assertTrue(logFile.exists());
Assert.assertTrue(eventCollector.checkOrdering());
- Assert.assertTrue(!runner.isCancelled());
+ Assert.assertTrue(!runner.isKilled());
Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
try {
@@ -180,7 +180,7 @@ public class JobRunnerTest {
Props outputProps = runner.getNode().getOutputProps();
Assert.assertTrue(outputProps == null);
Assert.assertTrue(runner.getLogFilePath() == null);
- Assert.assertTrue(!runner.isCancelled());
+ Assert.assertTrue(!runner.isKilled());
try {
eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_FINISHED});
}
@@ -208,7 +208,7 @@ public class JobRunnerTest {
// TODO Auto-generated catch block
e.printStackTrace();
}
- runner.cancel();
+ runner.kill();
try {
wait(500);
} catch (InterruptedException e) {
@@ -218,7 +218,7 @@ public class JobRunnerTest {
}
Assert.assertTrue(runner.getStatus() == node.getStatus());
- Assert.assertTrue("Status is " + node.getStatus(), node.getStatus() == Status.FAILED);
+ Assert.assertTrue("Status is " + node.getStatus(), node.getStatus() == Status.KILLED);
Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
// Give it 10 ms to fail.
Assert.assertTrue(node.getEndTime() - node.getStartTime() < 3000);
@@ -230,7 +230,7 @@ public class JobRunnerTest {
Assert.assertTrue(outputProps == null);
Assert.assertTrue(logFile.exists());
Assert.assertTrue(eventCollector.checkOrdering());
- Assert.assertTrue(runner.isCancelled());
+ Assert.assertTrue(runner.isKilled());
try {
eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED});
}
@@ -266,7 +266,7 @@ public class JobRunnerTest {
Props outputProps = runner.getNode().getOutputProps();
Assert.assertTrue(outputProps != null);
Assert.assertTrue(logFile.exists());
- Assert.assertFalse(runner.isCancelled());
+ Assert.assertFalse(runner.isKilled());
Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
Assert.assertTrue(eventCollector.checkOrdering());
@@ -300,7 +300,7 @@ public class JobRunnerTest {
// TODO Auto-generated catch block
e.printStackTrace();
}
- runner.cancel();
+ runner.kill();
try {
wait(500);
} catch (InterruptedException e) {
@@ -312,12 +312,12 @@ public class JobRunnerTest {
eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED));
Assert.assertTrue(runner.getStatus() == node.getStatus());
- Assert.assertTrue("Node status is " + node.getStatus(), node.getStatus() == Status.FAILED);
+ Assert.assertTrue("Node status is " + node.getStatus(), node.getStatus() == Status.KILLED);
Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
Assert.assertTrue( node.getEndTime() - node.getStartTime() < 1000);
Assert.assertTrue(node.getStartTime() - startTime >= 2000);
Assert.assertTrue(node.getStartTime() - startTime <= 5000);
- Assert.assertTrue(runner.isCancelled());
+ Assert.assertTrue(runner.isKilled());
File logFile = new File(runner.getLogFilePath());
Props outputProps = runner.getNode().getOutputProps();
diff --git a/unit/java/azkaban/test/execapp/MockExecutorLoader.java b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
index 6c1246d..92c1dee 100644
--- a/unit/java/azkaban/test/execapp/MockExecutorLoader.java
+++ b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
@@ -23,6 +23,7 @@ public class MockExecutorLoader implements ExecutorLoader {
HashMap<Integer, ExecutionReference> refs = new HashMap<Integer, ExecutionReference>();
int flowUpdateCount = 0;
HashMap<String, Integer> jobUpdateCount = new HashMap<String,Integer>();
+ Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = new HashMap<Integer, Pair<ExecutionReference,ExecutableFlow>>();
@Override
public void uploadExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException {
@@ -38,7 +39,7 @@ public class MockExecutorLoader implements ExecutorLoader {
@Override
public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows() throws ExecutorManagerException {
- return null;
+ return activeFlows;
}
@Override
diff --git a/unit/java/azkaban/test/trigger/MockTriggerLoader.java b/unit/java/azkaban/test/trigger/MockTriggerLoader.java
new file mode 100644
index 0000000..67ef5c7
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/MockTriggerLoader.java
@@ -0,0 +1,53 @@
+package azkaban.test.trigger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerLoaderException;
+
+public class MockTriggerLoader implements TriggerLoader {
+
+ Map<Integer, Trigger> triggers = new HashMap<Integer, Trigger>();
+ int triggerCount = 0;
+
+ @Override
+ public synchronized void addTrigger(Trigger t) throws TriggerLoaderException {
+ t.setTriggerId(triggerCount);
+ t.setLastModifyTime(System.currentTimeMillis());
+ triggers.put(t.getTriggerId(), t);
+ triggerCount++;
+ }
+
+ @Override
+ public synchronized void removeTrigger(Trigger s) throws TriggerLoaderException {
+ triggers.remove(s);
+ }
+
+ @Override
+ public synchronized void updateTrigger(Trigger t) throws TriggerLoaderException {
+ t.setLastModifyTime(System.currentTimeMillis());
+ triggers.put(t.getTriggerId(), t);
+ }
+
+ @Override
+ public synchronized List<Trigger> loadTriggers() throws TriggerLoaderException {
+ return new ArrayList<Trigger>(triggers.values());
+ }
+
+ @Override
+ public synchronized Trigger loadTrigger(int triggerId) throws TriggerLoaderException {
+ return triggers.get(triggerId);
+ }
+
+ @Override
+ public List<Trigger> getUpdatedTriggers(long lastUpdateTime)
+ throws TriggerLoaderException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
diff --git a/unit/java/azkaban/test/trigger/TriggerManagerDeadlockTest.java b/unit/java/azkaban/test/trigger/TriggerManagerDeadlockTest.java
new file mode 100644
index 0000000..02c7cc1
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/TriggerManagerDeadlockTest.java
@@ -0,0 +1,186 @@
+package azkaban.test.trigger;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.alert.Alerter;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.test.execapp.MockExecutorLoader;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerLoaderException;
+import azkaban.trigger.TriggerManager;
+import azkaban.trigger.TriggerManagerException;
+import azkaban.trigger.builtin.CreateTriggerAction;
+import azkaban.utils.Props;
+
+public class TriggerManagerDeadlockTest {
+
+ TriggerLoader loader;
+ TriggerManager triggerManager;
+ ExecutorLoader execLoader;
+
+ @Before
+ public void setup() throws ExecutorManagerException, TriggerManagerException {
+ loader = new MockTriggerLoader();
+ Props props = new Props();
+ props.put("trigger.scan.interval", 1000);
+ props.put("executor.port", 12321);
+ execLoader = new MockExecutorLoader();
+ Map<String, Alerter> alerters = new HashMap<String, Alerter>();
+ ExecutorManager executorManager = new ExecutorManager(props, execLoader, alerters);
+ triggerManager = new TriggerManager(props, loader, executorManager);
+ }
+
+ @After
+ public void tearDown() {
+
+ }
+
+ @Test
+ public void deadlockTest() throws TriggerLoaderException, TriggerManagerException {
+ // this should well saturate it
+ for(int i = 0; i < 1000; i++) {
+ Trigger t = createSelfRegenTrigger();
+ loader.addTrigger(t);
+ }
+ // keep going and add more
+ for(int i = 0; i < 10000; i++) {
+ Trigger d = createDummyTrigger();
+ triggerManager.insertTrigger(d);
+ triggerManager.removeTrigger(d);
+ }
+
+ System.out.println("No dead lock.");
+ }
+
+ public class AlwaysOnChecker implements ConditionChecker {
+
+ public static final String type = "AlwaysOnChecker";
+
+ private final String id;
+ private final Boolean alwaysOn;
+
+ public AlwaysOnChecker(String id, Boolean alwaysOn) {
+ this.id = id;
+ this.alwaysOn = alwaysOn;
+ }
+
+ @Override
+ public Object eval() {
+ // TODO Auto-generated method stub
+ return alwaysOn;
+ }
+
+ @Override
+ public Object getNum() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public String getType() {
+ // TODO Auto-generated method stub
+ return type;
+ }
+
+ @Override
+ public ConditionChecker fromJson(Object obj) throws Exception {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Object toJson() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void stopChecker() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void setContext(Map<String, Object> context) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public long getNextCheckTime() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ }
+
+ private Trigger createSelfRegenTrigger() {
+ ConditionChecker alwaysOnChecker = new AlwaysOnChecker("alwaysOn", Boolean.TRUE);
+ String triggerExpr = alwaysOnChecker.getId() + ".eval()";
+ Map<String, ConditionChecker> triggerCheckers = new HashMap<String, ConditionChecker>();
+ triggerCheckers.put(alwaysOnChecker.getId(), alwaysOnChecker);
+ Condition triggerCond = new Condition(triggerCheckers, triggerExpr);
+
+ TriggerAction triggerAct = new CreateTriggerAction("dummyTrigger", createDummyTrigger());
+ List<TriggerAction> actions = new ArrayList<TriggerAction>();
+ actions.add(triggerAct);
+
+ ConditionChecker alwaysOffChecker = new AlwaysOnChecker("alwaysOff", Boolean.FALSE);
+ String expireExpr = alwaysOffChecker.getId() + ".eval()";
+ Map<String, ConditionChecker> expireCheckers = new HashMap<String, ConditionChecker>();
+ expireCheckers.put(alwaysOffChecker.getId(), alwaysOffChecker);
+ Condition expireCond = new Condition(expireCheckers, expireExpr);
+
+ Trigger t = new Trigger("azkaban", "azkabanTest", triggerCond, expireCond, actions);
+ return t;
+ }
+
+ private Trigger createDummyTrigger() {
+ ConditionChecker alwaysOnChecker = new AlwaysOnChecker("alwaysOn", Boolean.TRUE);
+ String triggerExpr = alwaysOnChecker.getId() + ".eval()";
+ Map<String, ConditionChecker> triggerCheckers = new HashMap<String, ConditionChecker>();
+ triggerCheckers.put(alwaysOnChecker.getId(), alwaysOnChecker);
+ Condition triggerCond = new Condition(triggerCheckers, triggerExpr);
+
+ TriggerAction triggerAct = new DummyTriggerAction("howdy!");
+ List<TriggerAction> actions = new ArrayList<TriggerAction>();
+ actions.add(triggerAct);
+
+ ConditionChecker alwaysOffChecker = new AlwaysOnChecker("alwaysOff", Boolean.FALSE);
+ String expireExpr = alwaysOffChecker.getId() + ".eval()";
+ Map<String, ConditionChecker> expireCheckers = new HashMap<String, ConditionChecker>();
+ expireCheckers.put(alwaysOffChecker.getId(), alwaysOffChecker);
+ Condition expireCond = new Condition(expireCheckers, expireExpr);
+
+ Trigger t = new Trigger("azkaban", "azkabanTest", triggerCond, expireCond, actions);
+ return t;
+ }
+
+}