azkaban-memoizeit
Changes
src/java/azkaban/execapp/FlowRunner.java 42(+39 -3)
src/java/azkaban/execapp/JobRunner.java 11(+11 -0)
Details
diff --git a/src/java/azkaban/execapp/event/FlowWatcher.java b/src/java/azkaban/execapp/event/FlowWatcher.java
index 2e4f576..4a13a4b 100644
--- a/src/java/azkaban/execapp/event/FlowWatcher.java
+++ b/src/java/azkaban/execapp/event/FlowWatcher.java
@@ -10,7 +10,7 @@ import azkaban.executor.ExecutableNode;
import azkaban.executor.Status;
public abstract class FlowWatcher {
- private static final Logger logger = Logger.getLogger(FlowWatcher.class);
+ private Logger logger;
private int execId;
private ExecutableFlow flow;
@@ -25,15 +25,19 @@ public abstract class FlowWatcher {
this.flow = flow;
}
+ public void setLogger(Logger logger) {
+ this.logger = logger;
+ }
+
+ protected Logger getLogger() {
+ return this.logger;
+ }
+
/**
* Called to fire events to the JobRunner listeners
* @param jobId
*/
protected synchronized void handleJobFinished(String jobId, Status status) {
- if (cancelWatch) {
- return;
- }
-
BlockingStatus block = map.get(jobId);
if (block != null) {
block.changeStatus(status);
diff --git a/src/java/azkaban/execapp/event/LocalFlowWatcher.java b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
index ea78174..afe9248 100644
--- a/src/java/azkaban/execapp/event/LocalFlowWatcher.java
+++ b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
@@ -3,6 +3,7 @@ package azkaban.execapp.event;
import azkaban.execapp.FlowRunner;
import azkaban.execapp.JobRunner;
+import azkaban.execapp.event.Event.Type;
import azkaban.executor.ExecutableNode;
public class LocalFlowWatcher extends FlowWatcher {
@@ -30,17 +31,30 @@ public class LocalFlowWatcher extends FlowWatcher {
runner.removeListener(watcherListener);
runner = null;
+ getLogger().info("Stopping watcher, and unblocking pipeline");
super.failAllWatches();
}
public class LocalFlowWatcherListener implements EventListener {
@Override
public void handleEvent(Event event) {
- if (event.getRunner() instanceof JobRunner) {
- JobRunner runner = (JobRunner)event.getRunner();
- ExecutableNode node = runner.getNode();
-
- handleJobFinished(node.getJobId(), node.getStatus());
+ if (event.getType() == Type.JOB_FINISHED) {
+ if (event.getRunner() instanceof FlowRunner) {
+ Object data = event.getData();
+ if (data instanceof ExecutableNode) {
+ ExecutableNode node = (ExecutableNode)data;
+ handleJobFinished(node.getJobId(), node.getStatus());
+ }
+ }
+ else if (event.getRunner() instanceof JobRunner) {
+ JobRunner runner = (JobRunner)event.getRunner();
+ ExecutableNode node = runner.getNode();
+
+ handleJobFinished(node.getJobId(), node.getStatus());
+ }
+ }
+ else if (event.getType() == Type.FLOW_FINISHED) {
+ stopWatcher();
}
}
}
src/java/azkaban/execapp/FlowRunner.java 42(+39 -3)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 7d68d80..99c1173 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -40,7 +40,7 @@ import azkaban.utils.PropsUtils;
public class FlowRunner extends EventHandler implements Runnable {
private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
// We check update every 5 minutes, just in case things get stuck. But for the most part, we'll be idling.
- private static final long CHECK_WAIT_MS = 5*60*60*1000;
+ private static final long CHECK_WAIT_MS = 5*60*1000;
private Logger logger;
private Layout loggerLayout = DEFAULT_LAYOUT;
@@ -160,7 +160,9 @@ public class FlowRunner extends EventHandler implements Runnable {
}
finally {
if (watcher != null) {
+ logger.info("Watcher is attached. Stopping watcher.");
watcher.stopWatcher();
+ logger.info("Watcher cancelled status is " + watcher.isWatchCancelled());
}
flow.setEndTime(System.currentTimeMillis());
@@ -182,6 +184,11 @@ public class FlowRunner extends EventHandler implements Runnable {
// Create execution dir
createLogger(flowId);
+
+ if (this.watcher != null) {
+ this.watcher.setLogger(logger);
+ }
+
logger.info("Running execid:" + execId + " flow:" + flowId + " project:" + projectId + " version:" + version);
if (pipelineExecId != null) {
logger.info("Running simulateously with " + pipelineExecId + ". Pipelining level " + pipelineLevel);
@@ -296,7 +303,7 @@ public class FlowRunner extends EventHandler implements Runnable {
else {
List<ExecutableNode> jobsReadyToRun = findReadyJobsToRun();
- if (!jobsReadyToRun.isEmpty()) {
+ if (!jobsReadyToRun.isEmpty() && !flowCancelled) {
for (ExecutableNode node : jobsReadyToRun) {
long currentTime = System.currentTimeMillis();
@@ -320,19 +327,21 @@ public class FlowRunner extends EventHandler implements Runnable {
logger.info("Killing " + node.getJobId() + " due to prior errors.");
node.setStartTime(currentTime);
node.setEndTime(currentTime);
+ fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
} // If disabled, then we auto skip
else if (node.getStatus() == Status.DISABLED) {
logger.info("Skipping disabled job " + node.getJobId() + ".");
node.setStartTime(currentTime);
node.setEndTime(currentTime);
node.setStatus(Status.SKIPPED);
+ fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
}
}
updateFlow();
}
else {
- if (isFlowFinished()) {
+ if (isFlowFinished() || flowCancelled ) {
flowFinished = true;
break;
}
@@ -346,6 +355,32 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
+ if (flowCancelled) {
+ try {
+ logger.info("Flow was force cancelled cleaning up.");
+ for(JobRunner activeRunner : activeJobRunners.values()) {
+ activeRunner.cancel();
+ }
+
+ for (ExecutableNode node: flow.getExecutableNodes()) {
+ if (Status.isStatusFinished(node.getStatus())) {
+ continue;
+ }
+ else if (node.getStatus() == Status.DISABLED) {
+ node.setStatus(Status.SKIPPED);
+ }
+ else {
+ node.setStatus(Status.KILLED);
+ }
+ fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
+ }
+ } catch (Exception e) {
+ logger.error(e);
+ }
+
+ updateFlow();
+ }
+
logger.info("Finishing up flow. Awaiting Termination");
executorService.shutdown();
@@ -538,6 +573,7 @@ public class FlowRunner extends EventHandler implements Runnable {
if (watcher != null) {
logger.info("Watcher is attached. Stopping watcher.");
watcher.stopWatcher();
+ logger.info("Watcher cancelled status is " + watcher.isWatchCancelled());
}
logger.info("Cancelling " + activeJobRunners.size() + " jobs.");
src/java/azkaban/execapp/JobRunner.java 11(+11 -0)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 16d80a5..4e7413b 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -85,6 +85,7 @@ public class JobRunner extends EventHandler implements Runnable {
private long delayStartMs = 0;
private boolean cancelled = false;
+ private BlockingStatus currentBlockStatus = null;
public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
this.props = props;
@@ -223,8 +224,12 @@ public class JobRunner extends EventHandler implements Runnable {
for(BlockingStatus bStatus: blockingStatus) {
logger.info("Waiting on pipelined job " + bStatus.getJobId());
+ currentBlockStatus = bStatus;
bStatus.blockOnFinishedStatus();
logger.info("Pipelined job " + bStatus.getJobId() + " finished.");
+ if (watcher.isWatchCancelled()) {
+ break;
+ }
}
}
if (watcher.isWatchCancelled()) {
@@ -237,6 +242,7 @@ public class JobRunner extends EventHandler implements Runnable {
}
}
+ currentBlockStatus = null;
long currentTime = System.currentTimeMillis();
if (delayStartMs > 0) {
logger.info("Delaying start of execution for " + delayStartMs + " milliseconds.");
@@ -390,6 +396,11 @@ public class JobRunner extends EventHandler implements Runnable {
logError("Cancel has been called.");
this.cancelled = true;
+ BlockingStatus status = currentBlockStatus;
+ if (status != null) {
+ status.unblock();
+ }
+
// Cancel code here
if (job == null) {
logError("Job hasn't started yet.");
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 8567895..0b2d413 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -1170,6 +1170,9 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
if (perm.isPermissionSet(Type.EXECUTE) || adminPerm) {
page.add("exec", true);
}
+ else {
+ page.add("exec", false);
+ }
List<Flow> flows = project.getFlows();
if (!flows.isEmpty()) {