azkaban-memoizeit
Changes
src/java/azkaban/execapp/FlowRunner.java 12(+10 -2)
Details
diff --git a/src/java/azkaban/execapp/event/BlockingStatus.java b/src/java/azkaban/execapp/event/BlockingStatus.java
index 02c0f44..3a262b4 100644
--- a/src/java/azkaban/execapp/event/BlockingStatus.java
+++ b/src/java/azkaban/execapp/event/BlockingStatus.java
@@ -35,8 +35,10 @@ public class BlockingStatus {
return this.status;
}
- public synchronized void unblock() {
- this.notifyAll();
+ public void unblock() {
+ synchronized(this) {
+ this.notifyAll();
+ }
}
public void changeStatus(Status status) {
diff --git a/src/java/azkaban/execapp/event/FlowWatcher.java b/src/java/azkaban/execapp/event/FlowWatcher.java
index 1c9a2ad..2e4f576 100644
--- a/src/java/azkaban/execapp/event/FlowWatcher.java
+++ b/src/java/azkaban/execapp/event/FlowWatcher.java
@@ -3,11 +3,15 @@ package azkaban.execapp.event;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.log4j.Logger;
+
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.Status;
public abstract class FlowWatcher {
+ private static final Logger logger = Logger.getLogger(FlowWatcher.class);
+
private int execId;
private ExecutableFlow flow;
private Map<String, BlockingStatus> map = new ConcurrentHashMap<String, BlockingStatus>();
@@ -26,6 +30,10 @@ public abstract class FlowWatcher {
* @param jobId
*/
protected synchronized void handleJobFinished(String jobId, Status status) {
+ if (cancelWatch) {
+ return;
+ }
+
BlockingStatus block = map.get(jobId);
if (block != null) {
block.changeStatus(status);
@@ -65,11 +73,15 @@ public abstract class FlowWatcher {
}
public synchronized void failAllWatches() {
+ logger.info("Failing all watches on " + execId);
cancelWatch = true;
for(BlockingStatus status : map.values()) {
+ status.changeStatus(Status.KILLED);
status.unblock();
}
+
+ logger.info("Successfully failed all watches on " + execId);
}
public boolean isWatchCancelled() {
src/java/azkaban/execapp/FlowRunner.java 12(+10 -2)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index c80a9ff..c91bb08 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -155,9 +155,11 @@ public class FlowRunner extends EventHandler implements Runnable {
if (watcher != null) {
watcher.stopWatcher();
}
-
- closeLogger();
+
flow.setEndTime(System.currentTimeMillis());
+ logger.info("Setting end time for flow " + execId + " to " + System.currentTimeMillis());
+ closeLogger();
+
updateFlow();
this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
}
@@ -347,9 +349,11 @@ public class FlowRunner extends EventHandler implements Runnable {
flow.setStatus(Status.FAILED);
case FAILED:
case KILLED:
+ logger.info("Flow is set to " + flow.getStatus().toString());
break;
default:
flow.setStatus(Status.SUCCEEDED);
+ logger.info("Flow is set to " + flow.getStatus().toString());
}
}
}
@@ -520,18 +524,22 @@ public class FlowRunner extends EventHandler implements Runnable {
private void cancel() {
synchronized(mainSyncObj) {
+ logger.info("Cancel has been called on flow " + execId);
flowPaused = false;
flowCancelled = true;
if (watcher != null) {
+ logger.info("Watcher is attached. Stopping watcher.");
watcher.stopWatcher();
}
+ logger.info("Cancelling " + activeJobRunners.size() + " jobs.");
for (JobRunner runner : activeJobRunners.values()) {
runner.cancel();
}
if (flow.getStatus() != Status.FAILED && flow.getStatus() != Status.FAILED_FINISHING) {
+ logger.info("Setting flow status to " + Status.KILLED.toString());
flow.setStatus(Status.KILLED);
}
}
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index d4c4d6e..f7ce2c7 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -501,11 +501,12 @@ public class FlowRunnerManager implements EventListener {
@Override
public void handleEvent(Event event) {
if (event.getType() == Event.Type.FLOW_FINISHED) {
+
FlowRunner flowRunner = (FlowRunner)event.getRunner();
ExecutableFlow flow = flowRunner.getExecutableFlow();
recentlyFinishedFlows.put(flow.getExecutionId(), flow);
- logger.info("Flow " + flow.getFlowId() + " is finished. Adding it to recently finished flows list.");
+ logger.info("Flow " + flow.getExecutionId() + " is finished. Adding it to recently finished flows list.");
runningFlows.remove(flow.getExecutionId());
}
}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 458ae84..9671a70 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -689,6 +689,10 @@ public class ExecutorManager {
}
// Delete the executing reference.
+ if (flow.getEndTime() == -1) {
+ flow.setEndTime(System.currentTimeMillis());
+ executorLoader.updateExecutableFlow(dsFlow);
+ }
executorLoader.removeActiveExecutableReference(execId);
runningFlows.remove(execId);