azkaban-memoizeit
Changes
src/java/azkaban/execapp/FlowRunner.java 11(+7 -4)
src/java/azkaban/execapp/JobRunner.java 299(+178 -121)
Details
diff --git a/src/java/azkaban/execapp/event/EventHandler.java b/src/java/azkaban/execapp/event/EventHandler.java
index a71de50..450a4fc 100644
--- a/src/java/azkaban/execapp/event/EventHandler.java
+++ b/src/java/azkaban/execapp/event/EventHandler.java
@@ -16,6 +16,7 @@
package azkaban.execapp.event;
+import java.util.ArrayList;
import java.util.HashSet;
public class EventHandler {
@@ -29,6 +30,7 @@ public class EventHandler {
}
public void fireEventListeners(Event event) {
+ ArrayList<EventListener> listeners = new ArrayList<EventListener>(this.listeners);
for (EventListener listener: listeners) {
listener.handleEvent(event);
}
diff --git a/src/java/azkaban/execapp/event/FlowWatcher.java b/src/java/azkaban/execapp/event/FlowWatcher.java
index 1c24545..47ecf8f 100644
--- a/src/java/azkaban/execapp/event/FlowWatcher.java
+++ b/src/java/azkaban/execapp/event/FlowWatcher.java
@@ -78,17 +78,17 @@ public abstract class FlowWatcher {
return null;
}
- public synchronized void failAllWatches() {
- logger.info("Failing all watches on " + execId);
+ public synchronized void unblockAllWatches() {
+ logger.info("Unblock all watches on " + execId);
cancelWatch = true;
for(BlockingStatus status : map.values()) {
logger.info("Unblocking " + status.getJobId());
- status.changeStatus(Status.KILLED);
+ status.changeStatus(Status.SKIPPED);
status.unblock();
}
- logger.info("Successfully failed all watches on " + execId);
+ logger.info("Successfully unblocked all watches on " + execId);
}
public boolean isWatchCancelled() {
diff --git a/src/java/azkaban/execapp/event/LocalFlowWatcher.java b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
index 1dd4ba5..3907b43 100644
--- a/src/java/azkaban/execapp/event/LocalFlowWatcher.java
+++ b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
@@ -32,7 +32,7 @@ public class LocalFlowWatcher extends FlowWatcher {
runner = null;
getLogger().info("Stopping watcher, and unblocking pipeline");
- super.failAllWatches();
+ super.unblockAllWatches();
}
public class LocalFlowWatcherListener implements EventListener {
@@ -51,7 +51,7 @@ public class LocalFlowWatcher extends FlowWatcher {
// A job runner is finished
JobRunner runner = (JobRunner)event.getRunner();
ExecutableNode node = runner.getNode();
-
+ System.out.println(node + " looks like " + node.getStatus());
handleJobStatusChange(node.getNestedId(), node.getStatus());
}
}
diff --git a/src/java/azkaban/execapp/event/RemoteFlowWatcher.java b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
index 9d1c407..811f5c6 100644
--- a/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
+++ b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
@@ -102,7 +102,7 @@ public class RemoteFlowWatcher extends FlowWatcher {
if (thread != null) {
thread.interrupt();
}
- super.failAllWatches();
+ super.unblockAllWatches();
loader = null;
flow = null;
}
src/java/azkaban/execapp/FlowRunner.java 11(+7 -4)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index fbf4065..d4e18ed 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -57,8 +57,8 @@ import azkaban.utils.PropsUtils;
public class FlowRunner extends EventHandler implements Runnable {
private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
// We check update every 5 minutes, just in case things get stuck. But for the most part, we'll be idling.
- private static final long CHECK_WAIT_MS = 5*60*1000;
-
+ //private static final long CHECK_WAIT_MS = 5*60*1000;
+ private static final long CHECK_WAIT_MS = 30*1000;
private Logger logger;
private Layout loggerLayout = DEFAULT_LAYOUT;
private Appender flowAppender;
@@ -508,7 +508,9 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
// setting this fake source as this will be used to determine the location of log files.
- props.setSource(path.getPath());
+ if (path.getPath() != null) {
+ props.setSource(path.getPath());
+ }
return props;
}
@@ -794,9 +796,10 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
}
+
}
-
updateFlow();
+
interrupt();
fireEventListeners(event);
}
src/java/azkaban/execapp/JobRunner.java 299(+178 -121)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index f948192..a684307 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -222,94 +222,157 @@ public class JobRunner extends EventHandler implements Runnable {
}
}
- @Override
- public void run() {
- Thread.currentThread().setName("JobRunner-" + this.jobId + "-" + executionId);
+ /**
+ * Used to handle non-ready and special status's (i.e. KILLED). Returns true
+ * if they handled anything.
+ *
+ * @return
+ */
+ private boolean handleNonReadyStatus() {
+ Status nodeStatus = node.getStatus();
+ boolean quickFinish = false;
+ long time = System.currentTimeMillis();
- if (node.getStatus() == Status.DISABLED) {
- node.setStartTime(System.currentTimeMillis());
- fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
- node.setStatus(Status.SKIPPED);
- node.setEndTime(System.currentTimeMillis());
- fireEvent(Event.create(this, Type.JOB_FINISHED));
- return;
- } else if (this.cancelled) {
- node.setStartTime(System.currentTimeMillis());
- fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
- node.setStatus(Status.FAILED);
- node.setEndTime(System.currentTimeMillis());
- fireEvent(Event.create(this, Type.JOB_FINISHED));
- } else if (node.getStatus() == Status.FAILED || node.getStatus() == Status.KILLED) {
- node.setStartTime(System.currentTimeMillis());
+ if (this.isCancelled() || Status.isStatusFinished(nodeStatus)) {
+ quickFinish = true;
+ }
+ else if (nodeStatus == Status.DISABLED) {
+ changeStatus(Status.SKIPPED, time);
+ quickFinish = true;
+ }
+ else if (this.cancelled) {
+ changeStatus(Status.FAILED, time);
+ quickFinish = true;
+ }
+
+ if (quickFinish) {
+ node.setStartTime(time);
fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
- node.setEndTime(System.currentTimeMillis());
+ node.setEndTime(time);
fireEvent(Event.create(this, Type.JOB_FINISHED));
- return;
+ return true;
}
- else {
- createLogger();
- node.setUpdateTime(System.currentTimeMillis());
-
- // For pipelining of jobs. Will watch other jobs.
- if (!pipelineJobs.isEmpty()) {
- String blockedList = "";
- ArrayList<BlockingStatus> blockingStatus = new ArrayList<BlockingStatus>();
- for (String waitingJobId : pipelineJobs) {
- Status status = watcher.peekStatus(waitingJobId);
- if (status != null && !Status.isStatusFinished(status)) {
- BlockingStatus block = watcher.getBlockingStatus(waitingJobId);
- blockingStatus.add(block);
- blockedList += waitingJobId + ",";
- }
+
+ return false;
+ }
+
+ /**
+ * If pipelining is set, will block on another flow's jobs.
+ */
+ private boolean blockOnPipeLine() {
+ if (this.isCancelled()) {
+ return true;
+ }
+
+ // For pipelining of jobs. Will watch other jobs.
+ if (!pipelineJobs.isEmpty()) {
+ String blockedList = "";
+ ArrayList<BlockingStatus> blockingStatus = new ArrayList<BlockingStatus>();
+ for (String waitingJobId : pipelineJobs) {
+ Status status = watcher.peekStatus(waitingJobId);
+ if (status != null && !Status.isStatusFinished(status)) {
+ BlockingStatus block = watcher.getBlockingStatus(waitingJobId);
+ blockingStatus.add(block);
+ blockedList += waitingJobId + ",";
}
- if (!blockingStatus.isEmpty()) {
- logger.info("Pipeline job " + this.jobId + " waiting on " + blockedList + " in execution " + watcher.getExecId());
-
- for(BlockingStatus bStatus: blockingStatus) {
- logger.info("Waiting on pipelined job " + bStatus.getJobId());
- currentBlockStatus = bStatus;
- bStatus.blockOnFinishedStatus();
+ }
+ if (!blockingStatus.isEmpty()) {
+ logger.info("Pipeline job " + this.jobId + " waiting on " + blockedList + " in execution " + watcher.getExecId());
+
+ for(BlockingStatus bStatus: blockingStatus) {
+ logger.info("Waiting on pipelined job " + bStatus.getJobId());
+ currentBlockStatus = bStatus;
+ bStatus.blockOnFinishedStatus();
+ if (this.isCancelled()) {
+ logger.info("Job was cancelled while waiting on pipeline. Quiting.");
+ return true;
+ }
+ else {
logger.info("Pipelined job " + bStatus.getJobId() + " finished.");
- if (watcher.isWatchCancelled()) {
- break;
- }
}
- writeStatus();
- fireEvent(Event.create(this, Type.JOB_STATUS_CHANGED));
}
- if (watcher.isWatchCancelled()) {
- logger.info("Job was cancelled while waiting on pipeline. Quiting.");
- node.setStartTime(System.currentTimeMillis());
- node.setEndTime(System.currentTimeMillis());
- node.setStatus(Status.FAILED);
- fireEvent(Event.create(this, Type.JOB_FINISHED));
- return;
+ }
+ }
+
+ currentBlockStatus = null;
+ return false;
+ }
+
+ private boolean delayExecution() {
+ if (this.isCancelled()) {
+ return true;
+ }
+
+ long currentTime = System.currentTimeMillis();
+ if (delayStartMs > 0) {
+ logger.info("Delaying start of execution for " + delayStartMs + " milliseconds.");
+ synchronized(this) {
+ try {
+ this.wait(delayStartMs);
+ logger.info("Execution has been delayed for " + delayStartMs + " ms. Continuing with execution.");
+ } catch (InterruptedException e) {
+ logger.error("Job " + this.jobId + " was to be delayed for " + delayStartMs + ". Interrupted after " + (System.currentTimeMillis() - currentTime));
}
}
- currentBlockStatus = null;
- long currentTime = System.currentTimeMillis();
- if (delayStartMs > 0) {
- logger.info("Delaying start of execution for " + delayStartMs + " milliseconds.");
- synchronized(this) {
- try {
- this.wait(delayStartMs);
- logger.info("Execution has been delayed for " + delayStartMs + " ms. Continuing with execution.");
- } catch (InterruptedException e) {
- logger.error("Job " + this.jobId + " was to be delayed for " + delayStartMs + ". Interrupted after " + (System.currentTimeMillis() - currentTime));
+ if (this.isCancelled()) {
+ logger.info("Job was cancelled while in delay. Quiting.");
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private void finalizeLogFile() {
+ closeLogger();
+
+ if (logFile != null) {
+ try {
+ File[] files = logFile.getParentFile().listFiles(new FilenameFilter() {
+
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.startsWith(logFile.getName());
}
- }
+ }
+ );
+ Arrays.sort(files, Collections.reverseOrder());
- if (cancelled) {
- logger.info("Job was cancelled while in delay. Quiting.");
- node.setStartTime(System.currentTimeMillis());
- node.setEndTime(System.currentTimeMillis());
- fireEvent(Event.create(this, Type.JOB_FINISHED));
- return;
- }
+ loader.uploadLogFile(executionId, this.jobId, node.getAttempt(), files);
+ } catch (ExecutorManagerException e) {
+ flowLogger.error("Error writing out logs for job " + this.jobId, e);
}
-
- node.setStartTime(System.currentTimeMillis());
+ }
+ else {
+ flowLogger.info("Log file for job " + this.jobId + " is null");
+ }
+ }
+
+ /**
+ * The main run thread.
+ *
+ */
+ @Override
+ public void run() {
+ Thread.currentThread().setName("JobRunner-" + this.jobId + "-" + executionId);
+
+ // If the job is cancelled, disabled, killed. No log is created in this case
+ if (handleNonReadyStatus()) {
+ return;
+ }
+
+ createLogger();
+ boolean errorFound = false;
+ // Delay execution if necessary. Will return a true if something went wrong.
+ errorFound |= delayExecution();
+
+ // For pipelining of jobs. Will watch other jobs. Will return true if something went wrong.
+ errorFound |= blockOnPipeLine();
+
+ // Start the node.
+ node.setStartTime(System.currentTimeMillis());
+ if (!errorFound && !isCancelled()) {
fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
try {
loader.uploadExecutableNode(node, props);
@@ -318,55 +381,26 @@ public class JobRunner extends EventHandler implements Runnable {
}
if (prepareJob()) {
+ // Writes status to the db
writeStatus();
fireEvent(Event.create(this, Type.JOB_STATUS_CHANGED), false);
runJob();
+ writeStatus();
}
else {
- node.setStatus(Status.FAILED);
+ changeStatus(Status.FAILED);
logError("Job run failed preparing the job.");
}
-
- node.setEndTime(System.currentTimeMillis());
-
- logInfo("Finishing job " + this.jobId + " at " + node.getEndTime());
-
- closeLogger();
- writeStatus();
-
- if (logFile != null) {
- try {
- File[] files = logFile.getParentFile().listFiles(new FilenameFilter() {
-
- @Override
- public boolean accept(File dir, String name) {
- return name.startsWith(logFile.getName());
- }
- }
- );
- Arrays.sort(files, Collections.reverseOrder());
-
- loader.uploadLogFile(executionId, this.jobId, node.getAttempt(), files);
- } catch (ExecutorManagerException e) {
- flowLogger.error("Error writing out logs for job " + this.jobId, e);
- }
- }
- else {
- flowLogger.info("Log file for job " + this.jobId + " is null");
- }
}
- fireEvent(Event.create(this, Type.JOB_FINISHED));
- }
-
- private void fireEvent(Event event) {
- fireEvent(event, true);
- }
-
- private void fireEvent(Event event, boolean updateTime) {
- if (updateTime) {
- node.setUpdateTime(System.currentTimeMillis());
+ node.setEndTime(System.currentTimeMillis());
+
+ if (isCancelled()) {
+ changeStatus(Status.FAILED);
}
- this.fireEventListeners(event);
+ logInfo("Finishing job " + this.jobId + " at " + node.getEndTime());
+
+ fireEvent(Event.create(this, Type.JOB_FINISHED), false);
+ finalizeLogFile();
}
private boolean prepareJob() throws RuntimeException {
@@ -396,8 +430,8 @@ public class JobRunner extends EventHandler implements Runnable {
props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
props.put(CommonJobProperties.JOB_METADATA_FILE, createMetaDataFileName(executionId, this.jobId, node.getAttempt()));
- node.setStatus(Status.RUNNING);
-
+ changeStatus(Status.RUNNING);
+
// Ability to specify working directory
if (!props.containsKey(AbstractProcessJob.WORKING_DIR)) {
props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
@@ -430,22 +464,45 @@ public class JobRunner extends EventHandler implements Runnable {
e.printStackTrace();
if (props.getBoolean("job.succeed.on.failure", false)) {
- node.setStatus(Status.FAILED_SUCCEEDED);
+ changeStatus(Status.FAILED_SUCCEEDED);
logError("Job run failed, but will treat it like success.");
logError(e.getMessage() + e.getCause());
}
else {
- node.setStatus(Status.FAILED);
+ changeStatus(Status.FAILED);
logError("Job run failed!");
logError(e.getMessage() + e.getCause());
}
- return;
}
-
- node.setStatus(Status.SUCCEEDED);
+
if (job != null) {
node.setOutputProps(job.getJobGeneratedProperties());
}
+
+ // If the job is still running, set the status to Success.
+ if (!Status.isStatusFinished(node.getStatus())) {
+ changeStatus(Status.SUCCEEDED);
+ }
+ }
+
+ private void changeStatus(Status status) {
+ changeStatus(status, System.currentTimeMillis());
+ }
+
+ private void changeStatus(Status status, long time) {
+ node.setStatus(status);
+ node.setUpdateTime(time);
+ }
+
+ private void fireEvent(Event event) {
+ fireEvent(event, true);
+ }
+
+ private void fireEvent(Event event, boolean updateTime) {
+ if (updateTime) {
+ node.setUpdateTime(System.currentTimeMillis());
+ }
+ this.fireEventListeners(event);
}
public void cancel() {
diff --git a/src/java/azkaban/executor/ExecutableFlowBase.java b/src/java/azkaban/executor/ExecutableFlowBase.java
index 35cd4f8..798f40a 100644
--- a/src/java/azkaban/executor/ExecutableFlowBase.java
+++ b/src/java/azkaban/executor/ExecutableFlowBase.java
@@ -305,7 +305,7 @@ public class ExecutableFlowBase extends ExecutableNode {
((ExecutableFlowBase)exNode).applyUpdateObject(node, updatedNodes);
}
else {
- exNode.applyUpdateObject(updateData);
+ exNode.applyUpdateObject(node);
}
}
}
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index db5fd8d..79942e9 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -241,7 +241,7 @@ public class ExecutableNode {
}
public String getPrintableId(String delimiter) {
- if (this.getParentFlow() instanceof ExecutableFlow) {
+ if (this.getParentFlow() == null || this.getParentFlow() instanceof ExecutableFlow) {
return getId();
}
return getParentFlow().getPrintableId(delimiter) + delimiter + getId();
diff --git a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
index aac1fb1..c87f2c6 100644
--- a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
@@ -190,6 +190,7 @@ public class LocalFlowWatcherTest {
" start: " + node.getStartTime() +
" dependent on " + watchedChild + " " + child.getEndTime() +
" diff: " + diff);
+
Assert.assertTrue(node.getStartTime() >= child.getEndTime());
}
diff --git a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
index fdff895..45c3a85 100644
--- a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
@@ -14,6 +14,7 @@ import azkaban.execapp.FlowRunner;
import azkaban.execapp.event.FlowWatcher;
import azkaban.execapp.event.RemoteFlowWatcher;
import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorLoader;
@@ -72,8 +73,10 @@ public class RemoteFlowWatcherTest {
FlowRunner runner2 = createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2, watcher, 2);
Thread runner2Thread = new Thread(runner2);
+ printCurrentState("runner1 ", runner1.getExecutableFlow());
runner1Thread.start();
runner2Thread.start();
+
runner2Thread.join();
FileUtils.deleteDirectory(workingDir1);
@@ -222,6 +225,16 @@ public class RemoteFlowWatcherTest {
return runner;
}
+ private void printCurrentState(String prefix, ExecutableFlowBase flow) {
+ for(ExecutableNode node: flow.getExecutableNodes()) {
+
+ System.err.println(prefix + node.getNestedId() + "->" + node.getStatus().name());
+ if (node instanceof ExecutableFlowBase) {
+ printCurrentState(prefix, (ExecutableFlowBase)node);
+ }
+ }
+ }
+
private ExecutableFlow prepareExecDir(File workingDir, File execDir, String flowName, int execId) throws IOException {
FileUtils.copyDirectory(execDir, workingDir);
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
index d20f2f6..1ef8804 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
@@ -20,7 +20,6 @@ import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions;
-import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.Status;
import azkaban.flow.Flow;
@@ -31,8 +30,28 @@ import azkaban.project.ProjectManagerException;
import azkaban.test.executor.InteractiveTestJob;
import azkaban.test.executor.JavaJob;
import azkaban.utils.DirectoryFlowLoader;
-import azkaban.utils.Props;
+/**
+ * Flows in this test:
+ * joba
+ * jobb
+ * joba1
+ * jobc->joba
+ * jobd->joba
+ * jobe->jobb,jobc,jobd
+ * jobf->jobe,joba1
+ *
+ * jobb = innerFlow
+ * innerJobA
+ * innerJobB->innerJobA
+ * innerJobC->innerJobB
+ * innerFlow->innerJobB,innerJobC
+ *
+ * jobd=innerFlow2
+ * innerFlow2->innerJobA
+ * @author rpark
+ *
+ */
public class FlowRunnerPipelineTest {
private File workingDir;
private JobTypeManager jobtypeManager;
@@ -174,10 +193,70 @@ public class FlowRunnerPipelineTest {
compareStates(previousExpectedStateMap, previousNodeMap);
compareStates(pipelineExpectedStateMap, pipelineNodeMap);
-// Assert.assertEquals(Status.SUCCEEDED, pipelineFlow.getStatus());
-// Assert.assertEquals(Status.SUCCEEDED, previousFlow.getStatus());
-// Assert.assertFalse(thread1.isAlive());
-// Assert.assertFalse(thread2.isAlive());
+ InteractiveTestJob.getTestJob("prev:jobb:innerFlow").succeedJob();
+ InteractiveTestJob.getTestJob("pipe:jobc").succeedJob();
+ pause(250);
+ previousExpectedStateMap.put("jobb:innerFlow", Status.SUCCEEDED);
+ previousExpectedStateMap.put("jobb", Status.SUCCEEDED);
+ previousExpectedStateMap.put("jobe", Status.RUNNING);
+ pipelineExpectedStateMap.put("jobc", Status.SUCCEEDED);
+
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("pipe:jobb:innerJobB").succeedJob();
+ InteractiveTestJob.getTestJob("pipe:jobb:innerJobC").succeedJob();
+ InteractiveTestJob.getTestJob("prev:jobe").succeedJob();
+ pause(250);
+ previousExpectedStateMap.put("jobe", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("jobb:innerJobB", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("jobb:innerFlow", Status.RUNNING);
+
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("pipe:jobd:innerJobA").succeedJob();
+ InteractiveTestJob.getTestJob("pipe:jobb:innerFlow").succeedJob();
+ pause(250);
+ pipelineExpectedStateMap.put("jobb", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("jobb:innerFlow", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("jobd:innerFlow2", Status.RUNNING);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("pipe:jobd:innerFlow2").succeedJob();
+ InteractiveTestJob.getTestJob("prev:joba1").succeedJob();
+ pause(250);
+ pipelineExpectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("jobd", Status.SUCCEEDED);
+ previousExpectedStateMap.put("jobf", Status.RUNNING);
+ previousExpectedStateMap.put("joba1", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("joba1", Status.RUNNING);
+ pipelineExpectedStateMap.put("jobe", Status.RUNNING);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+
+ InteractiveTestJob.getTestJob("pipe:jobe").succeedJob();
+ InteractiveTestJob.getTestJob("prev:jobf").succeedJob();
+ pause(250);
+ pipelineExpectedStateMap.put("jobe", Status.SUCCEEDED);
+ previousExpectedStateMap.put("jobf", Status.SUCCEEDED);
+ Assert.assertEquals(Status.SUCCEEDED, previousFlow.getStatus());
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("pipe:joba1").succeedJob();
+ pause(250);
+ pipelineExpectedStateMap.put("joba1", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("jobf", Status.RUNNING);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("pipe:jobf").succeedJob();
+ pause(250);
+ Assert.assertEquals(Status.SUCCEEDED, pipelineFlow.getStatus());
+ Assert.assertFalse(thread1.isAlive());
+ Assert.assertFalse(thread2.isAlive());
}
private Thread runFlowRunnerInThread(FlowRunner runner) {
@@ -234,6 +313,16 @@ public class FlowRunnerPipelineTest {
FileUtils.copyDirectory(directory, workingDir);
}
+ private void printCurrentState(String prefix, ExecutableFlowBase flow) {
+ for(ExecutableNode node: flow.getExecutableNodes()) {
+
+ System.err.println(prefix + node.getNestedId() + "->" + node.getStatus().name());
+ if (node instanceof ExecutableFlowBase) {
+ printCurrentState(prefix, (ExecutableFlowBase)node);
+ }
+ }
+ }
+
private FlowRunner createFlowRunner(EventCollectorListener eventCollector, String flowName, String groupName) throws Exception {
return createFlowRunner(eventCollector, flowName, groupName, new ExecutionOptions());
}
diff --git a/unit/java/azkaban/test/executor/ExecutableFlowTest.java b/unit/java/azkaban/test/executor/ExecutableFlowTest.java
index 4592720..35fa39d 100644
--- a/unit/java/azkaban/test/executor/ExecutableFlowTest.java
+++ b/unit/java/azkaban/test/executor/ExecutableFlowTest.java
@@ -178,10 +178,11 @@ public class ExecutableFlowTest {
exFlow.setUpdateTime(133);
// Change one job and see if it updates
- jobe.setEndTime(System.currentTimeMillis());
- jobe.setUpdateTime(System.currentTimeMillis());
+ long time = System.currentTimeMillis();
+ jobe.setEndTime(time);
+ jobe.setUpdateTime(time);
jobe.setStatus(Status.DISABLED);
- jobe.setStartTime(System.currentTimeMillis() - 1000);
+ jobe.setStartTime(time - 1);
// Should be one node that was changed
Map<String,Object> updateObject = exFlow.toUpdateObject(0);
Assert.assertEquals(1, ((List)(updateObject.get("nodes"))).size());
@@ -194,16 +195,18 @@ public class ExecutableFlowTest {
Assert.assertNull(updateObject.get("nodes"));
// Change inner flow
- jobbInnerFlowA.setEndTime(System.currentTimeMillis());
- jobbInnerFlowA.setUpdateTime(System.currentTimeMillis());
+ long currentTime = time + 1 ;
+ jobbInnerFlowA.setEndTime(currentTime);
+ jobbInnerFlowA.setUpdateTime(currentTime);
jobbInnerFlowA.setStatus(Status.DISABLED);
- jobbInnerFlowA.setStartTime(System.currentTimeMillis() - 1000);
+ jobbInnerFlowA.setStartTime(currentTime - 100);
// We should get 2 updates if we do a toUpdateObject using 0 as the start time
updateObject = exFlow.toUpdateObject(0);
Assert.assertEquals(2, ((List)(updateObject.get("nodes"))).size());
// This should provide 1 update. That we can apply
updateObject = exFlow.toUpdateObject(jobe.getUpdateTime());
+ Assert.assertNotNull(updateObject.get("nodes"));
Assert.assertEquals(1, ((List)(updateObject.get("nodes"))).size());
copyFlow.applyUpdateObject(updateObject);
testEquals(exFlow, copyFlow);
diff --git a/unit/java/azkaban/test/executor/JavaJobRunnerMain.java b/unit/java/azkaban/test/executor/JavaJobRunnerMain.java
index 9add12a..d1766e6 100644
--- a/unit/java/azkaban/test/executor/JavaJobRunnerMain.java
+++ b/unit/java/azkaban/test/executor/JavaJobRunnerMain.java
@@ -185,7 +185,9 @@ public class JavaJobRunnerMain {
new RuntimeException("Unable to store output properties to: " + outputFileStr);
} finally {
try {
- writer.close();
+ if (writer != null) {
+ writer.close();
+ }
} catch (IOException e) {
}
}