azkaban-uncached
Changes
src/java/azkaban/execapp/FlowRunner.java 48(+34 -14)
src/java/azkaban/execapp/JobRunner.java 59(+53 -6)
src/java/azkaban/executor/ExecutableFlow.java 60(+16 -44)
src/java/azkaban/executor/Status.java 56(+56 -0)
src/java/azkaban/sla/SlaMailer.java 2(+1 -1)
src/java/azkaban/sla/SLAManager.java 2(+1 -1)
src/java/azkaban/utils/WebUtils.java 2(+1 -1)
src/java/log4j.properties 17(+16 -1)
src/sql/create_execution_logs.sql 3(+2 -1)
src/sql/update_2.0_to_2.01.sql 3(+2 -1)
unit/executions/exectest1/exec1.flow 17(+0 -17)
unit/executions/exectest1/exec1-mod.flow 156(+156 -0)
Details
diff --git a/src/java/azkaban/execapp/AzkabanExecutorServer.java b/src/java/azkaban/execapp/AzkabanExecutorServer.java
index 8770e0e..7c25028 100644
--- a/src/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -36,11 +36,8 @@ import org.mortbay.thread.QueuedThreadPool;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.JdbcExecutorLoader;
-import azkaban.jmx.JmxExecutorManager;
import azkaban.jmx.JmxFlowRunnerManager;
import azkaban.jmx.JmxJettyServer;
-import azkaban.jmx.JmxSLAManager;
-import azkaban.jmx.JmxScheduler;
import azkaban.project.JdbcProjectLoader;
import azkaban.project.ProjectLoader;
import azkaban.utils.Props;
diff --git a/src/java/azkaban/execapp/event/BlockingStatus.java b/src/java/azkaban/execapp/event/BlockingStatus.java
new file mode 100644
index 0000000..02c0f44
--- /dev/null
+++ b/src/java/azkaban/execapp/event/BlockingStatus.java
@@ -0,0 +1,58 @@
+package azkaban.execapp.event;
+
+import azkaban.executor.Status;
+
+public class BlockingStatus {
+ private static final long WAIT_TIME = 5*60*1000;
+ private final int execId;
+ private final String jobId;
+ private Status status;
+
+ public BlockingStatus(int execId, String jobId, Status initialStatus) {
+ this.execId = execId;
+ this.jobId = jobId;
+ this.status = initialStatus;
+ }
+
+ public Status blockOnFinishedStatus() {
+ if (status == null) {
+ return null;
+ }
+
+ while (!Status.isStatusFinished(status)) {
+ synchronized(this) {
+ try {
+ this.wait(WAIT_TIME);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ return status;
+ }
+
+ public Status viewStatus() {
+ return this.status;
+ }
+
+ public synchronized void unblock() {
+ this.notifyAll();
+ }
+
+ public void changeStatus(Status status) {
+ synchronized(this) {
+ this.status = status;
+ if (Status.isStatusFinished(status)) {
+ unblock();
+ }
+ }
+ }
+
+ public int getExecId() {
+ return execId;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+}
diff --git a/src/java/azkaban/execapp/event/EventHandler.java b/src/java/azkaban/execapp/event/EventHandler.java
index d1337b6..a71de50 100644
--- a/src/java/azkaban/execapp/event/EventHandler.java
+++ b/src/java/azkaban/execapp/event/EventHandler.java
@@ -33,4 +33,8 @@ public class EventHandler {
listener.handleEvent(event);
}
}
+
+ public void removeListener(EventListener listener) {
+ listeners.remove(listener);
+ }
}
diff --git a/src/java/azkaban/execapp/event/FlowWatcher.java b/src/java/azkaban/execapp/event/FlowWatcher.java
new file mode 100644
index 0000000..9a46168
--- /dev/null
+++ b/src/java/azkaban/execapp/event/FlowWatcher.java
@@ -0,0 +1,76 @@
+package azkaban.execapp.event;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.Status;
+
+public abstract class FlowWatcher {
+ private int execId;
+ private ExecutableFlow flow;
+ private Map<String, BlockingStatus> map = new ConcurrentHashMap<String, BlockingStatus>();
+ private boolean cancelWatch = false;
+
+ public FlowWatcher(int execId) {
+ this.execId = execId;
+ }
+
+ public void setFlow(ExecutableFlow flow) {
+ this.flow = flow;
+ }
+
+ /**
+ * Called to fire events to the JobRunner listeners
+ * @param jobId
+ */
+ protected synchronized void handleJobFinished(String jobId, Status status) {
+ BlockingStatus block = map.get(jobId);
+ if (block != null) {
+ block.changeStatus(status);
+ }
+ }
+
+ public int getExecId() {
+ return execId;
+ }
+
+ public synchronized BlockingStatus getBlockingStatus(String jobId) {
+ if (cancelWatch) {
+ return null;
+ }
+
+ ExecutableNode node = flow.getExecutableNode(jobId);
+ if (node == null) {
+ return null;
+ }
+
+ BlockingStatus blockingStatus = map.get(jobId);
+ if (blockingStatus == null) {
+ blockingStatus = new BlockingStatus(execId, jobId, node.getStatus());
+ map.put(jobId, blockingStatus);
+ }
+
+ return blockingStatus;
+ }
+
+ public Status peekStatus(String jobId) {
+ ExecutableNode node = flow.getExecutableNode(jobId);
+ if (node != null) {
+ return node.getStatus();
+ }
+
+ return null;
+ }
+
+ public synchronized void failAllWatches() {
+ cancelWatch = true;
+
+ for(BlockingStatus status : map.values()) {
+ status.unblock();
+ }
+ }
+
+ public abstract void stopWatcher();
+}
diff --git a/src/java/azkaban/execapp/event/LocalFlowWatcher.java b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
new file mode 100644
index 0000000..b51f17b
--- /dev/null
+++ b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
@@ -0,0 +1,40 @@
+package azkaban.execapp.event;
+
+import azkaban.execapp.FlowRunner;
+import azkaban.execapp.JobRunner;
+import azkaban.executor.ExecutableNode;
+
+public class LocalFlowWatcher extends FlowWatcher {
+ private LocalFlowWatcherListener watcherListener;
+ private FlowRunner runner;
+
+ public LocalFlowWatcher(FlowRunner runner) {
+ super(runner.getExecutableFlow().getExecutionId());
+ super.setFlow(runner.getExecutableFlow());
+
+ watcherListener = new LocalFlowWatcherListener();
+ this.runner = runner;
+ runner.addListener(watcherListener);
+ }
+
+ @Override
+ public void stopWatcher() {
+ // Just freeing stuff
+ runner.removeListener(watcherListener);
+ runner = null;
+
+ super.failAllWatches();
+ }
+
+ public class LocalFlowWatcherListener implements EventListener {
+ @Override
+ public void handleEvent(Event event) {
+ if (event.getRunner() instanceof JobRunner) {
+ JobRunner runner = (JobRunner)event.getRunner();
+ ExecutableNode node = runner.getNode();
+
+ handleJobFinished(node.getJobId(), node.getStatus());
+ }
+ }
+ }
+}
diff --git a/src/java/azkaban/execapp/event/RemoteFlowWatcher.java b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
new file mode 100644
index 0000000..398a364
--- /dev/null
+++ b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
@@ -0,0 +1,106 @@
+package azkaban.execapp.event;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
+
+public class RemoteFlowWatcher extends FlowWatcher {
+ private final static long CHECK_INTERVAL_MS = 60*1000;
+
+ private int execId;
+ private ExecutorLoader loader;
+ private ExecutableFlow flow;
+ private RemoteUpdaterThread thread;
+ private boolean isShutdown = false;
+
+ // Every minute
+ private long checkIntervalMs = CHECK_INTERVAL_MS;
+
+ public RemoteFlowWatcher(int execId, ExecutorLoader loader) {
+ this(execId, loader, CHECK_INTERVAL_MS);
+ }
+
+ public RemoteFlowWatcher(int execId, ExecutorLoader loader, long interval) {
+ super(execId);
+ checkIntervalMs = interval;
+
+ try {
+ flow = loader.fetchExecutableFlow(execId);
+ } catch (ExecutorManagerException e) {
+ return;
+ }
+
+ super.setFlow(flow);
+ this.loader = loader;
+ this.execId = execId;
+ if (flow != null) {
+ this.thread = new RemoteUpdaterThread();
+ this.thread.setName("Remote-watcher-flow-" + execId);
+ this.thread.start();
+ }
+ }
+
+ private class RemoteUpdaterThread extends Thread {
+ @Override
+ public void run() {
+ do {
+ ExecutableFlow updateFlow = null;
+ try {
+ updateFlow = loader.fetchExecutableFlow(execId);
+ } catch (ExecutorManagerException e) {
+ e.printStackTrace();
+ isShutdown = true;
+ }
+
+ if (flow == null) {
+ flow = updateFlow;
+ }
+ else {
+ flow.setStatus(updateFlow.getStatus());
+ flow.setEndTime(updateFlow.getEndTime());
+ flow.setUpdateTime(updateFlow.getUpdateTime());
+
+ for (ExecutableNode node : flow.getExecutableNodes()) {
+ String jobId = node.getJobId();
+ ExecutableNode newNode = updateFlow.getExecutableNode(jobId);
+ long updateTime = node.getUpdateTime();
+ node.setUpdateTime(newNode.getUpdateTime());
+ node.setStatus(newNode.getStatus());
+ node.setStartTime(newNode.getStartTime());
+ node.setEndTime(newNode.getEndTime());
+
+ if (updateTime < newNode.getUpdateTime()) {
+ handleJobFinished(jobId, newNode.getStatus());
+ }
+ }
+ }
+
+ if (Status.isStatusFinished(flow.getStatus())) {
+ isShutdown = true;
+ }
+ else {
+ synchronized(this) {
+ try {
+ wait(checkIntervalMs);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ } while (!isShutdown);
+ }
+
+ }
+
+ @Override
+ public synchronized void stopWatcher() {
+ isShutdown = true;
+ if (thread != null) {
+ thread.interrupt();
+ }
+ super.failAllWatches();
+ loader = null;
+ flow = null;
+ }
+}
src/java/azkaban/execapp/FlowRunner.java 48(+34 -14)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index c912ad9..c61d724 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -25,12 +25,13 @@ import azkaban.execapp.event.Event;
import azkaban.execapp.event.Event.Type;
import azkaban.execapp.event.EventHandler;
import azkaban.execapp.event.EventListener;
+import azkaban.execapp.event.FlowWatcher;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlow.FailureAction;
-import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
import azkaban.flow.FlowProps;
import azkaban.jobtype.JobTypeManager;
import azkaban.project.ProjectLoader;
@@ -68,6 +69,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private JobRunnerEventListener listener = new JobRunnerEventListener();
private BlockingQueue<JobRunner> jobsToRun = new LinkedBlockingQueue<JobRunner>();
private Map<String, JobRunner> runningJob = new ConcurrentHashMap<String, JobRunner>();
+
private Map<Pair<String, Integer>, JobRunner> allJobs = new ConcurrentHashMap<Pair<String, Integer>, JobRunner>();
private List<JobRunner> pausedJobsToRun = Collections.synchronizedList(new ArrayList<JobRunner>());
@@ -80,7 +82,14 @@ public class FlowRunner extends EventHandler implements Runnable {
private boolean flowFinished = false;
private boolean flowCancelled = false;
- public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
+ // Used for pipelining
+ private Integer pipelineLevel = null;
+ private Integer pipelineExecId = null;
+
+ // Watches external flows for execution.
+ private FlowWatcher watcher = null;
+
+ public FlowRunner(ExecutableFlow flow, FlowWatcher watcher, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
this.execId = flow.getExecutionId();
this.flow = flow;
this.executorLoader = executorLoader;
@@ -88,6 +97,10 @@ public class FlowRunner extends EventHandler implements Runnable {
this.executorService = Executors.newFixedThreadPool(numThreads);
this.execDir = new File(flow.getExecutionPath());
this.jobtypeManager = jobtypeManager;
+
+ this.pipelineLevel = flow.getPipelineLevel();
+ this.pipelineExecId = flow.getPipelineExecutionId();
+ this.watcher = watcher;
}
public FlowRunner setGlobalProps(Props globalProps) {
@@ -99,10 +112,6 @@ public class FlowRunner extends EventHandler implements Runnable {
return execDir;
}
- public void watchedExecutionUpdate(ExecutableFlow flow) {
-
- }
-
@Override
public void run() {
try {
@@ -116,6 +125,9 @@ public class FlowRunner extends EventHandler implements Runnable {
// Create execution dir
createLogger(flowId);
logger.info("Running execid:" + execId + " flow:" + flowId + " project:" + projectId + " version:" + version);
+ if (pipelineExecId != null) {
+ logger.info("Running simulateously with " + pipelineExecId + ". Pipelining level " + pipelineLevel);
+ }
// The current thread is used for interrupting blocks
currentThread = Thread.currentThread();
@@ -144,6 +156,10 @@ public class FlowRunner extends EventHandler implements Runnable {
flow.setStatus(Status.FAILED);
}
finally {
+ if (watcher != null) {
+ watcher.stopWatcher();
+ }
+
closeLogger();
flow.setEndTime(System.currentTimeMillis());
updateFlow();
@@ -346,6 +362,10 @@ public class FlowRunner extends EventHandler implements Runnable {
// should have one prop with system secrets, the other user level props
JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), executorLoader, jobtypeManager, logger);
+ if (watcher != null) {
+ jobRunner.setPipeline(watcher, pipelineLevel);
+ }
+
jobRunner.addListener(listener);
return jobRunner;
@@ -398,6 +418,10 @@ public class FlowRunner extends EventHandler implements Runnable {
flowPaused = false;
flowCancelled = true;
+ if (watcher != null) {
+ watcher.stopWatcher();
+ }
+
for (JobRunner runner: pausedJobsToRun) {
ExecutableNode node = runner.getNode();
logger.info("Resumed flow is cancelled. Job killed " + node.getJobId() + " by " + user);
@@ -669,7 +693,7 @@ public class FlowRunner extends EventHandler implements Runnable {
String trigger = finishedNode.getAttempt() > 0 ? finishedNode.getJobId() + "." + finishedNode.getAttempt() : finishedNode.getJobId();
for (String dependent : finishedNode.getOutNodes()) {
ExecutableNode dependentNode = flow.getExecutableNode(dependent);
- queueNextJob(dependentNode, finishedNode.getJobId());
+ queueNextJob(dependentNode, trigger);
}
}
@@ -750,6 +774,8 @@ public class FlowRunner extends EventHandler implements Runnable {
jobOutputProps.put(node.getJobId(), runner.getOutputProps());
runningJob.remove(node.getJobId());
+
+ fireEventListeners(event);
queueNextJobs(node);
}
@@ -781,13 +807,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private boolean isFlowFinished() {
for (String end: flow.getEndNodes()) {
ExecutableNode node = flow.getExecutableNode(end);
- switch(node.getStatus()) {
- case KILLED:
- case SKIPPED:
- case FAILED:
- case SUCCEEDED:
- continue;
- default:
+ if (!Status.isStatusFinished(node.getStatus())) {
return false;
}
}
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index b0da031..6871e08 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -37,6 +37,9 @@ import org.apache.log4j.Logger;
import azkaban.project.ProjectLoader;
import azkaban.execapp.event.Event;
import azkaban.execapp.event.EventListener;
+import azkaban.execapp.event.FlowWatcher;
+import azkaban.execapp.event.LocalFlowWatcher;
+import azkaban.execapp.event.RemoteFlowWatcher;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
@@ -319,7 +322,20 @@ public class FlowRunnerManager implements EventListener {
setupFlow(flow);
// Setup flow runner
- FlowRunner runner = new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
+ FlowWatcher watcher = null;
+ if (flow.getPipelineExecutionId() != null) {
+ int pipelineExecId = flow.getPipelineExecutionId();
+ FlowRunner runner = runningFlows.get(pipelineExecId);
+
+ if (runner != null) {
+ watcher = new LocalFlowWatcher(runner);
+ }
+ else {
+ watcher = new RemoteFlowWatcher(pipelineExecId, executorLoader);
+ }
+ }
+
+ FlowRunner runner = new FlowRunner(flow, watcher, executorLoader, projectLoader, jobtypeManager);
runner.setGlobalProps(globalProps);
runner.addListener(this);
src/java/azkaban/execapp/JobRunner.java 59(+53 -6)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index e34132e..f6c482b 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -17,6 +17,10 @@ package azkaban.execapp;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import org.apache.log4j.Appender;
import org.apache.log4j.FileAppender;
@@ -24,13 +28,15 @@ import org.apache.log4j.Layout;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
+import azkaban.execapp.event.BlockingStatus;
import azkaban.execapp.event.Event;
import azkaban.execapp.event.Event.Type;
import azkaban.execapp.event.EventHandler;
-import azkaban.executor.ExecutableFlow.Status;
+import azkaban.execapp.event.FlowWatcher;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
import azkaban.flow.CommonJobProperties;
import azkaban.jobExecutor.AbstractProcessJob;
import azkaban.jobExecutor.Job;
@@ -61,7 +67,10 @@ public class JobRunner extends EventHandler implements Runnable {
private Object syncObject = new Object();
private final JobTypeManager jobtypeManager;
-
+ private Integer pipelineLevel = null;
+ private FlowWatcher watcher = null;
+ private Set<String> pipelineJobs = new HashSet<String>();
+
public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager, Logger flowLogger) {
this.props = props;
this.node = node;
@@ -72,6 +81,19 @@ public class JobRunner extends EventHandler implements Runnable {
this.flowLogger = flowLogger;
}
+ public void setPipeline(FlowWatcher watcher, int pipelineLevel) {
+ this.watcher = watcher;
+ this.pipelineLevel = pipelineLevel;
+
+ if (pipelineLevel == 1) {
+ pipelineJobs.add(node.getJobId());
+ }
+ else if (pipelineLevel == 2) {
+ pipelineJobs.add(node.getJobId());
+ pipelineJobs.addAll(node.getOutNodes());
+ }
+ }
+
public ExecutableNode getNode() {
return node;
}
@@ -122,16 +144,16 @@ public class JobRunner extends EventHandler implements Runnable {
@Override
public void run() {
Thread.currentThread().setName("JobRunner-" + node.getJobId() + "-" + executionId);
-
- node.setStartTime(System.currentTimeMillis());
if (node.getStatus() == Status.DISABLED) {
+ node.setStartTime(System.currentTimeMillis());
fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
node.setStatus(Status.SKIPPED);
node.setEndTime(System.currentTimeMillis());
fireEvent(Event.create(this, Type.JOB_FINISHED));
return;
} else if (node.getStatus() == Status.KILLED) {
+ node.setStartTime(System.currentTimeMillis());
fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
node.setEndTime(System.currentTimeMillis());
fireEvent(Event.create(this, Type.JOB_FINISHED));
@@ -139,7 +161,32 @@ public class JobRunner extends EventHandler implements Runnable {
}
else {
createLogger();
-
+ node.setUpdateTime(System.currentTimeMillis());
+
+ // For pipelining of jobs. Will watch other jobs.
+ if (!pipelineJobs.isEmpty()) {
+ String blockedList = "";
+ ArrayList<BlockingStatus> blockingStatus = new ArrayList<BlockingStatus>();
+ for (String waitingJobId : pipelineJobs) {
+ Status status = watcher.peekStatus(waitingJobId);
+ if (status != null && !Status.isStatusFinished(status)) {
+ BlockingStatus block = watcher.getBlockingStatus(waitingJobId);
+ blockingStatus.add(block);
+ blockedList += waitingJobId + ",";
+ }
+ }
+ if (!blockingStatus.isEmpty()) {
+ logger.info("Pipeline job " + node.getJobId() + " waiting on " + blockedList + " in execution " + watcher.getExecId());
+
+ for(BlockingStatus bStatus: blockingStatus) {
+ logger.info("Waiting on pipelined job " + bStatus.getJobId());
+ bStatus.blockOnFinishedStatus();
+ logger.info("Pipelined job " + bStatus.getJobId() + " finished.");
+ }
+ }
+ }
+
+ node.setStartTime(System.currentTimeMillis());
fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
try {
loader.uploadExecutableNode(node, props);
@@ -185,7 +232,7 @@ public class JobRunner extends EventHandler implements Runnable {
this.fireEventListeners(event);
}
- private boolean prepareJob() throws RuntimeException{
+ private boolean prepareJob() throws RuntimeException {
// Check pre conditions
if (props == null) {
node.setStatus(Status.FAILED);
src/java/azkaban/executor/ExecutableFlow.java 60(+16 -44)
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index ce88cb8..d2d44dc 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -58,6 +58,7 @@ public class ExecutableFlow {
private boolean notifyOnLastFailure = false;
private Integer pipelineLevel = null;
+ private Integer pipelineExecId = null;
private Map<String, String> flowParameters = new HashMap<String, String>();
public enum FailureAction {
@@ -67,49 +68,6 @@ public class ExecutableFlow {
}
private FailureAction failureAction = FailureAction.FINISH_CURRENTLY_RUNNING;
-
- public static enum Status {
- READY(10), PREPARING(20), RUNNING(30), PAUSED(40), SUCCEEDED(50), KILLED(60), FAILED(70), FAILED_FINISHING(80), SKIPPED(90), DISABLED(100), QUEUED(110);
-
- private int numVal;
-
- Status(int numVal) {
- this.numVal = numVal;
- }
-
- public int getNumVal() {
- return numVal;
- }
-
- public static Status fromInteger(int x) {
- switch (x) {
- case 10:
- return READY;
- case 20:
- return PREPARING;
- case 30:
- return RUNNING;
- case 40:
- return PAUSED;
- case 50:
- return SUCCEEDED;
- case 60:
- return KILLED;
- case 70:
- return FAILED;
- case 80:
- return FAILED_FINISHING;
- case 90:
- return SKIPPED;
- case 100:
- return DISABLED;
- case 110:
- return QUEUED;
- default:
- return READY;
- }
- }
- }
public ExecutableFlow(Flow flow) {
this.projectId = flow.getProjectId();
@@ -330,6 +288,7 @@ public class ExecutableFlow {
flowObj.put("failureEmails", failureEmails);
flowObj.put("failureAction", failureAction.toString());
flowObj.put("pipelineLevel", pipelineLevel);
+ flowObj.put("pipelineExecId", pipelineExecId);
flowObj.put("version", version);
ArrayList<Object> props = new ArrayList<Object>();
@@ -464,6 +423,7 @@ public class ExecutableFlow {
exFlow.failureAction = FailureAction.valueOf((String)flowObj.get("failureAction"));
}
exFlow.pipelineLevel = (Integer)flowObj.get("pipelineLevel");
+ exFlow.pipelineExecId = (Integer)flowObj.get("pipelineExecId");
// Copy nodes
List<Object> nodes = (List<Object>)flowObj.get("nodes");
@@ -532,10 +492,14 @@ public class ExecutableFlow {
this.submitUser = submitUser;
}
- public void setPipelineLevel(int level) {
+ public void setPipelineLevel(Integer level) {
pipelineLevel = level;
}
+ public void setPipelineExecutionId(Integer execId) {
+ pipelineExecId = execId;
+ }
+
public void setNotifyOnFirstFailure(boolean notify) {
this.notifyOnFirstFailure = notify;
}
@@ -559,4 +523,12 @@ public class ExecutableFlow {
public void setVersion(int version) {
this.version = version;
}
+
+ public Integer getPipelineLevel() {
+ return pipelineLevel;
+ }
+
+ public Integer getPipelineExecutionId() {
+ return pipelineExecId;
+ }
}
diff --git a/src/java/azkaban/executor/ExecutableJobInfo.java b/src/java/azkaban/executor/ExecutableJobInfo.java
index b716b5f..b096505 100644
--- a/src/java/azkaban/executor/ExecutableJobInfo.java
+++ b/src/java/azkaban/executor/ExecutableJobInfo.java
@@ -3,8 +3,6 @@ package azkaban.executor;
import java.util.HashMap;
import java.util.Map;
-import azkaban.executor.ExecutableFlow.Status;
-
public class ExecutableJobInfo {
private final int execId;
private final int projectId;
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index aaa6992..35483c8 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import azkaban.executor.ExecutableFlow.Status;
import azkaban.flow.Node;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
diff --git a/src/java/azkaban/executor/ExecutorMailer.java b/src/java/azkaban/executor/ExecutorMailer.java
index 75b2723..59d7727 100644
--- a/src/java/azkaban/executor/ExecutorMailer.java
+++ b/src/java/azkaban/executor/ExecutorMailer.java
@@ -8,7 +8,6 @@ import javax.mail.MessagingException;
import org.apache.log4j.Logger;
import azkaban.executor.ExecutableFlow.FailureAction;
-import azkaban.executor.ExecutableFlow.Status;
import azkaban.utils.EmailMessage;
import azkaban.utils.Props;
import azkaban.utils.Utils;
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 72c8e3d..975a58e 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -35,7 +35,6 @@ import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
-import azkaban.executor.ExecutableFlow.Status;
import azkaban.project.Project;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index c70d9f5..2fb2dbe 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -23,7 +23,6 @@ import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
-import azkaban.executor.ExecutableFlow.Status;
import azkaban.utils.DataSourceUtils;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.LogData;
@@ -110,8 +109,8 @@ public class JdbcExecutorLoader implements ExecutorLoader {
long id;
try {
- flow.setStatus(ExecutableFlow.Status.PREPARING);
- runner.update(connection, INSERT_EXECUTABLE_FLOW, flow.getProjectId(), flow.getFlowId(), flow.getVersion(), ExecutableFlow.Status.PREPARING.getNumVal(), submitTime, flow.getSubmitUser(), submitTime);
+ flow.setStatus(Status.PREPARING);
+ runner.update(connection, INSERT_EXECUTABLE_FLOW, flow.getProjectId(), flow.getFlowId(), flow.getVersion(), Status.PREPARING.getNumVal(), submitTime, flow.getSubmitUser(), submitTime);
connection.commit();
id = runner.query(connection, LastInsertID.LAST_INSERT_ID, new LastInsertID());
src/java/azkaban/executor/Status.java 56(+56 -0)
diff --git a/src/java/azkaban/executor/Status.java b/src/java/azkaban/executor/Status.java
new file mode 100644
index 0000000..1f4ee39
--- /dev/null
+++ b/src/java/azkaban/executor/Status.java
@@ -0,0 +1,56 @@
+package azkaban.executor;
+
+public enum Status {
+ READY(10), PREPARING(20), RUNNING(30), PAUSED(40), SUCCEEDED(50), KILLED(60), FAILED(70), FAILED_FINISHING(80), SKIPPED(90), DISABLED(100), QUEUED(110);
+
+ private int numVal;
+
+ Status(int numVal) {
+ this.numVal = numVal;
+ }
+
+ public int getNumVal() {
+ return numVal;
+ }
+
+ public static Status fromInteger(int x) {
+ switch (x) {
+ case 10:
+ return READY;
+ case 20:
+ return PREPARING;
+ case 30:
+ return RUNNING;
+ case 40:
+ return PAUSED;
+ case 50:
+ return SUCCEEDED;
+ case 60:
+ return KILLED;
+ case 70:
+ return FAILED;
+ case 80:
+ return FAILED_FINISHING;
+ case 90:
+ return SKIPPED;
+ case 100:
+ return DISABLED;
+ case 110:
+ return QUEUED;
+ default:
+ return READY;
+ }
+ }
+
+ public static boolean isStatusFinished(Status status) {
+ switch (status) {
+ case FAILED:
+ case KILLED:
+ case SUCCEEDED:
+ case SKIPPED:
+ return true;
+ default:
+ return false;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/java/azkaban/flow/CommonJobProperties.java b/src/java/azkaban/flow/CommonJobProperties.java
index 319ce96..5d71d30 100644
--- a/src/java/azkaban/flow/CommonJobProperties.java
+++ b/src/java/azkaban/flow/CommonJobProperties.java
@@ -1,8 +1,5 @@
package azkaban.flow;
-import java.util.UUID;
-
-import org.joda.time.DateTime;
public class CommonJobProperties {
/*
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 9a53f37..540fea6 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -35,7 +35,7 @@ import org.joda.time.format.DateTimeFormatter;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerException;
-import azkaban.executor.ExecutableFlow.Status;
+import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.project.Project;
src/java/azkaban/sla/SlaMailer.java 2(+1 -1)
diff --git a/src/java/azkaban/sla/SlaMailer.java b/src/java/azkaban/sla/SlaMailer.java
index 17a67d0..9b19da8 100644
--- a/src/java/azkaban/sla/SlaMailer.java
+++ b/src/java/azkaban/sla/SlaMailer.java
@@ -9,8 +9,8 @@ import org.apache.log4j.Logger;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlow.FailureAction;
-import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.ExecutableNode;
+import azkaban.executor.Status;
import azkaban.sla.SLA;
import azkaban.utils.EmailMessage;
import azkaban.utils.Props;
src/java/azkaban/sla/SLAManager.java 2(+1 -1)
diff --git a/src/java/azkaban/sla/SLAManager.java b/src/java/azkaban/sla/SLAManager.java
index 04fbe08..1adffc6 100644
--- a/src/java/azkaban/sla/SLAManager.java
+++ b/src/java/azkaban/sla/SLAManager.java
@@ -18,11 +18,11 @@ import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutorMailer;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
src/java/azkaban/utils/WebUtils.java 2(+1 -1)
diff --git a/src/java/azkaban/utils/WebUtils.java b/src/java/azkaban/utils/WebUtils.java
index 4878e34..ef63fb6 100644
--- a/src/java/azkaban/utils/WebUtils.java
+++ b/src/java/azkaban/utils/WebUtils.java
@@ -7,7 +7,7 @@ import org.joda.time.DurationFieldType;
import org.joda.time.ReadablePeriod;
import org.joda.time.format.DateTimeFormat;
-import azkaban.executor.ExecutableFlow.Status;
+import azkaban.executor.Status;
public class WebUtils {
public static final String DATE_TIME_STRING = "YYYY-MM-dd HH:mm:ss";
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 71eb652..97c7414 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -35,7 +35,6 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.FileAppender;
import org.apache.log4j.Logger;
import org.apache.velocity.app.VelocityEngine;
import org.apache.velocity.runtime.log.Log4JLogChute;
@@ -179,11 +178,6 @@ public class AzkabanWebServer implements AzkabanServer {
configureMBeanServer();
}
-
- private void setupLoggers() {
- //FileAppender fileAppender = new FileAppender(loggerLayout, absolutePath, true);
-
- }
private void setViewerPlugins(List<ViewerPlugin> viewerPlugins) {
this.viewerPlugins = viewerPlugins;
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index f275119..c16f6b9 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -33,18 +33,16 @@ import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutableFlow.FailureAction;
import azkaban.executor.ExecutorManager;
-import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
import azkaban.scheduler.Schedule;
import azkaban.scheduler.ScheduleManager;
import azkaban.user.Permission;
-import azkaban.user.Role;
import azkaban.user.User;
import azkaban.user.Permission.Type;
-import azkaban.user.UserManager;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.webapp.AzkabanWebServer;
import azkaban.webapp.session.Session;
@@ -54,7 +52,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
private ProjectManager projectManager;
private ExecutorManager executorManager;
private ScheduleManager scheduleManager;
- private UserManager userManager;
private ExecutorVMHelper vmHelper;
@Override
@@ -64,7 +61,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
projectManager = server.getProjectManager();
executorManager = server.getExecutorManager();
scheduleManager = server.getScheduleManager();
- userManager = server.getUserManager();
vmHelper = new ExecutorVMHelper();
}
@@ -750,19 +746,4 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
return project.getName();
}
}
-
- private boolean hasPermission(Project project, User user, Permission.Type type) {
- if (project.hasPermission(user, type)) {
- return true;
- }
-
- for(String roleName: user.getRoles()) {
- Role role = userManager.getRole(roleName);
- if (role.getPermission().isPermissionSet(type) || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
- return true;
- }
- }
-
- return false;
- }
}
diff --git a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index a92fb45..891e896 100644
--- a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -31,6 +31,9 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.commons.fileupload.servlet.ServletFileUpload;
import org.apache.log4j.Logger;
+import azkaban.project.Project;
+import azkaban.user.Permission;
+import azkaban.user.Role;
import azkaban.user.User;
import azkaban.user.UserManager;
import azkaban.user.UserManagerException;
@@ -217,6 +220,22 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
return session;
}
+ protected boolean hasPermission(Project project, User user, Permission.Type type) {
+ UserManager userManager = getApplication().getUserManager();
+ if (project.hasPermission(user, type)) {
+ return true;
+ }
+
+ for(String roleName: user.getRoles()) {
+ Role role = userManager.getRole(roleName);
+ if (role.getPermission().isPermissionSet(type) || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
protected void handleAjaxLoginAction(HttpServletRequest req, HttpServletResponse resp, Map<String, Object> ret) throws ServletException {
if (hasParam(req, "username") && hasParam(req, "password")) {
Session session = null;
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 2f1403b..0e06e59 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -1250,21 +1250,6 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
}
}
- private boolean hasPermission(Project project, User user, Permission.Type type) {
- if (project.hasPermission(user, type)) {
- return true;
- }
-
- for(String roleName: user.getRoles()) {
- Role role = userManager.getRole(roleName);
- if (role.getPermission().isPermissionSet(type) || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
- return true;
- }
- }
-
- return false;
- }
-
private Permission getPermissionObject(Project project, User user, Permission.Type type) {
Permission perm = project.getCollectivePermission(user);
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 489299a..13de687 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -27,7 +27,6 @@ import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import javax.swing.text.StyledEditorKit.BoldAction;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
@@ -42,7 +41,6 @@ import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.ExecutableFlow.FailureAction;
-import azkaban.executor.ExecutableFlow.Status;
import azkaban.flow.Flow;
import azkaban.flow.Node;
import azkaban.project.Project;
@@ -585,19 +583,4 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return firstSchedTime;
}
-
- private boolean hasPermission(Project project, User user, Permission.Type type) {
- if (project.hasPermission(user, type)) {
- return true;
- }
-
- for(String roleName: user.getRoles()) {
- Role role = userManager.getRole(roleName);
- if (role.getPermission().isPermissionSet(type) || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
- return true;
- }
- }
-
- return false;
- }
}
src/java/log4j.properties 17(+16 -1)
diff --git a/src/java/log4j.properties b/src/java/log4j.properties
index 9df233d..52008f9 100644
--- a/src/java/log4j.properties
+++ b/src/java/log4j.properties
@@ -1,6 +1,7 @@
log4j.rootLogger=INFO, Console
-log4j.logger.azkaban.webapp.AzkabanServer=INFO, R
+log4j.logger.azkaban.webapp=INFO, WebServer
log4j.logger.azkaban.webapp.servlet.AbstractAzkabanServlet=INFO, R
+log4j.logger.azkaban.webapp.servlet.LoginAbstractAzkabanServlet=INFO, R
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.layout=org.apache.log4j.PatternLayout
@@ -9,6 +10,20 @@ log4j.appender.R.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1
log4j.appender.R.MaxFileSize=102400MB
log4j.appender.R.MaxBackupIndex=2
+log4j.appender.WebServer=org.apache.log4j.RollingFileAppender
+log4j.appender.WebServer.layout=org.apache.log4j.PatternLayout
+log4j.appender.WebServer.File=azkaban-webserver.log
+log4j.appender.WebServer.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1}] [Azkaban] %m%n
+log4j.appender.WebServer.MaxFileSize=102400MB
+log4j.appender.WebServer.MaxBackupIndex=2
+
+log4j.appender.ExecServer=org.apache.log4j.RollingFileAppender
+log4j.appender.ExecServer.layout=org.apache.log4j.PatternLayout
+log4j.appender.ExecServer.File=azkaban-execserver.log
+log4j.appender.ExecServer.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1}] [Azkaban] %m%n
+log4j.appender.ExecServer.MaxFileSize=102400MB
+log4j.appender.ExecServer.MaxBackupIndex=2
+
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1}] [Azkaban] %m%n
\ No newline at end of file
src/sql/create_execution_logs.sql 3(+2 -1)
diff --git a/src/sql/create_execution_logs.sql b/src/sql/create_execution_logs.sql
index 07e0b7b..6e76312 100644
--- a/src/sql/create_execution_logs.sql
+++ b/src/sql/create_execution_logs.sql
@@ -6,7 +6,8 @@ CREATE TABLE execution_logs (
start_byte INT,
end_byte INT,
log LONGBLOB,
- PRIMARY KEY (exec_id, name, attempt),
+ PRIMARY KEY (exec_id, name, attempt, start_byte),
+ INDEX log_attempt (exec_id, name, attempt),
INDEX log_index (exec_id, name),
INDEX byte_log_index(exec_id, name, start_byte, end_byte)
) ENGINE=InnoDB;
diff --git a/src/sql/create_project_event_table.sql b/src/sql/create_project_event_table.sql
index 4cf8f21..ea9802e 100644
--- a/src/sql/create_project_event_table.sql
+++ b/src/sql/create_project_event_table.sql
@@ -3,6 +3,6 @@ CREATE TABLE project_events (
event_type TINYINT NOT NULL,
event_time BIGINT NOT NULL,
username VARCHAR(64),
- message VARCHAR(128),
+ message VARCHAR(512),
INDEX log (project_id, event_time)
) ENGINE=InnoDB;
src/sql/update_2.0_to_2.01.sql 3(+2 -1)
diff --git a/src/sql/update_2.0_to_2.01.sql b/src/sql/update_2.0_to_2.01.sql
index 80273f4..9ab565e 100644
--- a/src/sql/update_2.0_to_2.01.sql
+++ b/src/sql/update_2.0_to_2.01.sql
@@ -8,7 +8,8 @@ ALTER TABLE execution_jobs ADD INDEX exec_job (exec_id, job_id);
ALTER TABLE execution_logs ADD COLUMN attempt INT DEFAULT 0;
ALTER TABLE execution_logs DROP PRIMARY KEY;
-ALTER TABLE execution_logs ADD PRIMARY KEY(exec_id, name, attempt);
+ALTER TABLE execution_logs ADD PRIMARY KEY(exec_id, name, attempt, start_byte);
+ALTER TABLE execution_logs ADD INDEX log_attempt (exec_id, name, attempt)
ALTER TABLE schedules ADD COLUMN enc_type TINYINT;
ALTER TABLE schedules ADD COLUMN schedule_options LONGBLOB;
unit/executions/exectest1/exec1.flow 17(+0 -17)
diff --git a/unit/executions/exectest1/exec1.flow b/unit/executions/exectest1/exec1.flow
index 3612d58..5ca051c 100644
--- a/unit/executions/exectest1/exec1.flow
+++ b/unit/executions/exectest1/exec1.flow
@@ -30,17 +30,10 @@
},{
"source" : "job7",
"target" : "job8"
- },{
- "source" : "job7",
- "target" : "job9"
},
{
"source" : "job8",
"target" : "job10"
- },
- {
- "source" : "job9",
- "target" : "job10"
}
],
"failure.email" : [],
@@ -126,16 +119,6 @@
},
{
"propSource" : "prop2.properties",
- "id" : "job9",
- "jobType" : "java",
- "layout" : {
- "level" : 0
- },
- "jobSource" : "job9.job",
- "expectedRuntime" : 1
- },
- {
- "propSource" : "prop2.properties",
"id" : "job10",
"jobType" : "java",
"layout" : {
unit/executions/exectest1/exec1-mod.flow 156(+156 -0)
diff --git a/unit/executions/exectest1/exec1-mod.flow b/unit/executions/exectest1/exec1-mod.flow
new file mode 100644
index 0000000..3612d58
--- /dev/null
+++ b/unit/executions/exectest1/exec1-mod.flow
@@ -0,0 +1,156 @@
+{
+ "project.id":1,
+ "version":2,
+ "id" : "derived-member-data",
+ "success.email" : [],
+ "edges" : [ {
+ "source" : "job1",
+ "target" : "job2"
+ }, {
+ "source" : "job2",
+ "target" : "job3"
+ },{
+ "source" : "job2",
+ "target" : "job4"
+ }, {
+ "source" : "job3",
+ "target" : "job5"
+ },{
+ "source" : "job4",
+ "target" : "job5"
+ },{
+ "source" : "job5",
+ "target" : "job7"
+ },{
+ "source" : "job1",
+ "target" : "job6"
+ },{
+ "source" : "job6",
+ "target" : "job7"
+ },{
+ "source" : "job7",
+ "target" : "job8"
+ },{
+ "source" : "job7",
+ "target" : "job9"
+ },
+ {
+ "source" : "job8",
+ "target" : "job10"
+ },
+ {
+ "source" : "job9",
+ "target" : "job10"
+ }
+ ],
+ "failure.email" : [],
+ "nodes" : [ {
+ "propSource" : "prop2.properties",
+ "id" : "job1",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job1.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job2",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job2.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job3",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job3.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job4",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job4.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job5",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job5.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job6",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job6.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job7",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job7.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job8",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job8.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job9",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job9.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job10",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job10.job",
+ "expectedRuntime" : 1
+ }
+ ],
+ "layedout" : false,
+ "type" : "flow",
+ "props" : [ {
+ "inherits" : "prop1.properties",
+ "source" : "prop2.properties"
+ },{
+ "source" : "prop1.properties"
+ }]
+}
\ No newline at end of file
diff --git a/unit/java/azkaban/test/execapp/event/BlockingStatusTest.java b/unit/java/azkaban/test/execapp/event/BlockingStatusTest.java
new file mode 100644
index 0000000..3acd91b
--- /dev/null
+++ b/unit/java/azkaban/test/execapp/event/BlockingStatusTest.java
@@ -0,0 +1,114 @@
+package azkaban.test.execapp.event;
+
+import org.junit.Test;
+
+import junit.framework.Assert;
+import azkaban.execapp.event.BlockingStatus;
+import azkaban.executor.Status;
+
+public class BlockingStatusTest {
+
+ public class WatchingThread extends Thread {
+ private BlockingStatus status;
+ private long diff = 0;
+ public WatchingThread(BlockingStatus status) {
+ this.status = status;
+ }
+
+ public void run() {
+ long startTime = System.currentTimeMillis();
+ status.blockOnFinishedStatus();
+ diff = System.currentTimeMillis() - startTime;
+ }
+
+ public long getDiff() {
+ return diff;
+ }
+ }
+
+ @Test
+ public void testFinishedBlock() {
+ BlockingStatus status = new BlockingStatus(1, "test", Status.SKIPPED);
+
+ WatchingThread thread = new WatchingThread(status);
+ thread.start();
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ System.out.println("Diff " + thread.getDiff());
+ Assert.assertTrue(thread.getDiff() < 100);
+ }
+
+ @Test
+ public void testUnfinishedBlock() throws InterruptedException {
+ BlockingStatus status = new BlockingStatus(1, "test", Status.QUEUED);
+
+ WatchingThread thread = new WatchingThread(status);
+ thread.start();
+
+ synchronized(this) {
+ wait(3000);
+ }
+
+ status.changeStatus(Status.SUCCEEDED);
+ thread.join();
+
+ System.out.println("Diff " + thread.getDiff());
+ Assert.assertTrue(thread.getDiff() >= 3000 && thread.getDiff() < 3100);
+ }
+
+ @Test
+ public void testUnfinishedBlockSeveralChanges() throws InterruptedException {
+ BlockingStatus status = new BlockingStatus(1, "test", Status.QUEUED);
+
+ WatchingThread thread = new WatchingThread(status);
+ thread.start();
+
+ synchronized(this) {
+ wait(3000);
+ }
+
+ status.changeStatus(Status.PAUSED);
+
+ synchronized(this) {
+ wait(1000);
+ }
+
+ status.changeStatus(Status.FAILED);
+
+ thread.join(1000);
+
+ System.out.println("Diff " + thread.getDiff());
+ Assert.assertTrue(thread.getDiff() >= 4000 && thread.getDiff() < 4100);
+ }
+
+ @Test
+ public void testMultipleWatchers() throws InterruptedException {
+ BlockingStatus status = new BlockingStatus(1, "test", Status.QUEUED);
+
+ WatchingThread thread1 = new WatchingThread(status);
+ thread1.start();
+
+ synchronized(this) {
+ wait(2000);
+ }
+
+ WatchingThread thread2 = new WatchingThread(status);
+ thread2.start();
+
+ synchronized(this) {
+ wait(2000);
+ }
+
+ status.changeStatus(Status.FAILED);
+ thread2.join(1000);
+ thread1.join(1000);
+
+ System.out.println("Diff thread 1 " + thread1.getDiff());
+ System.out.println("Diff thread 2 " + thread2.getDiff());
+ Assert.assertTrue(thread1.getDiff() >= 4000 && thread1.getDiff() < 4100);
+ Assert.assertTrue(thread2.getDiff() >= 2000 && thread2.getDiff() < 2100);
+ }
+}
diff --git a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
new file mode 100644
index 0000000..df5f5a6
--- /dev/null
+++ b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
@@ -0,0 +1,234 @@
+package azkaban.test.execapp.event;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.execapp.FlowRunner;
+import azkaban.execapp.event.FlowWatcher;
+import azkaban.execapp.event.LocalFlowWatcher;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.Status;
+import azkaban.flow.Flow;
+import azkaban.jobtype.JobTypeManager;
+import azkaban.project.ProjectLoader;
+import azkaban.test.execapp.EventCollectorListener;
+import azkaban.test.execapp.MockExecutorLoader;
+import azkaban.test.execapp.MockProjectLoader;
+import azkaban.test.executor.JavaJob;
+import azkaban.utils.JSONUtils;
+
+public class LocalFlowWatcherTest {
+ private File workingDir;
+ private JobTypeManager jobtypeManager;
+ private ProjectLoader fakeProjectLoader;
+ private int dirVal= 0;
+
+ @Before
+ public void setUp() throws Exception {
+ jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+ jobtypeManager.registerJobType("java", JavaJob.class);
+ fakeProjectLoader = new MockProjectLoader(workingDir);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ }
+
+ public File setupDirectory() throws IOException {
+ System.out.println("Create temp dir");
+ File workingDir = new File("_AzkabanTestDir_" + dirVal );
+ if (workingDir.exists()) {
+ FileUtils.deleteDirectory(workingDir);
+ }
+ workingDir.mkdirs();
+ dirVal++;
+
+ return workingDir;
+ }
+
+ @Test
+ public void testBasicLocalFlowWatcher() throws Exception {
+ MockExecutorLoader loader = new MockExecutorLoader();
+
+ EventCollectorListener eventCollector = new EventCollectorListener();
+
+ File workingDir1 = setupDirectory();
+ FlowRunner runner1 = createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null, null);
+ Thread runner1Thread = new Thread(runner1);
+
+ File workingDir2 = setupDirectory();
+ LocalFlowWatcher watcher = new LocalFlowWatcher(runner1);
+ FlowRunner runner2 = createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2, watcher, 2);
+ Thread runner2Thread = new Thread(runner2);
+
+ runner1Thread.start();
+ runner2Thread.start();
+ runner2Thread.join();
+
+ FileUtils.deleteDirectory(workingDir1);
+ FileUtils.deleteDirectory(workingDir2);
+
+ testPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+ }
+
+ @Test
+ public void testLevel1LocalFlowWatcher() throws Exception {
+ MockExecutorLoader loader = new MockExecutorLoader();
+
+ EventCollectorListener eventCollector = new EventCollectorListener();
+
+ File workingDir1 = setupDirectory();
+ FlowRunner runner1 = createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null, null);
+ Thread runner1Thread = new Thread(runner1);
+
+ File workingDir2 = setupDirectory();
+ LocalFlowWatcher watcher = new LocalFlowWatcher(runner1);
+ FlowRunner runner2 = createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2, watcher, 1);
+ Thread runner2Thread = new Thread(runner2);
+
+ runner1Thread.start();
+ runner2Thread.start();
+ runner2Thread.join();
+
+ FileUtils.deleteDirectory(workingDir1);
+ FileUtils.deleteDirectory(workingDir2);
+
+ testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+ }
+
+ @Test
+ public void testLevel2DiffLocalFlowWatcher() throws Exception {
+ MockExecutorLoader loader = new MockExecutorLoader();
+
+ EventCollectorListener eventCollector = new EventCollectorListener();
+
+ File workingDir1 = setupDirectory();
+ FlowRunner runner1 = createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null, null);
+ Thread runner1Thread = new Thread(runner1);
+
+ File workingDir2 = setupDirectory();
+ LocalFlowWatcher watcher = new LocalFlowWatcher(runner1);
+ FlowRunner runner2 = createFlowRunner(workingDir2, loader, eventCollector, "exec1-mod", 2, watcher, 1);
+ Thread runner2Thread = new Thread(runner2);
+
+ runner1Thread.start();
+ runner2Thread.start();
+ runner2Thread.join();
+
+ FileUtils.deleteDirectory(workingDir1);
+ FileUtils.deleteDirectory(workingDir2);
+
+ testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+ }
+
+ private void testPipelineLevel1(ExecutableFlow first, ExecutableFlow second) {
+ for (ExecutableNode node: second.getExecutableNodes()) {
+ Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
+
+ // check it's start time is after the first's children.
+ ExecutableNode watchedNode = first.getExecutableNode(node.getJobId());
+ if (watchedNode == null) {
+ continue;
+ }
+ Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
+
+ System.out.println("Node " + node.getJobId() +
+ " start: " + node.getStartTime() +
+ " dependent on " + watchedNode.getJobId() +
+ " " + watchedNode.getEndTime() +
+ " diff: " + (node.getStartTime() - watchedNode.getEndTime()));
+
+ Assert.assertTrue(node.getStartTime() >= watchedNode.getEndTime());
+
+ long minParentDiff = 0;
+ if (node.getInNodes().size() > 0) {
+ minParentDiff = Long.MAX_VALUE;
+ for (String dependency: node.getInNodes()) {
+ ExecutableNode parent = second.getExecutableNode(dependency);
+ long diff = node.getStartTime() - parent.getEndTime();
+ minParentDiff = Math.min(minParentDiff, diff);
+ }
+ }
+ long diff = node.getStartTime() - watchedNode.getEndTime();
+ System.out.println(" minPipelineTimeDiff:" + diff + " minDependencyTimeDiff:" + minParentDiff);
+ Assert.assertTrue(minParentDiff < 100 || diff < 100);
+ }
+ }
+
+ private void testPipelineLevel2(ExecutableFlow first, ExecutableFlow second) {
+ for (ExecutableNode node: second.getExecutableNodes()) {
+ Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
+
+ // check it's start time is after the first's children.
+ ExecutableNode watchedNode = first.getExecutableNode(node.getJobId());
+ if (watchedNode == null) {
+ continue;
+ }
+ Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
+
+ long minDiff = Long.MAX_VALUE;
+ for (String watchedChild: watchedNode.getOutNodes()) {
+ ExecutableNode child = first.getExecutableNode(watchedChild);
+ if (child == null) {
+ continue;
+ }
+ Assert.assertEquals(child.getStatus(), Status.SUCCEEDED);
+ long diff = node.getStartTime() - child.getEndTime();
+ minDiff = Math.min(minDiff, diff);
+ System.out.println("Node " + node.getJobId() +
+ " start: " + node.getStartTime() +
+ " dependent on " + watchedChild + " " + child.getEndTime() +
+ " diff: " + diff);
+ Assert.assertTrue(node.getStartTime() >= child.getEndTime());
+ }
+
+ long minParentDiff = Long.MAX_VALUE;
+ for (String dependency: node.getInNodes()) {
+ ExecutableNode parent = second.getExecutableNode(dependency);
+ long diff = node.getStartTime() - parent.getEndTime();
+ minParentDiff = Math.min(minParentDiff, diff);
+ }
+ System.out.println(" minPipelineTimeDiff:" + minDiff + " minDependencyTimeDiff:" + minParentDiff);
+ Assert.assertTrue(minParentDiff < 100 || minDiff < 100);
+ }
+ }
+
+ private FlowRunner createFlowRunner(File workingDir, ExecutorLoader loader, EventCollectorListener eventCollector, String flowName, int execId, FlowWatcher watcher, Integer pipeline) throws Exception {
+ File testDir = new File("unit/executions/exectest1");
+ ExecutableFlow exFlow = prepareExecDir(workingDir, testDir, flowName, execId);
+ if (watcher != null) {
+ exFlow.setPipelineLevel(pipeline);
+ exFlow.setPipelineExecutionId(watcher.getExecId());
+ }
+ //MockProjectLoader projectLoader = new MockProjectLoader(new File(exFlow.getExecutionPath()));
+
+ loader.uploadExecutableFlow(exFlow);
+ FlowRunner runner = new FlowRunner(exFlow, watcher, loader, fakeProjectLoader, jobtypeManager);
+ runner.addListener(eventCollector);
+
+ return runner;
+ }
+
+ private ExecutableFlow prepareExecDir(File workingDir, File execDir, String flowName, int execId) throws IOException {
+ FileUtils.copyDirectory(execDir, workingDir);
+
+ File jsonFlowFile = new File(workingDir, flowName + ".flow");
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> flowObj = (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+
+ Flow flow = Flow.flowFromObject(flowObj);
+ ExecutableFlow execFlow = new ExecutableFlow(flow);
+ execFlow.setExecutionId(execId);
+ execFlow.setExecutionPath(workingDir.getPath());
+ return execFlow;
+ }
+}
diff --git a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
new file mode 100644
index 0000000..78aac81
--- /dev/null
+++ b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
@@ -0,0 +1,234 @@
+package azkaban.test.execapp.event;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.execapp.FlowRunner;
+import azkaban.execapp.event.FlowWatcher;
+import azkaban.execapp.event.RemoteFlowWatcher;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.Status;
+import azkaban.flow.Flow;
+import azkaban.jobtype.JobTypeManager;
+import azkaban.project.ProjectLoader;
+import azkaban.test.execapp.EventCollectorListener;
+import azkaban.test.execapp.MockExecutorLoader;
+import azkaban.test.execapp.MockProjectLoader;
+import azkaban.test.executor.JavaJob;
+import azkaban.utils.JSONUtils;
+
+public class RemoteFlowWatcherTest {
+ private File workingDir;
+ private JobTypeManager jobtypeManager;
+ private ProjectLoader fakeProjectLoader;
+ private int dirVal= 0;
+
+ @Before
+ public void setUp() throws Exception {
+ jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+ jobtypeManager.registerJobType("java", JavaJob.class);
+ fakeProjectLoader = new MockProjectLoader(workingDir);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ }
+
+ public File setupDirectory() throws IOException {
+ System.out.println("Create temp dir");
+ File workingDir = new File("_AzkabanTestDir_" + dirVal );
+ if (workingDir.exists()) {
+ FileUtils.deleteDirectory(workingDir);
+ }
+ workingDir.mkdirs();
+ dirVal++;
+
+ return workingDir;
+ }
+
+ @Test
+ public void testBasicRemoteFlowWatcher() throws Exception {
+ MockExecutorLoader loader = new MockExecutorLoader();
+
+ EventCollectorListener eventCollector = new EventCollectorListener();
+
+ File workingDir1 = setupDirectory();
+ FlowRunner runner1 = createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null, null);
+ Thread runner1Thread = new Thread(runner1);
+
+ File workingDir2 = setupDirectory();
+ RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
+ FlowRunner runner2 = createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2, watcher, 2);
+ Thread runner2Thread = new Thread(runner2);
+
+ runner1Thread.start();
+ runner2Thread.start();
+ runner2Thread.join();
+
+ FileUtils.deleteDirectory(workingDir1);
+ FileUtils.deleteDirectory(workingDir2);
+
+ testPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+ }
+
+ @Test
+ public void testLevel1RemoteFlowWatcher() throws Exception {
+ MockExecutorLoader loader = new MockExecutorLoader();
+
+ EventCollectorListener eventCollector = new EventCollectorListener();
+
+ File workingDir1 = setupDirectory();
+ FlowRunner runner1 = createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null, null);
+ Thread runner1Thread = new Thread(runner1);
+
+ File workingDir2 = setupDirectory();
+ RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
+ FlowRunner runner2 = createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2, watcher, 1);
+ Thread runner2Thread = new Thread(runner2);
+
+ runner1Thread.start();
+ runner2Thread.start();
+ runner2Thread.join();
+
+ FileUtils.deleteDirectory(workingDir1);
+ FileUtils.deleteDirectory(workingDir2);
+
+ testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+ }
+
+ @Test
+ public void testLevel2DiffRemoteFlowWatcher() throws Exception {
+ MockExecutorLoader loader = new MockExecutorLoader();
+
+ EventCollectorListener eventCollector = new EventCollectorListener();
+
+ File workingDir1 = setupDirectory();
+ FlowRunner runner1 = createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null, null);
+ Thread runner1Thread = new Thread(runner1);
+
+ File workingDir2 = setupDirectory();
+
+ RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
+ FlowRunner runner2 = createFlowRunner(workingDir2, loader, eventCollector, "exec1-mod", 2, watcher, 1);
+ Thread runner2Thread = new Thread(runner2);
+
+ runner1Thread.start();
+ runner2Thread.start();
+ runner2Thread.join();
+
+ FileUtils.deleteDirectory(workingDir1);
+ FileUtils.deleteDirectory(workingDir2);
+
+ testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+ }
+
+ private void testPipelineLevel1(ExecutableFlow first, ExecutableFlow second) {
+ for (ExecutableNode node: second.getExecutableNodes()) {
+ Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
+
+ // check it's start time is after the first's children.
+ ExecutableNode watchedNode = first.getExecutableNode(node.getJobId());
+ if (watchedNode == null) {
+ continue;
+ }
+ Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
+
+ System.out.println("Node " + node.getJobId() +
+ " start: " + node.getStartTime() +
+ " dependent on " + watchedNode.getJobId() +
+ " " + watchedNode.getEndTime() +
+ " diff: " + (node.getStartTime() - watchedNode.getEndTime()));
+
+ Assert.assertTrue(node.getStartTime() >= watchedNode.getEndTime());
+
+ long minParentDiff = 0;
+ if (node.getInNodes().size() > 0) {
+ minParentDiff = Long.MAX_VALUE;
+ for (String dependency: node.getInNodes()) {
+ ExecutableNode parent = second.getExecutableNode(dependency);
+ long diff = node.getStartTime() - parent.getEndTime();
+ minParentDiff = Math.min(minParentDiff, diff);
+ }
+ }
+ long diff = node.getStartTime() - watchedNode.getEndTime();
+ Assert.assertTrue(minParentDiff < 500 || diff < 500);
+ }
+ }
+
+ private void testPipelineLevel2(ExecutableFlow first, ExecutableFlow second) {
+ for (ExecutableNode node: second.getExecutableNodes()) {
+ Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
+
+ // check it's start time is after the first's children.
+ ExecutableNode watchedNode = first.getExecutableNode(node.getJobId());
+ if (watchedNode == null) {
+ continue;
+ }
+ Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
+
+ long minDiff = Long.MAX_VALUE;
+ for (String watchedChild: watchedNode.getOutNodes()) {
+ ExecutableNode child = first.getExecutableNode(watchedChild);
+ if (child == null) {
+ continue;
+ }
+ Assert.assertEquals(child.getStatus(), Status.SUCCEEDED);
+ long diff = node.getStartTime() - child.getEndTime();
+ minDiff = Math.min(minDiff, diff);
+ System.out.println("Node " + node.getJobId() +
+ " start: " + node.getStartTime() +
+ " dependent on " + watchedChild + " " + child.getEndTime() +
+ " diff: " + diff);
+ Assert.assertTrue(node.getStartTime() >= child.getEndTime());
+ }
+
+ long minParentDiff = Long.MAX_VALUE;
+ for (String dependency: node.getInNodes()) {
+ ExecutableNode parent = second.getExecutableNode(dependency);
+ long diff = node.getStartTime() - parent.getEndTime();
+ minParentDiff = Math.min(minParentDiff, diff);
+ }
+ System.out.println(" minPipelineTimeDiff:" + minDiff + " minDependencyTimeDiff:" + minParentDiff);
+ Assert.assertTrue(minParentDiff < 500 || minDiff < 500);
+ }
+ }
+
+ private FlowRunner createFlowRunner(File workingDir, ExecutorLoader loader, EventCollectorListener eventCollector, String flowName, int execId, FlowWatcher watcher, Integer pipeline) throws Exception {
+ File testDir = new File("unit/executions/exectest1");
+ ExecutableFlow exFlow = prepareExecDir(workingDir, testDir, flowName, execId);
+ if (watcher != null) {
+ exFlow.setPipelineLevel(pipeline);
+ exFlow.setPipelineExecutionId(watcher.getExecId());
+ }
+ //MockProjectLoader projectLoader = new MockProjectLoader(new File(exFlow.getExecutionPath()));
+
+ loader.uploadExecutableFlow(exFlow);
+ FlowRunner runner = new FlowRunner(exFlow, watcher, loader, fakeProjectLoader, jobtypeManager);
+ runner.addListener(eventCollector);
+
+ return runner;
+ }
+
+ private ExecutableFlow prepareExecDir(File workingDir, File execDir, String flowName, int execId) throws IOException {
+ FileUtils.copyDirectory(execDir, workingDir);
+
+ File jsonFlowFile = new File(workingDir, flowName + ".flow");
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> flowObj = (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+
+ Flow flow = Flow.flowFromObject(flowObj);
+ ExecutableFlow execFlow = new ExecutableFlow(flow);
+ execFlow.setExecutionId(execId);
+ execFlow.setExecutionPath(workingDir.getPath());
+ return execFlow;
+ }
+}
diff --git a/unit/java/azkaban/test/execapp/EventCollectorListener.java b/unit/java/azkaban/test/execapp/EventCollectorListener.java
index 1dde864..ba763d1 100644
--- a/unit/java/azkaban/test/execapp/EventCollectorListener.java
+++ b/unit/java/azkaban/test/execapp/EventCollectorListener.java
@@ -1,6 +1,8 @@
package azkaban.test.execapp;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
import azkaban.execapp.event.Event;
import azkaban.execapp.event.Event.Type;
@@ -8,10 +10,17 @@ import azkaban.execapp.event.EventListener;
public class EventCollectorListener implements EventListener {
private ArrayList<Event> eventList = new ArrayList<Event>();
+ private HashSet<Event.Type> filterOutTypes = new HashSet<Event.Type>();
+
+ public void setEventFilterOut(Event.Type ... types) {
+ filterOutTypes.addAll(Arrays.asList(types));
+ }
@Override
public void handleEvent(Event event) {
- eventList.add(event);
+ if (!filterOutTypes.contains(event.getType())) {
+ eventList.add(event);
+ }
}
public ArrayList<Event> getEventList() {
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 9a9d80c..c63fb11 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -12,12 +12,13 @@ import org.junit.Before;
import org.junit.Test;
import azkaban.execapp.FlowRunner;
+import azkaban.execapp.event.Event;
import azkaban.execapp.event.Event.Type;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutableFlow.FailureAction;
-import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.ExecutorLoader;
+import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
@@ -61,6 +62,7 @@ public class FlowRunnerTest {
//just making compile. may not work at all.
EventCollectorListener eventCollector = new EventCollectorListener();
+ eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
FlowRunner runner = createFlowRunner(loader, eventCollector, "exec1");
Assert.assertTrue(!runner.isCancelled());
@@ -94,6 +96,7 @@ public class FlowRunnerTest {
public void exec1Disabled() throws Exception {
MockExecutorLoader loader = new MockExecutorLoader();
EventCollectorListener eventCollector = new EventCollectorListener();
+ eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
File testDir = new File("unit/executions/exectest1");
ExecutableFlow exFlow = prepareExecDir(testDir, "exec1", 1);
@@ -139,6 +142,7 @@ public class FlowRunnerTest {
public void exec1Failed() throws Exception {
MockExecutorLoader loader = new MockExecutorLoader();
EventCollectorListener eventCollector = new EventCollectorListener();
+ eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
File testDir = new File("unit/executions/exectest1");
ExecutableFlow flow = prepareExecDir(testDir, "exec2", 1);
@@ -174,6 +178,7 @@ public class FlowRunnerTest {
public void exec1FailedKillAll() throws Exception {
MockExecutorLoader loader = new MockExecutorLoader();
EventCollectorListener eventCollector = new EventCollectorListener();
+ eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
File testDir = new File("unit/executions/exectest1");
ExecutableFlow flow = prepareExecDir(testDir, "exec2", 1);
flow.setFailureAction(FailureAction.CANCEL_ALL);
@@ -212,6 +217,7 @@ public class FlowRunnerTest {
public void exec1FailedFinishRest() throws Exception {
MockExecutorLoader loader = new MockExecutorLoader();
EventCollectorListener eventCollector = new EventCollectorListener();
+ eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
File testDir = new File("unit/executions/exectest1");
ExecutableFlow flow = prepareExecDir(testDir, "exec3", 1);
flow.setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
@@ -247,6 +253,7 @@ public class FlowRunnerTest {
public void execAndCancel() throws Exception {
MockExecutorLoader loader = new MockExecutorLoader();
EventCollectorListener eventCollector = new EventCollectorListener();
+ eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
FlowRunner runner = createFlowRunner(loader, eventCollector, "exec1");
Assert.assertTrue(!runner.isCancelled());
@@ -354,7 +361,7 @@ public class FlowRunnerTest {
//MockProjectLoader projectLoader = new MockProjectLoader(new File(flow.getExecutionPath()));
loader.uploadExecutableFlow(flow);
- FlowRunner runner = new FlowRunner(flow, loader, fakeProjectLoader, jobtypeManager);
+ FlowRunner runner = new FlowRunner(flow, null, loader, fakeProjectLoader, jobtypeManager);
runner.addListener(eventCollector);
return runner;
@@ -366,7 +373,7 @@ public class FlowRunnerTest {
//MockProjectLoader projectLoader = new MockProjectLoader(new File(exFlow.getExecutionPath()));
loader.uploadExecutableFlow(exFlow);
- FlowRunner runner = new FlowRunner(exFlow, loader, fakeProjectLoader, jobtypeManager);
+ FlowRunner runner = new FlowRunner(exFlow, null, loader, fakeProjectLoader, jobtypeManager);
runner.addListener(eventCollector);
return runner;
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 41d44f3..e4d305c 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -16,8 +16,8 @@ import azkaban.execapp.event.Event;
import azkaban.execapp.event.Event.Type;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.ExecutorLoader;
+import azkaban.executor.Status;
import azkaban.jobExecutor.ProcessJob;
import azkaban.jobtype.JobTypeManager;
import azkaban.test.executor.JavaJob;
diff --git a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
index ba673dd..56ffcc4 100644
--- a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
+++ b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
@@ -27,7 +27,7 @@ import azkaban.executor.ExecutionReference;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.JdbcExecutorLoader;
-import azkaban.executor.ExecutableFlow.Status;
+import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.utils.DataSourceUtils;