azkaban-memoizeit
Changes
src/java/azkaban/execapp/FlowRunner.java 13(+10 -3)
Details
diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index 282c14c..00f19a2 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -201,7 +201,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
continue;
}
- if (flow.getUpdateTime() >= updateTime) {
+ if (flow.getUpdateTime() > updateTime) {
updateList.add(flow.toUpdateObject(updateTime));
}
}
src/java/azkaban/execapp/FlowRunner.java 13(+10 -3)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 5d8b39d..c80a9ff 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -69,6 +69,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private JobRunnerEventListener listener = new JobRunnerEventListener();
private Map<String, JobRunner> jobRunners = new ConcurrentHashMap<String, JobRunner>();
+ private Map<String, JobRunner> activeJobRunners = new ConcurrentHashMap<String, JobRunner>();
// Used for pipelining
private Integer pipelineLevel = null;
@@ -300,6 +301,7 @@ public class FlowRunner extends EventHandler implements Runnable {
try {
executorService.submit(runner);
jobRunners.put(node.getJobId(), runner);
+ activeJobRunners.put(node.getJobId(), runner);
} catch (RejectedExecutionException e) {
logger.error(e);
};
@@ -373,6 +375,10 @@ public class FlowRunner extends EventHandler implements Runnable {
}
private boolean isFlowFinished() {
+ if (!activeJobRunners.isEmpty()) {
+ return false;
+ }
+
for (String end: flow.getEndNodes()) {
ExecutableNode node = flow.getExecutableNode(end);
if (!Status.isStatusFinished(node.getStatus()) ) {
@@ -521,7 +527,7 @@ public class FlowRunner extends EventHandler implements Runnable {
watcher.stopWatcher();
}
- for (JobRunner runner : jobRunners.values()) {
+ for (JobRunner runner : activeJobRunners.values()) {
runner.cancel();
}
@@ -672,7 +678,8 @@ public class FlowRunner extends EventHandler implements Runnable {
if (event.getType() == Type.JOB_FINISHED) {
synchronized(mainSyncObj) {
ExecutableNode node = runner.getNode();
-
+ activeJobRunners.remove(node.getJobId());
+
logger.info("Job Finished " + node.getJobId() + " with status " + node.getStatus());
if (runner.getOutputProps() != null) {
logger.info("Job " + node.getJobId() + " had output props.");
@@ -755,6 +762,6 @@ public class FlowRunner extends EventHandler implements Runnable {
}
public int getNumRunningJobs() {
- return jobRunners.size();
+ return activeJobRunners.size();
}
}
\ No newline at end of file