azkaban-uncached
Changes
src/java/azkaban/execapp/FlowRunner.java 24(+15 -9)
src/java/azkaban/execapp/FlowWatcher.java 123(+123 -0)
Details
diff --git a/src/java/azkaban/execapp/event/Event.java b/src/java/azkaban/execapp/event/Event.java
index 2ad400b..764b527 100644
--- a/src/java/azkaban/execapp/event/Event.java
+++ b/src/java/azkaban/execapp/event/Event.java
@@ -22,7 +22,9 @@ public class Event {
FLOW_FINISHED,
JOB_STARTED,
JOB_FINISHED,
- JOB_STATUS_CHANGED
+ JOB_STATUS_CHANGED,
+ EXTERNAL_FLOW_UPDATED,
+ EXTERNAL_JOB_UPDATED
}
private final Object runner;
src/java/azkaban/execapp/FlowRunner.java 24(+15 -9)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index e046958..1a91438 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -99,6 +99,10 @@ public class FlowRunner extends EventHandler implements Runnable {
return execDir;
}
+ public void watchedExecutionUpdate(ExecutableFlow flow) {
+
+ }
+
@Override
public void run() {
try {
@@ -412,7 +416,7 @@ public class FlowRunner extends EventHandler implements Runnable {
for (ExecutableNode node: pausedNode.values()) {
node.setStatus(Status.KILLED);
node.setPaused(false);
- queueNextJob(node);
+ queueNextJob(node, "cancel-all-action");
}
updateFlow();
@@ -437,7 +441,7 @@ public class FlowRunner extends EventHandler implements Runnable {
node.setStatus(Status.KILLED);
if (node.isPaused()) {
node.setPaused(false);
- queueNextJob(node);
+ queueNextJob(node, "cancel-action");
}
}
}
@@ -458,7 +462,7 @@ public class FlowRunner extends EventHandler implements Runnable {
if (node.isPaused()) {
node.setPaused(false);
if (pausedNode.containsKey(jobId)) {
- queueNextJob(node);
+ queueNextJob(node, "resume-action");
}
updateFlow();
@@ -585,7 +589,7 @@ public class FlowRunner extends EventHandler implements Runnable {
}
for (ExecutableNode node: jobsToBeQueued) {
- queueNextJob(node);
+ queueNextJob(node, "retry-action");
}
updateFlow();
@@ -633,6 +637,9 @@ public class FlowRunner extends EventHandler implements Runnable {
case SKIPPED:
case SUCCEEDED:
continue;
+ case RUNNING:
+ case QUEUED:
+ return null;
default:
// Return null means it's not ready to run.
return null;
@@ -660,7 +667,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private synchronized void queueNextJobs(ExecutableNode finishedNode) {
for (String dependent : finishedNode.getOutNodes()) {
ExecutableNode dependentNode = flow.getExecutableNode(dependent);
- queueNextJob(dependentNode);
+ queueNextJob(dependentNode, finishedNode.getJobId());
}
}
@@ -669,7 +676,7 @@ public class FlowRunner extends EventHandler implements Runnable {
*
* @param node
*/
- private void queueNextJob(ExecutableNode node) {
+ private void queueNextJob(ExecutableNode node, String trigger) {
Status nextStatus = getImpliedStatus(node);
if (nextStatus == null) {
// Not yet ready or not applicable
@@ -706,11 +713,10 @@ public class FlowRunner extends EventHandler implements Runnable {
logger.info("Flow Paused. Pausing " + node.getJobId());
}
else {
- logger.info("Adding " + node.getJobId() + " to run queue.");
if (node.getStatus() != Status.DISABLED && node.getStatus() != Status.KILLED) {
node.setStatus(Status.QUEUED);
}
-
+ logger.info("Adding " + node.getJobId() + " to run queue with status " + node.getStatus().toString() + " triggered by '" + trigger + "'.");
jobsToRun.add(runner);
}
}
@@ -726,7 +732,7 @@ public class FlowRunner extends EventHandler implements Runnable {
if (event.getType() == Type.JOB_FINISHED) {
ExecutableNode node = runner.getNode();
- logger.info("Job Finished " + node.getJobId());
+ logger.info("Job Finished " + node.getJobId() + " with status " + node.getStatus());
synchronized (actionSyncObj) {
if (node.getStatus() == Status.FAILED) {
// Setting failure
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 68f5882..b0da031 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -38,7 +38,6 @@ import azkaban.project.ProjectLoader;
import azkaban.execapp.event.Event;
import azkaban.execapp.event.EventListener;
import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.jobtype.JobTypeManager;
src/java/azkaban/execapp/FlowWatcher.java 123(+123 -0)
diff --git a/src/java/azkaban/execapp/FlowWatcher.java b/src/java/azkaban/execapp/FlowWatcher.java
index 5bcb4d0..22c0d24 100644
--- a/src/java/azkaban/execapp/FlowWatcher.java
+++ b/src/java/azkaban/execapp/FlowWatcher.java
@@ -1,9 +1,132 @@
package azkaban.execapp;
+import azkaban.execapp.event.Event;
+import azkaban.execapp.event.EventListener;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.ExecutorManagerException;
+
/**
* Class that watches and updates execution flows that are being listened to by
* other executing flows.
*/
public class FlowWatcher {
+ private FlowRunnerManager manager;
+ private ExecutorLoader loader;
+ private WatcherThread watcherThread;
+
+ public FlowWatcher(FlowRunnerManager manager, ExecutorLoader loader) {
+ this.manager = manager;
+ this.loader = loader;
+
+ watcherThread = new WatcherThread();
+ watcherThread.start();
+ }
+
+ public void watchExecution(int execId, EventListener listener) throws ExecutorManagerException {
+ watchExecution(execId, null, listener);
+ }
+
+ public void watchExecution(int execId, String jobId, EventListener listener) throws ExecutorManagerException {
+ Watch watcher = new Watch(execId, jobId, listener);
+
+
+ }
+
+ public void unwatchAll(EventListener listener, int execId, String jobId) {
+
+ }
+
+ private class WatcherThread extends Thread {
+ boolean isShutdown = false;
+
+ public WatcherThread() {
+ this.setName("Execution-watcher-Thread");
+ }
+
+ public void run() {
+ while (!isShutdown) {
+
+ }
+ }
+ }
+
+ public class Watch {
+ private final EventListener listener;
+
+ private final int execId;
+ private final String jobId;
+ private long updateTime = -1;
+
+ public Watch(int execId, EventListener listener) {
+ this(execId, null, listener);
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + getOuterType().hashCode();
+ result = prime * result + execId;
+ result = prime * result + ((jobId == null) ? 0 : jobId.hashCode());
+ result = prime * result + ((listener == null) ? 0 : listener.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ Watch other = (Watch) obj;
+ if (!getOuterType().equals(other.getOuterType()))
+ return false;
+ if (execId != other.execId)
+ return false;
+ if (jobId == null) {
+ if (other.jobId != null)
+ return false;
+ } else if (!jobId.equals(other.jobId))
+ return false;
+ if (listener == null) {
+ if (other.listener != null)
+ return false;
+ } else if (!listener.equals(other.listener))
+ return false;
+ return true;
+ }
+
+ public Watch(int execId, String jobId, EventListener listener) {
+ this.execId = execId;
+ this.listener = listener;
+ this.jobId = jobId;
+ }
+
+ private void notifyIfUpdate(ExecutableFlow flow) {
+ if (jobId == null) {
+ if (flow.getUpdateTime() > updateTime) {
+ listener.handleEvent(Event.create(flow, Event.Type.EXTERNAL_FLOW_UPDATED));
+
+ updateTime = flow.getUpdateTime();
+ }
+ }
+ else {
+ ExecutableNode node = flow.getExecutableNode(jobId);
+
+ if (node.getUpdateTime() > updateTime) {
+ listener.handleEvent(Event.create(node, Event.Type.EXTERNAL_JOB_UPDATED));
+
+ updateTime = node.getUpdateTime();
+ }
+ }
+ }
+ private FlowWatcher getOuterType() {
+ return FlowWatcher.this;
+ }
+ }
}
diff --git a/src/java/azkaban/project/JdbcProjectLoader.java b/src/java/azkaban/project/JdbcProjectLoader.java
index 2d9c475..01976b7 100644
--- a/src/java/azkaban/project/JdbcProjectLoader.java
+++ b/src/java/azkaban/project/JdbcProjectLoader.java
@@ -801,7 +801,7 @@ public class JdbcProjectLoader implements ProjectLoader {
runner.update(connection, INSERT_PROPERTIES, project.getId(), project.getVersion(), name, System.currentTimeMillis(), defaultEncodingType.getNumVal(), data);
connection.commit();
} catch (SQLException e) {
- throw new ProjectManagerException("Error fetching flows from project " + project.getName() + " version " + project.getVersion(), e);
+ throw new ProjectManagerException("Error uploading project properties " + name + " into " + project.getName() + " version " + project.getVersion(), e);
}
}