azkaban-memoizeit
Changes
src/java/azkaban/execapp/FlowRunner.java 14(+12 -2)
src/java/azkaban/execapp/JobRunner.java 14(+9 -5)
Details
diff --git a/src/java/azkaban/execapp/event/BlockingStatus.java b/src/java/azkaban/execapp/event/BlockingStatus.java
index 02c0f44..3a262b4 100644
--- a/src/java/azkaban/execapp/event/BlockingStatus.java
+++ b/src/java/azkaban/execapp/event/BlockingStatus.java
@@ -35,8 +35,10 @@ public class BlockingStatus {
return this.status;
}
- public synchronized void unblock() {
- this.notifyAll();
+ public void unblock() {
+ synchronized(this) {
+ this.notifyAll();
+ }
}
public void changeStatus(Status status) {
diff --git a/src/java/azkaban/execapp/event/FlowWatcher.java b/src/java/azkaban/execapp/event/FlowWatcher.java
index 1c9a2ad..2e4f576 100644
--- a/src/java/azkaban/execapp/event/FlowWatcher.java
+++ b/src/java/azkaban/execapp/event/FlowWatcher.java
@@ -3,11 +3,15 @@ package azkaban.execapp.event;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.log4j.Logger;
+
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.Status;
public abstract class FlowWatcher {
+ private static final Logger logger = Logger.getLogger(FlowWatcher.class);
+
private int execId;
private ExecutableFlow flow;
private Map<String, BlockingStatus> map = new ConcurrentHashMap<String, BlockingStatus>();
@@ -26,6 +30,10 @@ public abstract class FlowWatcher {
* @param jobId
*/
protected synchronized void handleJobFinished(String jobId, Status status) {
+ if (cancelWatch) {
+ return;
+ }
+
BlockingStatus block = map.get(jobId);
if (block != null) {
block.changeStatus(status);
@@ -65,11 +73,15 @@ public abstract class FlowWatcher {
}
public synchronized void failAllWatches() {
+ logger.info("Failing all watches on " + execId);
cancelWatch = true;
for(BlockingStatus status : map.values()) {
+ status.changeStatus(Status.KILLED);
status.unblock();
}
+
+ logger.info("Successfully failed all watches on " + execId);
}
public boolean isWatchCancelled() {
diff --git a/src/java/azkaban/execapp/event/LocalFlowWatcher.java b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
index b51f17b..ea78174 100644
--- a/src/java/azkaban/execapp/event/LocalFlowWatcher.java
+++ b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
@@ -1,5 +1,6 @@
package azkaban.execapp.event;
+
import azkaban.execapp.FlowRunner;
import azkaban.execapp.JobRunner;
import azkaban.executor.ExecutableNode;
@@ -7,6 +8,7 @@ import azkaban.executor.ExecutableNode;
public class LocalFlowWatcher extends FlowWatcher {
private LocalFlowWatcherListener watcherListener;
private FlowRunner runner;
+ private boolean isShutdown = false;
public LocalFlowWatcher(FlowRunner runner) {
super(runner.getExecutableFlow().getExecutionId());
@@ -20,6 +22,11 @@ public class LocalFlowWatcher extends FlowWatcher {
@Override
public void stopWatcher() {
// Just freeing stuff
+ if(isShutdown) {
+ return;
+ }
+
+ isShutdown = true;
runner.removeListener(watcherListener);
runner = null;
diff --git a/src/java/azkaban/execapp/event/RemoteFlowWatcher.java b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
index 398a364..ec60025 100644
--- a/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
+++ b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
@@ -95,6 +95,9 @@ public class RemoteFlowWatcher extends FlowWatcher {
@Override
public synchronized void stopWatcher() {
+ if(isShutdown) {
+ return;
+ }
isShutdown = true;
if (thread != null) {
thread.interrupt();
src/java/azkaban/execapp/FlowRunner.java 14(+12 -2)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index fb4c349..f6f9612 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -155,9 +155,11 @@ public class FlowRunner extends EventHandler implements Runnable {
if (watcher != null) {
watcher.stopWatcher();
}
-
- closeLogger();
+
flow.setEndTime(System.currentTimeMillis());
+ logger.info("Setting end time for flow " + execId + " to " + System.currentTimeMillis());
+ closeLogger();
+
updateFlow();
this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
}
@@ -347,9 +349,11 @@ public class FlowRunner extends EventHandler implements Runnable {
flow.setStatus(Status.FAILED);
case FAILED:
case KILLED:
+ logger.info("Flow is set to " + flow.getStatus().toString());
break;
default:
flow.setStatus(Status.SUCCEEDED);
+ logger.info("Flow is set to " + flow.getStatus().toString());
}
}
}
@@ -520,18 +524,22 @@ public class FlowRunner extends EventHandler implements Runnable {
private void cancel() {
synchronized(mainSyncObj) {
+ logger.info("Cancel has been called on flow " + execId);
flowPaused = false;
flowCancelled = true;
if (watcher != null) {
+ logger.info("Watcher is attached. Stopping watcher.");
watcher.stopWatcher();
}
+ logger.info("Cancelling " + activeJobRunners.size() + " jobs.");
for (JobRunner runner : activeJobRunners.values()) {
runner.cancel();
}
if (flow.getStatus() != Status.FAILED && flow.getStatus() != Status.FAILED_FINISHING) {
+ logger.info("Setting flow status to " + Status.KILLED.toString());
flow.setStatus(Status.KILLED);
}
}
@@ -686,6 +694,8 @@ public class FlowRunner extends EventHandler implements Runnable {
jobOutputProps.put(node.getJobId(), runner.getOutputProps());
}
+ updateFlow();
+
if (node.getStatus() == Status.FAILED) {
// Retry failure if conditions are met.
if (!runner.isCancelled() && runner.getRetries() > node.getAttempt()) {
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 59cbbb7..a293ac7 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -502,11 +502,12 @@ public class FlowRunnerManager implements EventListener {
@Override
public void handleEvent(Event event) {
if (event.getType() == Event.Type.FLOW_FINISHED) {
+
FlowRunner flowRunner = (FlowRunner)event.getRunner();
ExecutableFlow flow = flowRunner.getExecutableFlow();
recentlyFinishedFlows.put(flow.getExecutionId(), flow);
- logger.info("Flow " + flow.getFlowId() + " is finished. Adding it to recently finished flows list.");
+ logger.info("Flow " + flow.getExecutionId() + " is finished. Adding it to recently finished flows list.");
runningFlows.remove(flow.getExecutionId());
}
}
src/java/azkaban/execapp/JobRunner.java 14(+9 -5)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index cb58324..40bc1ec 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -189,7 +189,13 @@ public class JobRunner extends EventHandler implements Runnable {
node.setEndTime(System.currentTimeMillis());
fireEvent(Event.create(this, Type.JOB_FINISHED));
return;
- } else if (node.getStatus() == Status.KILLED) {
+ } else if (this.cancelled) {
+ node.setStartTime(System.currentTimeMillis());
+ fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
+ node.setStatus(Status.FAILED);
+ node.setEndTime(System.currentTimeMillis());
+ fireEvent(Event.create(this, Type.JOB_FINISHED));
+ } else if (node.getStatus() == Status.FAILED || node.getStatus() == Status.KILLED) {
node.setStartTime(System.currentTimeMillis());
fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
node.setEndTime(System.currentTimeMillis());
@@ -314,14 +320,13 @@ public class JobRunner extends EventHandler implements Runnable {
private boolean prepareJob() throws RuntimeException {
// Check pre conditions
- if (props == null) {
- node.setStatus(Status.FAILED);
+ if (props == null || cancelled) {
logError("Failing job. The job properties don't exist");
return false;
}
synchronized(syncObject) {
- if (node.getStatus() == Status.FAILED) {
+ if (node.getStatus() == Status.FAILED || cancelled) {
return false;
}
@@ -387,7 +392,6 @@ public class JobRunner extends EventHandler implements Runnable {
// Cancel code here
if (job == null) {
- node.setStatus(Status.FAILED);
logError("Job hasn't started yet.");
// Just in case we're waiting on the delay
synchronized(this) {
diff --git a/src/java/azkaban/executor/ExecutionOptions.java b/src/java/azkaban/executor/ExecutionOptions.java
index 3e9460f..a91a6d0 100644
--- a/src/java/azkaban/executor/ExecutionOptions.java
+++ b/src/java/azkaban/executor/ExecutionOptions.java
@@ -48,7 +48,7 @@ public class ExecutionOptions {
}
public void setFailureEmails(Collection<String> emails) {
- failureEmails.addAll(emails);
+ failureEmails = new ArrayList<String>(emails);
}
public boolean isFailureEmailsOverridden() {
@@ -72,7 +72,7 @@ public class ExecutionOptions {
}
public void setSuccessEmails(Collection<String> emails) {
- successEmails.addAll(emails);
+ successEmails = new ArrayList<String>(emails);
}
public List<String> getSuccessEmails() {
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index b58c167..3f482bc 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -709,6 +709,10 @@ public class ExecutorManager {
}
// Delete the executing reference.
+ if (flow.getEndTime() == -1) {
+ flow.setEndTime(System.currentTimeMillis());
+ executorLoader.updateExecutableFlow(dsFlow);
+ }
executorLoader.removeActiveExecutableReference(execId);
runningFlows.remove(execId);
diff --git a/src/java/azkaban/project/JdbcProjectLoader.java b/src/java/azkaban/project/JdbcProjectLoader.java
index c692c27..18d8cf3 100644
--- a/src/java/azkaban/project/JdbcProjectLoader.java
+++ b/src/java/azkaban/project/JdbcProjectLoader.java
@@ -295,6 +295,7 @@ public class JdbcProjectLoader implements ProjectLoader {
}
}
+ @SuppressWarnings("resource")
private void uploadProjectFile(Connection connection, Project project, int version, String filetype, String filename, File localFile, String uploader) throws ProjectManagerException {
QueryRunner runner = new QueryRunner();
long updateTime = System.currentTimeMillis();
@@ -384,6 +385,7 @@ public class JdbcProjectLoader implements ProjectLoader {
return handler;
}
+ @SuppressWarnings("resource")
private ProjectFileHandler getUploadedFile(Connection connection, int projectId, int version) throws ProjectManagerException {
QueryRunner runner = new QueryRunner();
ProjectVersionResultHandler pfHandler = new ProjectVersionResultHandler();
diff --git a/src/java/azkaban/utils/FileIOUtils.java b/src/java/azkaban/utils/FileIOUtils.java
index 2b1f319..7934a19 100644
--- a/src/java/azkaban/utils/FileIOUtils.java
+++ b/src/java/azkaban/utils/FileIOUtils.java
@@ -150,6 +150,7 @@ public class FileIOUtils {
long skipped = fileStream.skip(offset);
if (skipped < offset) {
+ fileStream.close();
return new Pair<Integer,Integer>(0, 0);
}
@@ -174,6 +175,7 @@ public class FileIOUtils {
long skipped = fileStream.skip(fileOffset);
if (skipped < fileOffset) {
+ fileStream.close();
return new LogData(fileOffset, 0, "");
}
diff --git a/src/web/js/azkaban.flow.execute.view.js b/src/web/js/azkaban.flow.execute.view.js
index 97609ae..c8aed58 100644
--- a/src/web/js/azkaban.flow.execute.view.js
+++ b/src/web/js/azkaban.flow.execute.view.js
@@ -88,8 +88,8 @@ azkaban.FlowExecuteDialogView= Backbone.View.extend({
ajax: "executeFlow",
flow: this.flowId,
disabled: disabled,
- failureEmailOverride:failureEmailsOverride,
- successEmailOverride:successEmailsOverride,
+ failureEmailsOverride:failureEmailsOverride,
+ successEmailsOverride:successEmailsOverride,
failureAction: failureAction,
failureEmails: failureEmails,
successEmails: successEmails,
diff --git a/src/web/js/azkaban.schedule.panel.view.js b/src/web/js/azkaban.schedule.panel.view.js
index e445a5e..af81c0c 100644
--- a/src/web/js/azkaban.schedule.panel.view.js
+++ b/src/web/js/azkaban.schedule.panel.view.js
@@ -54,7 +54,7 @@ azkaban.SchedulePanelView= Backbone.View.extend({
console.log("Creating schedule for "+projectName+"."+scheduleData.flow);
var scheduleTime = $('#hour').val() + "," + $('#minutes').val() + "," + $('#am_pm').val() + "," + $('#timezone').val();
var scheduleDate = $('#datepicker').val();
- var is_recurring = $('#is_recurring').val();
+ var is_recurring = document.getElementById('is_recurring').checked ? 'on' : 'off';
var period = $('#period').val() + $('#period_units').val();
scheduleData.ajax = "scheduleFlow";