azkaban-developers
Changes
src/java/azkaban/execapp/FlowRunner.java 510(+342 -168)
src/java/azkaban/execapp/JobRunner.java 72(+41 -31)
src/java/azkaban/utils/SwapQueue.java 79(+79 -0)
src/less/azkaban-graph.less 13(+13 -0)
src/less/flow.less 2(+1 -1)
src/less/tables.less 2(+1 -1)
src/web/js/azkaban/util/flow-loader.js 14(+7 -7)
src/web/js/azkaban/view/flow-execution-list.js 23(+13 -10)
Details
diff --git a/src/java/azkaban/execapp/event/FlowWatcher.java b/src/java/azkaban/execapp/event/FlowWatcher.java
index 2dbf05f..9ae7ed0 100644
--- a/src/java/azkaban/execapp/event/FlowWatcher.java
+++ b/src/java/azkaban/execapp/event/FlowWatcher.java
@@ -69,8 +69,7 @@ public abstract class FlowWatcher {
return null;
}
- String[] split = jobId.split(":");
- ExecutableNode node = flow.getExecutableNode(split);
+ ExecutableNode node = flow.getExecutableNodePath(jobId);
if (node == null) {
return null;
}
@@ -85,8 +84,7 @@ public abstract class FlowWatcher {
}
public Status peekStatus(String jobId) {
- String[] split = jobId.split(":");
- ExecutableNode node = flow.getExecutableNode(split);
+ ExecutableNode node = flow.getExecutableNodePath(jobId);
if (node != null) {
return node.getStatus();
}
src/java/azkaban/execapp/FlowRunner.java 510(+342 -168)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index db33b46..cab5f58 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -18,8 +18,11 @@ package azkaban.execapp;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -53,7 +56,12 @@ import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManagerException;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
+import azkaban.utils.SwapQueue;
+/**
+ * Class that handles the running of an ExecutableFlow DAG
+ *
+ */
public class FlowRunner extends EventHandler implements Runnable {
private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
// We check update every 5 minutes, just in case things get stuck. But for the most part, we'll be idling.
@@ -87,6 +95,9 @@ public class FlowRunner extends EventHandler implements Runnable {
private JobRunnerEventListener listener = new JobRunnerEventListener();
private Set<JobRunner> activeJobRunners = Collections.newSetFromMap(new ConcurrentHashMap<JobRunner, Boolean>());
+ // Thread safe swap queue for finishedExecutions.
+ private SwapQueue<ExecutableNode> finishedNodes;
+
// Used for pipelining
private Integer pipelineLevel = null;
private Integer pipelineExecId = null;
@@ -105,10 +116,35 @@ public class FlowRunner extends EventHandler implements Runnable {
private boolean flowFinished = false;
private boolean flowCancelled = false;
+ // The following is state that will trigger a retry of all failed jobs
+ private boolean retryFailedJobs = false;
+
+
+ /**
+ * Constructor.
+ * This will create its own ExecutorService for thread pools
+ *
+ * @param flow
+ * @param executorLoader
+ * @param projectLoader
+ * @param jobtypeManager
+ * @throws ExecutorManagerException
+ */
public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
this(flow, executorLoader, projectLoader, jobtypeManager, null);
}
+ /**
+ * Constructor.
+ * If executorService is null, then it will create its own for thread pools.
+ *
+ * @param flow
+ * @param executorLoader
+ * @param projectLoader
+ * @param jobtypeManager
+ * @param executorService
+ * @throws ExecutorManagerException
+ */
public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager, ExecutorService executorService) throws ExecutorManagerException {
this.execId = flow.getExecutionId();
this.flow = flow;
@@ -123,6 +159,7 @@ public class FlowRunner extends EventHandler implements Runnable {
this.failureAction = options.getFailureAction();
this.proxyUsers = flow.getProxyUsers();
this.executorService = executorService;
+ this.finishedNodes = new SwapQueue<ExecutableNode>();
}
public FlowRunner setFlowWatcher(FlowWatcher watcher) {
@@ -317,7 +354,7 @@ public class FlowRunner extends EventHandler implements Runnable {
*/
private void runFlow() throws Exception {
logger.info("Starting flows");
- flow.setStatus(Status.RUNNING);
+ runReadyJob(this.flow);
updateFlow();
while (!flowFinished) {
@@ -331,7 +368,10 @@ public class FlowRunner extends EventHandler implements Runnable {
continue;
}
else {
- if (!progressGraph()) {
+ if (retryFailedJobs) {
+ retryAllFailures();
+ }
+ else if (!progressGraph()) {
try {
mainSyncObj.wait(CHECK_WAIT_MS);
} catch (InterruptedException e) {
@@ -344,53 +384,186 @@ public class FlowRunner extends EventHandler implements Runnable {
logger.info("Finishing up flow. Awaiting Termination");
executorService.shutdown();
- finalizeFlow(flow);
updateFlow();
logger.info("Finished Flow");
}
- private boolean progressGraph() throws IOException {
- List<ExecutableNode> jobsReadyToRun = flow.findNextJobsToRun();
-
- // If its the current flow
- if (jobsReadyToRun.size() == 1 && jobsReadyToRun.get(0) == flow) {
- flowFinished = true;
- return true;
+ private void retryAllFailures() throws IOException {
+ logger.info("Restarting all failed jobs");
+
+ this.retryFailedJobs = false;
+ this.flowCancelled = false;
+ this.flowFailed = false;
+ this.flow.setStatus(Status.RUNNING);
+
+ ArrayList<ExecutableNode> retryJobs = new ArrayList<ExecutableNode>();
+ resetFailedState(this.flow, retryJobs);
+
+ for (ExecutableNode node: retryJobs) {
+ if(node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
+ runReadyJob(node);
+ }
+ else if (node.getStatus() == Status.SUCCEEDED){
+ for (String outNodeId: node.getOutNodes()) {
+ ExecutableFlowBase base = node.getParentFlow();
+ runReadyJob(base.getExecutableNode(outNodeId));
+ }
+ }
+
+ runReadyJob(node);
}
+
+ updateFlow();
+ }
+
+ private boolean progressGraph() throws IOException {
+ finishedNodes.swap();
- for (ExecutableNode node: jobsReadyToRun) {
- Status nextStatus = getImpliedStatus(node);
+ // The following nodes are finished, so we'll collect a list of outnodes
+ // that are candidates for running next.
+ HashSet<ExecutableNode> nodesToCheck = new HashSet<ExecutableNode>();
+ for (ExecutableNode node: finishedNodes) {
+ Set<String> outNodeIds = node.getOutNodes();
+ ExecutableFlowBase parentFlow = node.getParentFlow();
- if (node instanceof ExecutableFlowBase && ((ExecutableFlowBase)node).isFlowFinished()) {
- finalizeFlow((ExecutableFlowBase)node);
- fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
+ // If a job is seen as failed, then we set the parent flow to FAILED_FINISHING
+ if (node.getStatus() == Status.FAILED) {
+ // The job cannot be retried or has run out of retry attempts. We will
+ // fail the job and its flow now.
+ if (!retryJobIfPossible(node)) {
+ propagateStatus(node.getParentFlow(), Status.FAILED_FINISHING);
+ if (failureAction == FailureAction.CANCEL_ALL) {
+ this.cancel();
+ }
+ this.flowFailed = true;
+ }
+ else {
+ nodesToCheck.add(node);
+ continue;
+ }
}
- else if (nextStatus == Status.KILLED || isCancelled()) {
- logger.info("Killing " + node.getId() + " due to prior errors.");
- node.killNode(System.currentTimeMillis());
- fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
+
+ if (outNodeIds.isEmpty()) {
+ // Having no outnodes means it's the end of a flow, so we finalize
+ // and fire an event.
+ finalizeFlow(parentFlow);
+ finishExecutableNode(parentFlow);
+
+ // If the parent has a parent, then we process
+ if (!(parentFlow instanceof ExecutableFlow)) {
+ outNodeIds = parentFlow.getOutNodes();
+ parentFlow = parentFlow.getParentFlow();
+ }
}
- else if (nextStatus == Status.DISABLED) {
- logger.info("Skipping disabled job " + node.getId() + ".");
- node.skipNode(System.currentTimeMillis());
- fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
+
+ // Add all out nodes from the finished job. We'll check against this set to
+ // see if any are candidates for running.
+ for (String nodeId: outNodeIds) {
+ ExecutableNode outNode = parentFlow.getExecutableNode(nodeId);
+ nodesToCheck.add(outNode);
+ }
+ }
+
+ // Runs candidate jobs. runReadyJob checks whether each one is ready to run,
+ // and instantly kills or skips it if necessary.
+ boolean jobsRun = false;
+ for (ExecutableNode node: nodesToCheck) {
+ if (Status.isStatusFinished(node.getStatus()) ||
+ Status.isStatusRunning(node.getStatus())) {
+ // Really shouldn't get in here.
+ continue;
+ }
+
+ jobsRun |= runReadyJob(node);
+ }
+
+ if (jobsRun || finishedNodes.getSize() > 0 ) {
+ updateFlow();
+ return true;
+ }
+
+ return false;
+ }
+ private boolean runReadyJob(ExecutableNode node) throws IOException {
+ if (Status.isStatusFinished(node.getStatus()) ||
+ Status.isStatusRunning(node.getStatus())) {
+ return false;
+ }
+
+ Status nextNodeStatus = getImpliedStatus(node);
+ if (nextNodeStatus == null) {
+ return false;
+ }
+
+ if (nextNodeStatus == Status.KILLED) {
+ logger.info("Killing '" + node.getNestedId() + "' due to prior errors.");
+ node.killNode(System.currentTimeMillis());
+ finishExecutableNode(node);
+ }
+ else if (nextNodeStatus == Status.SKIPPED) {
+ logger.info("Skipping disabled job '" + node.getId() + "'.");
+ node.skipNode(System.currentTimeMillis());
+ finishExecutableNode(node);
+ }
+ else if (nextNodeStatus == Status.READY) {
+ if (node instanceof ExecutableFlowBase) {
+ ExecutableFlowBase flow = ((ExecutableFlowBase) node);
+ logger.info("Running flow '" + flow.getNestedId() + "'.");
+ flow.setStatus(Status.RUNNING);
+ flow.setStartTime(System.currentTimeMillis());
+ prepareJobProperties(flow);
+
+ for (String startNodeId: ((ExecutableFlowBase) node).getStartNodes()) {
+ ExecutableNode startNode = flow.getExecutableNode(startNodeId);
+ runReadyJob(startNode);
+ }
}
else {
runExecutableNode(node);
}
}
-
- if (!jobsReadyToRun.isEmpty()) {
- updateFlow();
+ return true;
+ }
+
+ private boolean retryJobIfPossible(ExecutableNode node) {
+ if (node instanceof ExecutableFlowBase) {
+ return false;
+ }
+
+ if (node.getRetries() > node.getAttempt()) {
+ logger.info("Job '" + node.getId() + "' will be retried. Attempt " + node.getAttempt() + " of " + node.getRetries());
+ node.setDelayedExecution(node.getRetryBackoff());
+ node.resetForRetry();
return true;
}
else {
+ if (node.getRetries() > 0) {
+ logger.info("Job '" + node.getId() + "' has run out of retry attempts");
+ // Setting delayed execution to 0 in case this is manually re-tried.
+ node.setDelayedExecution(0);
+ }
+
return false;
}
}
+ private void propagateStatus(ExecutableFlowBase base, Status status) {
+ if (!Status.isStatusFinished(base.getStatus())) {
+ logger.info("Setting " + base.getNestedId() + " to " + status);
+ base.setStatus(status);
+ if (base.getParentFlow() != null) {
+ propagateStatus(base.getParentFlow(), status);
+ }
+ }
+ }
+
+ private void finishExecutableNode(ExecutableNode node) {
+ finishedNodes.add(node);
+ fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
+ }
+
private void finalizeFlow(ExecutableFlowBase flow) {
- String id = flow == this.flow ? "" : flow.getNestedId() + " ";
+ String id = flow == this.flow ? "" : flow.getNestedId();
// If it's not the starting flow, we'll create set of output props
// for the finished flow.
@@ -418,21 +591,31 @@ public class FlowRunner extends EventHandler implements Runnable {
}
flow.setEndTime(System.currentTimeMillis());
+ flow.setUpdateTime(System.currentTimeMillis());
+ long durationSec = (flow.getEndTime() - flow.getStartTime()) / 1000;
switch(flow.getStatus()) {
case FAILED_FINISHING:
- logger.info("Setting flow " + id + "status to Failed.");
+ logger.info("Setting flow '" + id + "' status to FAILED in " + durationSec + " seconds");
flow.setStatus(Status.FAILED);
+ break;
case FAILED:
case KILLED:
case FAILED_SUCCEEDED:
- logger.info("Flow " + id + "is set to " + flow.getStatus().toString());
+ logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString() + " in " + durationSec + " seconds");
break;
default:
flow.setStatus(Status.SUCCEEDED);
- logger.info("Flow " + id + "is set to " + flow.getStatus().toString());
+ logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString() + " in " + durationSec + " seconds");
+ }
+
+ // If the finalized flow is actually the top level flow, then we finish
+ // the main loop.
+ if (flow instanceof ExecutableFlow) {
+ flowFinished = true;
}
}
+
@SuppressWarnings("unchecked")
private void prepareJobProperties(ExecutableNode node) throws IOException {
Props props = null;
@@ -515,66 +698,63 @@ public class FlowRunner extends EventHandler implements Runnable {
// Collect output props from the job's dependencies.
prepareJobProperties(node);
- if (node instanceof ExecutableFlowBase) {
- node.setStatus(Status.RUNNING);
- node.setStartTime(System.currentTimeMillis());
-
- logger.info("Starting subflow " + node.getNestedId() + ".");
- }
- else {
- node.setStatus(Status.QUEUED);
- JobRunner runner = createJobRunner(node);
- logger.info("Submitting job " + node.getNestedId() + " to run.");
- try {
- executorService.submit(runner);
- activeJobRunners.add(runner);
- } catch (RejectedExecutionException e) {
- logger.error(e);
- };
- }
+ node.setStatus(Status.QUEUED);
+ JobRunner runner = createJobRunner(node);
+ logger.info("Submitting job '" + node.getNestedId() + "' to run.");
+ try {
+ executorService.submit(runner);
+ activeJobRunners.add(runner);
+ } catch (RejectedExecutionException e) {
+ logger.error(e);
+ };
+
}
/**
- * Determines what the state of the next node should be.
+ * Determines what the state of the next node should be. Returns null if
+ * the node should not be run.
*
* @param node
* @return
*/
public Status getImpliedStatus(ExecutableNode node) {
- if (flowFailed && failureAction == ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING) {
- return Status.KILLED;
- }
- else if (node.getStatus() == Status.DISABLED) {
- return Status.DISABLED;
+ // If it's running or finished with 'SUCCEEDED', then don't even
+ // bother starting this job.
+ if (Status.isStatusRunning(node.getStatus()) ||
+ node.getStatus() == Status.SUCCEEDED) {
+ return null;
}
+ // Go through the node's dependencies. If all of the previous jobs'
+ // statuses are finished and not FAILED or KILLED, then we can safely
+ // run this job.
ExecutableFlowBase flow = node.getParentFlow();
boolean shouldKill = false;
for (String dependency: node.getInNodes()) {
ExecutableNode dependencyNode = flow.getExecutableNode(dependency);
Status depStatus = dependencyNode.getStatus();
- switch (depStatus) {
- case FAILED:
- case KILLED:
- shouldKill = true;
- case SKIPPED:
- case SUCCEEDED:
- case FAILED_SUCCEEDED:
- continue;
- default:
- // Should never come here.
+ if (!Status.isStatusFinished(depStatus)) {
return null;
}
- }
-
- if (shouldKill) {
- return Status.KILLED;
+ else if (depStatus == Status.FAILED || depStatus == Status.KILLED) {
+ // We propagate failures as KILLED states.
+ shouldKill = true;
+ }
}
// If it's disabled but ready to run, we want to make sure it continues being disabled.
- if (node.getStatus() == Status.DISABLED) {
- return Status.DISABLED;
+ if (node.getStatus() == Status.DISABLED || node.getStatus() == Status.SKIPPED) {
+ return Status.SKIPPED;
+ }
+
+ // If the flow has failed, and we want to finish only the currently running jobs, we just
+ // kill everything else. We also kill, if the flow has been cancelled.
+ if (flowFailed && failureAction == ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING) {
+ return Status.KILLED;
+ }
+ else if (shouldKill || isCancelled()) {
+ return Status.KILLED;
}
// All good to go, ready to run.
@@ -686,58 +866,97 @@ public class FlowRunner extends EventHandler implements Runnable {
public void retryFailures(String user) {
synchronized(mainSyncObj) {
logger.info("Retrying failures invoked by " + user);
- retryFailures(flow);
-
- flow.setStatus(Status.RUNNING);
- flow.setUpdateTime(System.currentTimeMillis());
- flowFailed = false;
-
- updateFlow();
+ retryFailedJobs = true;
interrupt();
}
}
- private void retryFailures(ExecutableFlowBase flow) {
- for (ExecutableNode node: flow.getExecutableNodes()) {
- if (node instanceof ExecutableFlowBase) {
- if (node.getStatus() == Status.FAILED || node.getStatus() == Status.FAILED_FINISHING || node.getStatus() == Status.KILLED) {
- retryFailures((ExecutableFlowBase)node);
- }
- }
+ private void resetFailedState(ExecutableFlowBase flow, List<ExecutableNode> nodesToRetry) {
+ //bottom up
+ LinkedList<ExecutableNode> queue = new LinkedList<ExecutableNode>();
+ for (String id : flow.getEndNodes()) {
+ ExecutableNode node = flow.getExecutableNode(id);
+ queue.add(node);
+ }
+
+ long maxStartTime = -1;
+ while (!queue.isEmpty()) {
+ ExecutableNode node = queue.poll();
+ Status oldStatus = node.getStatus();
+ maxStartTime = Math.max(node.getStartTime(), maxStartTime);
- if (node.getStatus() == Status.FAILED) {
- node.resetForRetry();
- logger.info("Re-enabling job " + node.getNestedId() + " attempt " + node.getAttempt());
- reEnableDependents(node);
+ long currentTime = System.currentTimeMillis();
+ if (node.getStatus() == Status.SUCCEEDED) {
+ // This is a candidate parent for restart
+ nodesToRetry.add(node);
+ continue;
}
- else if (node.getStatus() == Status.KILLED) {
- node.setStartTime(-1);
+ else if (node.getStatus() == Status.RUNNING) {
+ continue;
+ }
+ else if (node.getStatus() == Status.SKIPPED) {
+ node.setStatus(Status.DISABLED);
node.setEndTime(-1);
- node.setStatus(Status.READY);
+ node.setStartTime(-1);
+ node.setUpdateTime(currentTime);
+ }
+ else if (node instanceof ExecutableFlowBase) {
+ ExecutableFlowBase base = (ExecutableFlowBase)node;
+ switch (base.getStatus()) {
+ case KILLED:
+ case FAILED:
+ case FAILED_FINISHING:
+ resetFailedState(base, nodesToRetry);
+ default:
+ }
+
+ if (base.getStatus() != Status.KILLED) {
+ continue;
+ }
}
- else if (node.getStatus() == Status.FAILED_FINISHING) {
+ else if (node.getStatus() == Status.KILLED) {
+ // Not a flow, but killed
+ node.setStatus(Status.READY);
node.setStartTime(-1);
node.setEndTime(-1);
- node.setStatus(Status.READY);
+ node.setUpdateTime(currentTime);
+ }
+ else if(node.getStatus() == Status.FAILED) {
+ node.resetForRetry();
+ nodesToRetry.add(node);
+ }
+
+ if (!(node instanceof ExecutableFlowBase) && node.getStatus() != oldStatus) {
+ logger.info("Resetting job '" + node.getNestedId() + "' from " + oldStatus + " to " + node.getStatus());
}
- }
- }
-
- private void reEnableDependents(ExecutableNode node) {
- for(String dependent: node.getOutNodes()) {
- ExecutableNode dependentNode = node.getParentFlow().getExecutableNode(dependent);
- if (dependentNode.getStatus() == Status.KILLED) {
- dependentNode.setStatus(Status.READY);
- dependentNode.setUpdateTime(System.currentTimeMillis());
- reEnableDependents(dependentNode);
+ for(String inId: node.getInNodes()) {
+ ExecutableNode nodeUp = flow.getExecutableNode(inId);
+ queue.add(nodeUp);
}
- else if (dependentNode.getStatus() == Status.SKIPPED) {
- dependentNode.setStatus(Status.DISABLED);
- dependentNode.setUpdateTime(System.currentTimeMillis());
- reEnableDependents(dependentNode);
+ }
+
+ Status oldFlowState = flow.getStatus();
+ if (maxStartTime == -1) {
+ // Nothing has run inside the flow, so we assume the flow hasn't even started running yet.
+ flow.setStatus(Status.READY);
+ }
+ else {
+ flow.setStatus(Status.RUNNING);
+
+ // Add any READY start nodes. Usually it means the flow started, but the start node has not.
+ for (String id: flow.getStartNodes()) {
+ ExecutableNode node = flow.getExecutableNode(id);
+ if (node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
+ nodesToRetry.add(node);
+ }
}
}
+ flow.setUpdateTime(System.currentTimeMillis());
+ flow.setEndTime(-1);
+ flow.setStartTime(maxStartTime);
+
+ logger.info("Resetting flow '" + flow.getNestedId() + "' from " + oldFlowState + " to " + flow.getStatus());
}
private void interrupt() {
@@ -756,61 +975,17 @@ public class FlowRunner extends EventHandler implements Runnable {
updateFlow();
}
else if (event.getType() == Type.JOB_FINISHED) {
+ ExecutableNode node = runner.getNode();
+ long seconds = (node.getEndTime() - node.getStartTime())/1000;
synchronized(mainSyncObj) {
- ExecutableNode node = runner.getNode();
- activeJobRunners.remove(node.getId());
-
- String id = node.getNestedId();
- logger.info("Job Finished " + id + " with status " + node.getStatus());
- if (node.getOutputProps() != null && node.getOutputProps().size() > 0) {
- logger.info("Job " + id + " had output props.");
- }
-
- if (node.getStatus() == Status.FAILED) {
- if (runner.getRetries() > node.getAttempt()) {
- logger.info("Job " + id + " will be retried. Attempt " + node.getAttempt() + " of " + runner.getRetries());
- node.setDelayedExecution(runner.getRetryBackoff());
- node.resetForRetry();
- }
- else {
- if (runner.getRetries() > 0) {
- logger.info("Job " + id + " has run out of retry attempts");
- // Setting delayed execution to 0 in case this is manually re-tried.
- node.setDelayedExecution(0);
- }
-
- flowFailed = true;
-
- // The KILLED status occurs when cancel is invoked. We want to keep this
- // status even in failure conditions.
- if (!flowCancelled) {
- // During a failure, we propagate the failure to parent flows
- propagateStatus(node.getParentFlow(), Status.FAILED_FINISHING);
-
- if (failureAction == FailureAction.CANCEL_ALL) {
- logger.info("Flow failed. Failure option is Cancel All. Stopping execution.");
- cancel();
- }
- }
- }
-
- }
- updateFlow();
-
+ logger.info("Job " + node.getNestedId() + " finished with status " + node.getStatus() + " in " + seconds + " seconds");
+ finishedNodes.add(node);
+ node.getParentFlow().setUpdateTime(System.currentTimeMillis());
interrupt();
fireEventListeners(event);
}
}
}
-
- private void propagateStatus(ExecutableFlowBase base, Status status) {
- if (!Status.isStatusFinished(base.getStatus())) {
- base.setStatus(status);
- if (base.getParentFlow() != null) {
- propagateStatus(base.getParentFlow(), status);
- }
- }
- }
}
public boolean isCancelled() {
@@ -826,10 +1001,10 @@ public class FlowRunner extends EventHandler implements Runnable {
}
public File getJobLogFile(String jobId, int attempt) {
- ExecutableNode node = flow.getExecutableNode(jobId);
+ ExecutableNode node = flow.getExecutableNodePath(jobId);
File path = new File(execDir, node.getJobSource());
- String logFileName = JobRunner.createLogFileName(execId, jobId, attempt);
+ String logFileName = JobRunner.createLogFileName(node, attempt);
File logFile = new File(path.getParentFile(), logFileName);
if (!logFile.exists()) {
@@ -840,23 +1015,22 @@ public class FlowRunner extends EventHandler implements Runnable {
}
public File getJobAttachmentFile(String jobId, int attempt) {
- ExecutableNode node = flow.getExecutableNode(jobId);
- File path = new File(execDir, node.getJobSource());
+ ExecutableNode node = flow.getExecutableNodePath(jobId);
+ File path = new File(execDir, node.getJobSource());
- String attachmentFileName =
- JobRunner.createAttachmentFileName(execId, jobId, attempt);
- File attachmentFile = new File(path.getParentFile(), attachmentFileName);
- if (!attachmentFile.exists()) {
- return null;
- }
- return attachmentFile;
+ String attachmentFileName = JobRunner.createAttachmentFileName(node, attempt);
+ File attachmentFile = new File(path.getParentFile(), attachmentFileName);
+ if (!attachmentFile.exists()) {
+ return null;
+ }
+ return attachmentFile;
}
public File getJobMetaDataFile(String jobId, int attempt) {
- ExecutableNode node = flow.getExecutableNode(jobId);
+ ExecutableNode node = flow.getExecutableNodePath(jobId);
File path = new File(execDir, node.getJobSource());
- String metaDataFileName = JobRunner.createMetaDataFileName(execId, jobId, attempt);
+ String metaDataFileName = JobRunner.createMetaDataFileName(node, attempt);
File metaDataFile = new File(path.getParentFile(), metaDataFileName);
if (!metaDataFile.exists()) {
src/java/azkaban/execapp/JobRunner.java 72(+41 -31)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 211507f..3de3d23 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -205,12 +205,7 @@ public class JobRunner extends EventHandler implements Runnable {
logger = Logger.getLogger(loggerName);
// Create file appender
- String id = this.jobId;
- if (node.getExecutableFlow() != node.getParentFlow()) {
- id = node.getPrintableId("._.");
- }
-
- String logName = createLogFileName(this.executionId, id, node.getAttempt());
+ String logName = createLogFileName(node);
logFile = new File(workingDir, logName);
String absolutePath = logFile.getAbsolutePath();
@@ -231,8 +226,7 @@ public class JobRunner extends EventHandler implements Runnable {
}
private void createAttachmentFile() {
- String fileName = createAttachmentFileName(
- this.executionId, this.jobId, node.getAttempt());
+ String fileName = createAttachmentFileName(node);
File file = new File(workingDir, fileName);
attachmentFileName = file.getAbsolutePath();
}
@@ -485,8 +479,7 @@ public class JobRunner extends EventHandler implements Runnable {
}
props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
- props.put(CommonJobProperties.JOB_METADATA_FILE,
- createMetaDataFileName(executionId, this.jobId, node.getAttempt()));
+ props.put(CommonJobProperties.JOB_METADATA_FILE, createMetaDataFileName(node));
props.put(CommonJobProperties.JOB_ATTACHMENT_FILE, attachmentFileName);
changeStatus(Status.RUNNING);
@@ -618,32 +611,49 @@ public class JobRunner extends EventHandler implements Runnable {
return logFile;
}
- public int getRetries() {
- return props.getInt("retries", 0);
+ public static String createLogFileName(ExecutableNode node, int attempt) {
+ int executionId = node.getExecutableFlow().getExecutionId();
+ String jobId = node.getId();
+ if (node.getExecutableFlow() != node.getParentFlow()) {
+ // Posix safe file delimiter
+ jobId = node.getPrintableId("._.");
+ }
+ return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".log" : "_job." + executionId + "." + jobId + ".log";
}
- public long getRetryBackoff() {
- return props.getLong("retry.backoff", 0);
+ public static String createLogFileName(ExecutableNode node) {
+ return JobRunner.createLogFileName(node, node.getAttempt());
+ }
+
+ public static String createMetaDataFileName(ExecutableNode node, int attempt) {
+ int executionId = node.getExecutableFlow().getExecutionId();
+ String jobId = node.getId();
+ if (node.getExecutableFlow() != node.getParentFlow()) {
+ // Posix safe file delimiter
+ jobId = node.getPrintableId("._.");
+ }
+
+ return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".meta" : "_job." + executionId + "." + jobId + ".meta";
}
- public static String createAttachmentFileName(
- int executionId, String jobId, int attempt) {
- return attempt > 0
- ? "_job." + executionId + "." + attempt + "." + jobId + ".attach"
- : "_job." + executionId + "." + jobId + ".attach";
+ public static String createMetaDataFileName(ExecutableNode node) {
+ return JobRunner.createMetaDataFileName(node, node.getAttempt());
+ }
+
+ public static String createAttachmentFileName(ExecutableNode node) {
+
+ return JobRunner.createAttachmentFileName(node, node.getAttempt());
}
+
+ public static String createAttachmentFileName(ExecutableNode node, int attempt) {
+ int executionId = node.getExecutableFlow().getExecutionId();
+ String jobId = node.getId();
+ if (node.getExecutableFlow() != node.getParentFlow()) {
+ // Posix safe file delimiter
+ jobId = node.getPrintableId("._.");
+ }
+
+ return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".attach" : "_job." + executionId + "." + jobId + ".attach";
- public static String createLogFileName(
- int executionId, String jobId, int attempt) {
- return attempt > 0
- ? "_job." + executionId + "." + attempt + "." + jobId + ".log"
- : "_job." + executionId + "." + jobId + ".log";
- }
-
- public static String createMetaDataFileName(
- int executionId, String jobId, int attempt) {
- return attempt > 0
- ? "_job." + executionId + "." + attempt + "." + jobId + ".meta"
- : "_job." + executionId + "." + jobId + ".meta";
}
}
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 7c721cd..bc33c0e 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -225,4 +225,10 @@ public class ExecutableFlow extends ExecutableFlowBase {
updateData.put(EXECUTIONID_PARAM, this.executionId);
return updateData;
}
+
+ public void resetForRetry() {
+ super.resetForRetry();
+ this.setStatus(Status.RUNNING);
+ }
+
}
\ No newline at end of file
diff --git a/src/java/azkaban/executor/ExecutableFlowBase.java b/src/java/azkaban/executor/ExecutableFlowBase.java
index 040888b..a760204 100644
--- a/src/java/azkaban/executor/ExecutableFlowBase.java
+++ b/src/java/azkaban/executor/ExecutableFlowBase.java
@@ -123,11 +123,16 @@ public class ExecutableFlowBase extends ExecutableNode {
return executableNodes.get(id);
}
- public ExecutableNode getExecutableNode(String ... ids) {
- return getExecutableNode(this, ids, 0);
+ public ExecutableNode getExecutableNodePath(String ids) {
+ String[] split = ids.split(":");
+ return getExecutableNodePath(split);
}
- private ExecutableNode getExecutableNode(ExecutableFlowBase flow, String[] ids, int currentIdIdx) {
+ public ExecutableNode getExecutableNodePath(String ... ids) {
+ return getExecutableNodePath(this, ids, 0);
+ }
+
+ private ExecutableNode getExecutableNodePath(ExecutableFlowBase flow, String[] ids, int currentIdIdx) {
ExecutableNode node = flow.getExecutableNode(ids[currentIdIdx]);
currentIdIdx++;
@@ -139,7 +144,7 @@ public class ExecutableFlowBase extends ExecutableNode {
return node;
}
else if (node instanceof ExecutableFlowBase) {
- return getExecutableNode((ExecutableFlowBase)node, ids, currentIdIdx);
+ return getExecutableNodePath((ExecutableFlowBase)node, ids, currentIdIdx);
}
else {
return null;
@@ -406,4 +411,13 @@ public class ExecutableFlowBase extends ExecutableNode {
return jobsToRun;
}
+
+ public String getFlowPath() {
+ if (this.getParentFlow() == null) {
+ return this.getFlowId();
+ }
+ else {
+ return this.getParentFlow().getFlowPath() + "," + this.getId() + ":"+ this.getFlowId();
+ }
+ }
}
\ No newline at end of file
diff --git a/src/java/azkaban/executor/ExecutableJobInfo.java b/src/java/azkaban/executor/ExecutableJobInfo.java
index 53eca5c..f993111 100644
--- a/src/java/azkaban/executor/ExecutableJobInfo.java
+++ b/src/java/azkaban/executor/ExecutableJobInfo.java
@@ -16,9 +16,13 @@
package azkaban.executor;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import azkaban.utils.Pair;
+
public class ExecutableJobInfo {
private final int execId;
private final int projectId;
@@ -29,6 +33,8 @@ public class ExecutableJobInfo {
private final long endTime;
private final Status status;
private final int attempt;
+
+ private ArrayList<Pair<String, String>> jobPath;
public ExecutableJobInfo(int execId, int projectId, int version, String flowId, String jobId, long startTime, long endTime, Status status, int attempt) {
this.execId = execId;
@@ -40,6 +46,8 @@ public class ExecutableJobInfo {
this.flowId = flowId;
this.jobId = jobId;
this.attempt = attempt;
+
+ parseFlowId();
}
public int getProjectId() {
@@ -58,6 +66,20 @@ public class ExecutableJobInfo {
return flowId;
}
+ public String getImmediateFlowId() {
+ if (jobPath.size() == 1) {
+ return flowId;
+ }
+ Pair<String, String> pair = jobPath.get(jobPath.size() - 1);
+ return pair.getSecond();
+ }
+
+ public String getHeadFlowId() {
+ Pair<String, String> pair = jobPath.get(0);
+
+ return pair.getFirst();
+ }
+
public String getJobId() {
return jobId;
}
@@ -78,6 +100,40 @@ public class ExecutableJobInfo {
return attempt;
}
+ public List<Pair<String,String>> getParsedFlowId() {
+ return jobPath;
+ }
+
+ private void parseFlowId() {
+ jobPath = new ArrayList<Pair<String,String>>();
+ String[] flowPairs = flowId.split(",");
+
+ for (String flowPair: flowPairs) {
+ String[] pairSplit = flowPair.split(":");
+ Pair<String, String> pair;
+ if (pairSplit.length == 1) {
+ pair = new Pair<String, String>(pairSplit[0], pairSplit[0]);
+ }
+ else {
+ pair = new Pair<String, String>(pairSplit[0], pairSplit[1]);
+ }
+
+ jobPath.add(pair);
+ }
+ }
+
+ public String getJobIdPath() {
+ // Skip the first one because it's always just the root.
+ String path = "";
+ for (int i=1; i < jobPath.size(); ++i) {
+ Pair<String,String> pair = jobPath.get(i);
+ path += pair.getFirst() + ":";
+ }
+
+ path += jobId;
+ return path;
+ }
+
public Map<String, Object> toObject() {
HashMap<String, Object> map = new HashMap<String, Object>();
map.put("execId", execId);
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 5fbeed2..bacad6c 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -397,6 +397,7 @@ public class ExecutableNode {
this.setStatus(Status.KILLED);
this.setStartTime(killTime);
this.setEndTime(killTime);
+ this.setUpdateTime(killTime);
}
}
@@ -404,6 +405,7 @@ public class ExecutableNode {
this.setStatus(Status.SKIPPED);
this.setStartTime(skipTime);
this.setEndTime(skipTime);
+ this.setUpdateTime(skipTime);
}
private void updatePastAttempts(List<Object> pastAttemptsList) {
@@ -428,4 +430,13 @@ public class ExecutableNode {
}
}
}
+
+ public int getRetries() {
+ return inputProps.getInt("retries", 0);
+ }
+
+ public long getRetryBackoff() {
+ return inputProps.getLong("retry.backoff", 0);
+ }
}
+
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 8b71e29..f2e979b 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -80,7 +80,6 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
File cacheDir;
public ExecutorManager(Props props, ExecutorLoader loader, Map<String, Alerter> alters) throws ExecutorManagerException {
-
this.executorLoader = loader;
this.loadRunningFlows();
executorHost = props.getString("executor.host", "localhost");
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index e756cfb..8c7a8ed 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -498,14 +498,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
}
ExecutableFlow flow = node.getExecutableFlow();
- String flowId = flow.getFlowId();
-
- // If the main flow is not the parent, then we'll create a composite key
- // for flowID.
- if (flow != node.getParentFlow()) {
- flowId = node.getParentFlow().getNestedId();
- }
-
+ String flowId = node.getParentFlow().getFlowPath();
+ System.out.println("Uploading flowId " + flowId);
QueryRunner runner = createQueryRunner();
try {
runner.update(
@@ -556,7 +550,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
node.getStatus().getNumVal(),
outputParam,
node.getExecutableFlow().getExecutionId(),
- node.getParentFlow().getNestedId(),
+ node.getParentFlow().getFlowPath(),
node.getId(),
node.getAttempt());
} catch (SQLException e) {
@@ -596,7 +590,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE,
new FetchExecutableJobHandler(),
execId,
- jobId);
+ jobId,
+ attempts);
if (info == null || info.isEmpty()) {
return null;
}
diff --git a/src/java/azkaban/migration/schedule2trigger/Schedule2Trigger.java b/src/java/azkaban/migration/schedule2trigger/Schedule2Trigger.java
index 6d1bd39..630d54d 100644
--- a/src/java/azkaban/migration/schedule2trigger/Schedule2Trigger.java
+++ b/src/java/azkaban/migration/schedule2trigger/Schedule2Trigger.java
@@ -1,6 +1,7 @@
package azkaban.migration.schedule2trigger;
import java.io.File;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -26,6 +27,7 @@ import azkaban.trigger.builtin.BasicTimeChecker;
import azkaban.trigger.builtin.ExecuteFlowAction;
import azkaban.utils.Utils;
+@SuppressWarnings("deprecation")
public class Schedule2Trigger {
private static final Logger logger = Logger.getLogger(Schedule2Trigger.class);
@@ -162,6 +164,7 @@ public class Schedule2Trigger {
props.storeLocal(outputFile);
}
+ @SuppressWarnings("unchecked")
private static void file2ScheduleTrigger() throws Exception {
TriggerLoader triggerLoader = new JdbcTriggerLoader(props);
src/java/azkaban/utils/SwapQueue.java 79(+79 -0)
diff --git a/src/java/azkaban/utils/SwapQueue.java b/src/java/azkaban/utils/SwapQueue.java
new file mode 100644
index 0000000..9999306
--- /dev/null
+++ b/src/java/azkaban/utils/SwapQueue.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * Double-buffered queue: elements are added to a secondary list while readers iterate over the primary list.
+ * Call swap() before every read so that pending additions become visible to the iterator.
+ */
+public class SwapQueue<T> implements Iterable<T> {
+ ArrayList<T> primaryQueue;
+ ArrayList<T> secondaryQueue;
+
+ public SwapQueue() {
+ primaryQueue = new ArrayList<T>();
+ secondaryQueue = new ArrayList<T>();
+ }
+
+ /**
+ * Promotes the secondary queue to primary and starts a new, empty secondary queue. The previous primary queue is discarded.
+ */
+ public synchronized void swap() {
+ primaryQueue = secondaryQueue;
+ secondaryQueue = new ArrayList<T>();
+ }
+
+ /**
+ * Returns the number of elements in the secondary queue.
+ * @return the size of the secondary queue
+ */
+ public synchronized int getSwapQueueSize() {
+ return secondaryQueue.size();
+ }
+
+ public synchronized int getPrimarySize() {
+ return primaryQueue.size();
+ }
+
+ public synchronized void addAll(Collection<T> col) {
+ secondaryQueue.addAll(col);
+ }
+
+ /**
+ * Returns the combined size of the secondary and primary queues.
+ * @return the sum of the secondary and primary queue sizes
+ */
+ public synchronized int getSize() {
+ return secondaryQueue.size() + primaryQueue.size();
+ }
+
+ public synchronized void add(T element) {
+ secondaryQueue.add(element);
+ }
+
+ /**
+ * Returns an iterator over the primary queue.
+ */
+ @Override
+ public synchronized Iterator<T> iterator() {
+ return primaryQueue.iterator();
+ }
+}
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index bcab22d..6f78ac8 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -17,7 +17,6 @@
package azkaban.webapp.servlet;
import java.io.IOException;
-import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -31,7 +30,6 @@ import javax.servlet.http.HttpServletResponse;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutionAttempt;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutionOptions.FailureAction;
@@ -47,7 +45,6 @@ import azkaban.user.Permission;
import azkaban.user.User;
import azkaban.user.Permission.Type;
import azkaban.utils.FileIOUtils.LogData;
-import azkaban.utils.JSONUtils;
import azkaban.webapp.AzkabanWebServer;
import azkaban.webapp.session.Session;
import azkaban.webapp.plugin.PluginRegistry;
@@ -189,6 +186,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
page.add("attempt", attempt);
ExecutableFlow flow = null;
+ ExecutableNode node = null;
try {
flow = executorManager.getExecutableFlow(execId);
if (flow == null) {
@@ -197,7 +195,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
return;
}
- ExecutableNode node = flow.getExecutableNode(jobId);
+ node = flow.getExecutableNodePath(jobId);
if (node == null) {
page.add("errorMsg", "Job " + jobId + " doesn't exist in " + flow.getExecutionId());
return;
@@ -219,9 +217,11 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
page.render();
return;
}
-
+
page.add("projectName", project.getName());
- page.add("flowid", flow.getFlowId());
+ page.add("flowid", flow.getId());
+ page.add("parentflowid", node.getParentFlow().getFlowId());
+ page.add("jobname", node.getId());
page.render();
}
@@ -441,7 +441,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
resp.setCharacterEncoding("utf-8");
try {
- ExecutableNode node = exFlow.getExecutableNode(jobId);
+ ExecutableNode node = exFlow.getExecutableNodePath(jobId);
if (node == null) {
ret.put("error", "Job " + jobId + " doesn't exist in " + exFlow.getExecutionId());
return;
@@ -475,11 +475,12 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
if (project == null) {
return;
}
-
+
String jobId = this.getParam(req, "jobid");
resp.setCharacterEncoding("utf-8");
+
try {
- ExecutableNode node = exFlow.getExecutableNode(jobId);
+ ExecutableNode node = exFlow.getExecutableNodePath(jobId);
if (node == null) {
ret.put("error", "Job " + jobId + " doesn't exist in " +
exFlow.getExecutionId());
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobdetailsheader.vm b/src/java/azkaban/webapp/servlet/velocity/jobdetailsheader.vm
index 4bf06c3..5b172c8 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jobdetailsheader.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jobdetailsheader.vm
@@ -24,13 +24,14 @@
</div>
<div class="col-xs-6">
<div class="pull-right az-page-header-form">
- <a href="${context}/manager?project=${projectName}&flow=${flowid}&job=$jobid" class="btn btn-info">Job Properties</a>
+ <a href="${context}/manager?project=${projectName}&flow=${parentflowid}&job=$jobname" class="btn btn-info">Job Properties</a>
</div>
</div>
</div>
</div>
</div>
+
<div class="container-full">
#parse ("azkaban/webapp/servlet/velocity/alerts.vm")
@@ -41,7 +42,7 @@
<li><a href="${context}/manager?project=${projectName}"><strong>Project</strong> $projectName</a></li>
<li><a href="${context}/manager?project=${projectName}&flow=${flowid}"><strong>Flow</strong> $flowid</a></li>
<li><a href="${context}/executor?execid=${execid}#jobslist"><strong>Execution</strong> $execid</a></li>
- <li class="active"><strong>Job</strong> $jobid</li>
+ <li class="active"><strong>Job</strong> $jobid</li>
</ol>
## Tabs
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm b/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
index f551245..27f5d40 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
@@ -47,7 +47,6 @@
#parse ("azkaban/webapp/servlet/velocity/errormsg.vm")
#else
-
#parse ("azkaban/webapp/servlet/velocity/jobdetailsheader.vm")
## Log content.
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobhistorypage.vm b/src/java/azkaban/webapp/servlet/velocity/jobhistorypage.vm
index bdeca48..63856ec 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jobhistorypage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jobhistorypage.vm
@@ -100,10 +100,10 @@
#end
</td>
<td>
- <a href="${context}/manager?project=${projectName}&flow=${job.flowId}&job=${jobid}">${jobid}</a>
+ <a href="${context}/manager?project=${projectName}&flow=${job.immediateFlowId}&job=${jobid}">${jobid}</a>
</td>
<td>
- <a href="${context}/manager?project=${projectName}&flow=${job.flowId}">${job.flowId}</a>
+ <a href="${context}/manager?project=${projectName}&flow=${job.headFlowId}">${job.flowId}</a>
</td>
<td>$utils.formatDate(${job.startTime})</td>
<td>$utils.formatDate(${job.endTime})</td>
@@ -114,7 +114,7 @@
</div>
</td>
<td class="logLink">
- <a href="${context}/executor?execid=${job.execId}&job=${jobid}&attempt=${job.attempt}">Logs</a>
+ <a href="${context}/executor?execid=${job.execId}&job=${job.jobIdPath}&attempt=${job.attempt}">Logs</a>
</td>
</tr>
#end
src/less/azkaban-graph.less 13(+13 -0)
diff --git a/src/less/azkaban-graph.less b/src/less/azkaban-graph.less
index 5cd548c..5a5d213 100644
--- a/src/less/azkaban-graph.less
+++ b/src/less/azkaban-graph.less
@@ -124,6 +124,19 @@
opacity: 0.25;
}
+.QUEUED > g > rect {
+ fill: #39b3d7;
+ stroke: #39b3d7;
+}
+
+.QUEUED > g > text {
+ fill: #FFF;
+}
+
+.QUEUED {
+ opacity: 0.5;
+}
+
/* Edges */
.edge {
stroke: #CCC;
src/less/flow.less 2(+1 -1)
diff --git a/src/less/flow.less b/src/less/flow.less
index 99f6be5..5e60909 100644
--- a/src/less/flow.less
+++ b/src/less/flow.less
@@ -169,7 +169,7 @@ li.tree-list-item {
}
&.subFilter > a > .expandarrow {
- color : #3398cc;
+ color : #f19153;
}
> a {
src/less/tables.less 2(+1 -1)
diff --git a/src/less/tables.less b/src/less/tables.less
index ba34cb4..5d5aeea 100644
--- a/src/less/tables.less
+++ b/src/less/tables.less
@@ -129,7 +129,7 @@ table.table-properties {
&.timeline {
width: 280px;
- padding: 0px;
+ padding: 0px 0px 0px 4px;
height: 100%;
vertical-align: bottom;
margin: 0px;
src/web/js/azkaban/util/flow-loader.js 14(+7 -7)
diff --git a/src/web/js/azkaban/util/flow-loader.js b/src/web/js/azkaban/util/flow-loader.js
index ba193f0..97e447c 100644
--- a/src/web/js/azkaban/util/flow-loader.js
+++ b/src/web/js/azkaban/util/flow-loader.js
@@ -133,7 +133,7 @@ var nodeClickCallback = function(event, model, node) {
}
$.merge(menu, [
- {title: "View Properties...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+ // {title: "View Properties...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
{break: 1},
{title: "Open Flow...", callback: function() {window.location.href=flowRequestURL;}},
{title: "Open Flow in New Window...", callback: function() {window.open(flowRequestURL);}},
@@ -146,8 +146,8 @@ var nodeClickCallback = function(event, model, node) {
}
else {
menu = [
- {title: "View Properties...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
- {break: 1},
+ // {title: "View Properties...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+ // {break: 1},
{title: "Open Job...", callback: function() {window.location.href=requestURL;}},
{title: "Open Job in New Window...", callback: function() {window.open(requestURL);}},
{break: 1},
@@ -170,8 +170,8 @@ var jobClickCallback = function(event, model, node) {
if (type == "flow") {
var flowRequestURL = contextURL + "/manager?project=" + projectName + "&flow=" + node.flowId;
menu = [
- {title: "View Properties...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
- {break: 1},
+ // {title: "View Properties...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+ // {break: 1},
{title: "Open Flow...", callback: function() {window.location.href=flowRequestURL;}},
{title: "Open Flow in New Window...", callback: function() {window.open(flowRequestURL);}},
{break: 1},
@@ -183,8 +183,8 @@ var jobClickCallback = function(event, model, node) {
}
else {
menu = [
- {title: "View Job...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
- {break: 1},
+ // {title: "View Job...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+ // {break: 1},
{title: "Open Job...", callback: function() {window.location.href=requestURL;}},
{title: "Open Job in New Window...", callback: function() {window.open(requestURL);}},
{break: 1},
src/web/js/azkaban/view/flow-execution-list.js 23(+13 -10)
diff --git a/src/web/js/azkaban/view/flow-execution-list.js b/src/web/js/azkaban/view/flow-execution-list.js
index 2a0ce22..0ec15a9 100644
--- a/src/web/js/azkaban/view/flow-execution-list.js
+++ b/src/web/js/azkaban/view/flow-execution-list.js
@@ -171,20 +171,29 @@ azkaban.ExecutionListView = Backbone.View.extend({
outerWidth = parseInt(outerWidth);
}
+ var parentLastTime = data.endTime == -1 ? (new Date()).getTime() : data.endTime;
+ var parentStartTime = data.startTime;
+
+ var factor = outerWidth / (flowLastTime - flowStartTime);
+ var outerProgressBarWidth = factor * (parentLastTime - parentStartTime);
+ var outerLeftMargin = factor * (parentStartTime - flowStartTime);
+
var nodes = data.nodes;
- var diff = flowLastTime - flowStartTime;
- var factor = outerWidth/diff;
for (var i = 0; i < nodes.length; ++i) {
var node = nodes[i];
// calculate the progress
var tr = node.joblistrow;
-
+ var outerProgressBar = $(tr).find("> td.timeline > .flow-progress");
var progressBar = $(tr).find("> td.timeline > .flow-progress > .main-progress");
var offsetLeft = 0;
var minOffset = 0;
progressBar.attempt = 0;
+ // Shift the outer progress
+ $(outerProgressBar).css("width", outerProgressBarWidth)
+ $(outerProgressBar).css("margin-left", outerLeftMargin);
+
// Add all the attempts
if (node.pastAttempts) {
var logURL = contextURL + "/executor?execid=" + execId + "&job=" + node.id + "&attempt=" + node.pastAttempts.length;
@@ -212,7 +221,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
}
var nodeLastTime = node.endTime == -1 ? (new Date()).getTime() : node.endTime;
- var left = Math.max((node.startTime-flowStartTime)*factor, minOffset);
+ var left = Math.max((node.startTime-parentStartTime)*factor, minOffset);
var margin = left - offsetLeft;
var width = Math.max((nodeLastTime - node.startTime)*factor, 3);
width = Math.min(width, outerWidth);
@@ -247,12 +256,6 @@ azkaban.ExecutionListView = Backbone.View.extend({
$(subFlowRow).show();
}
},
- expandFlow: function(flow) {
- for (var i = 0; i < flow.nodes.length; ++i) {
- var node = flow.nodes[i];
- ///@TODO Expand.
- }
- },
addNodeRow: function(node, body) {
var self = this;
var tr = document.createElement("tr");
diff --git a/unit/executions/embedded/innerFlow.job b/unit/executions/embedded/innerFlow.job
index e9b3b89..da71d64 100644
--- a/unit/executions/embedded/innerFlow.job
+++ b/unit/executions/embedded/innerFlow.job
@@ -1,4 +1,5 @@
type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
seconds=1
fail=false
dependencies=innerJobB,innerJobC
\ No newline at end of file
diff --git a/unit/executions/embedded3/innerFlow.job b/unit/executions/embedded3/innerFlow.job
index e9b3b89..da71d64 100644
--- a/unit/executions/embedded3/innerFlow.job
+++ b/unit/executions/embedded3/innerFlow.job
@@ -1,4 +1,5 @@
type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
seconds=1
fail=false
dependencies=innerJobB,innerJobC
\ No newline at end of file
diff --git a/unit/executions/embedded3/innerFlow2.job b/unit/executions/embedded3/innerFlow2.job
index 2346982..e8430ee 100644
--- a/unit/executions/embedded3/innerFlow2.job
+++ b/unit/executions/embedded3/innerFlow2.job
@@ -1,4 +1,5 @@
type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
seconds=1
fail=false
dependencies=innerJobA
\ No newline at end of file
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 42026fc..c0f8025 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -315,7 +315,7 @@ public class FlowRunnerTest {
testStatus(exFlow, "job4", Status.FAILED);
testStatus(exFlow, "job6", Status.FAILED);
- Assert.assertTrue("Expected KILLED status instead got " + exFlow.getStatus(),exFlow.getStatus() == Status.KILLED);
+ Assert.assertTrue("Expected FAILED status instead got " + exFlow.getStatus(),exFlow.getStatus() == Status.FAILED);
try {
eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FINISHED});
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index 0ac1fb6..ada74b4 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -124,7 +124,9 @@ public class FlowRunnerTest2 {
expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
compareStates(expectedStateMap, nodeMap);
- Props jobb = nodeMap.get("jobb").getInputProps();
+ ExecutableNode node = nodeMap.get("jobb");
+ Assert.assertEquals(Status.RUNNING, node.getStatus());
+ Props jobb = node.getInputProps();
Assert.assertEquals("test1.1", jobb.get("param1"));
Assert.assertEquals("test1.1", jobb.get("param1"));
Assert.assertEquals("test1.2", jobb.get("param2"));
@@ -584,7 +586,7 @@ public class FlowRunnerTest2 {
expectedStateMap.put("jobb:innerJobC", Status.FAILED);
expectedStateMap.put("jobb:innerFlow", Status.KILLED);
expectedStateMap.put("jobc", Status.FAILED);
- expectedStateMap.put("jobd", Status.KILLED);
+ expectedStateMap.put("jobd", Status.FAILED);
expectedStateMap.put("jobd:innerJobA", Status.FAILED);
expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
expectedStateMap.put("jobe", Status.KILLED);
@@ -644,6 +646,13 @@ public class FlowRunnerTest2 {
Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
compareStates(expectedStateMap, nodeMap);
+ ExecutableNode node = nodeMap.get("jobd:innerFlow2");
+ ExecutableFlowBase base = node.getParentFlow();
+ for (String nodeId : node.getInNodes()) {
+ ExecutableNode inNode = base.getExecutableNode(nodeId);
+ System.out.println(inNode.getId() + " > " + inNode.getStatus());
+ }
+
runner.retryFailures("me");
pause(500);
expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
@@ -656,6 +665,7 @@ public class FlowRunnerTest2 {
compareStates(expectedStateMap, nodeMap);
Assert.assertTrue(thread.isAlive());
+
InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob();
InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
@@ -732,18 +742,18 @@ public class FlowRunnerTest2 {
runner.cancel("me");
pause(250);
- expectedStateMap.put("jobb", Status.KILLED);
+ expectedStateMap.put("jobb", Status.FAILED);
expectedStateMap.put("jobb:innerJobB", Status.FAILED);
expectedStateMap.put("jobb:innerJobC", Status.FAILED);
expectedStateMap.put("jobb:innerFlow", Status.KILLED);
expectedStateMap.put("jobc", Status.FAILED);
- expectedStateMap.put("jobd", Status.KILLED);
+ expectedStateMap.put("jobd", Status.FAILED);
expectedStateMap.put("jobd:innerJobA", Status.FAILED);
expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
expectedStateMap.put("jobe", Status.KILLED);
expectedStateMap.put("jobf", Status.KILLED);
- Assert.assertEquals(Status.KILLED, flow.getStatus());
+ Assert.assertEquals(Status.FAILED, flow.getStatus());
compareStates(expectedStateMap, nodeMap);
Assert.assertFalse(thread.isAlive());
}
@@ -801,7 +811,7 @@ public class FlowRunnerTest2 {
expectedStateMap.put("jobb:innerJobC", Status.FAILED);
expectedStateMap.put("jobb:innerFlow", Status.KILLED);
expectedStateMap.put("jobc", Status.FAILED);
- expectedStateMap.put("jobd", Status.KILLED);
+ expectedStateMap.put("jobd", Status.FAILED);
expectedStateMap.put("jobd:innerJobA", Status.FAILED);
expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
expectedStateMap.put("jobe", Status.KILLED);
diff --git a/unit/java/azkaban/test/execapp/MockExecutorLoader.java b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
index 05641ec..6c1246d 100644
--- a/unit/java/azkaban/test/execapp/MockExecutorLoader.java
+++ b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
@@ -202,5 +202,19 @@ public class MockExecutorLoader implements ExecutorLoader {
return null;
}
+ @Override
+ public List<Object> fetchAttachments(int execId, String name, int attempt)
+ throws ExecutorManagerException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void uploadAttachmentFile(ExecutableNode node, File file)
+ throws ExecutorManagerException {
+ // TODO Auto-generated method stub
+
+ }
+
}
\ No newline at end of file