azkaban-developers
Changes
src/java/azkaban/executor/FlowRunner.java 82(+52 -30)
src/web/css/azkaban.css 51(+50 -1)
src/web/css/images/dot-icon.png 0(+0 -0)
src/web/js/azkaban.exflow.view.js 15(+10 -5)
Details
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 2f3fbba..d358b02 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -18,6 +18,7 @@ public class ExecutableFlow {
private String flowId;
private String projectId;
private String executionPath;
+ private long lastCheckedTime;
private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();;
@@ -31,6 +32,7 @@ public class ExecutableFlow {
private int updateNumber = 0;
private Status flowStatus = Status.UNKNOWN;
private String submitUser;
+ private boolean submitted = false;
public enum Status {
FAILED, FAILED_FINISHING, SUCCEEDED, RUNNING, WAITING, KILLED, DISABLED, READY, UNKNOWN
@@ -47,6 +49,14 @@ public class ExecutableFlow {
public ExecutableFlow() {
}
+ public long getLastCheckedTime() {
+ return lastCheckedTime;
+ }
+
+ public void setLastCheckedTime(long lastCheckedTime) {
+ this.lastCheckedTime = lastCheckedTime;
+ }
+
public List<ExecutableNode> getExecutableNodes() {
return new ArrayList<ExecutableNode>(executableNodes.values());
}
@@ -300,6 +310,14 @@ public class ExecutableFlow {
this.submitUser = submitUser;
}
+ public boolean isSubmitted() {
+ return submitted;
+ }
+
+ public void setSubmitted(boolean submitted) {
+ this.submitted = submitted;
+ }
+
public static class ExecutableNode {
private String id;
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 503efc4..54c61d5 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -41,6 +41,7 @@ import azkaban.webapp.AzkabanExecutorServer;
*/
public class ExecutorManager {
private static Logger logger = Logger.getLogger(ExecutorManager.class);
+ private static final long ACCESS_ERROR_THRESHOLD = 60000;
private File basePath;
private AtomicInteger counter = new AtomicInteger();
@@ -124,6 +125,8 @@ public class ExecutorManager {
ExecutionReference reference = ExecutionReference.readFromDirectory(activeFlowDir);
ExecutableFlow flow = this.getFlowFromReference(reference);
+ flow.setLastCheckedTime(System.currentTimeMillis());
+ flow.setSubmitted(true);
if (flow != null) {
runningFlows.put(flow.getExecutionId(), flow);
}
@@ -237,6 +240,7 @@ public class ExecutorManager {
writeResourceFile(executionDir, flow);
ExecutableFlowLoader.writeExecutableFlowFile(executionDir, flow, null);
addActiveExecutionReference(flow);
+ flow.setLastCheckedTime(System.currentTimeMillis());
runningFlows.put(flow.getExecutionId(), flow);
logger.info("Setting up " + flow.getExecutionId() + " for execution.");
@@ -277,6 +281,8 @@ public class ExecutorManager {
httpclient.getConnectionManager().shutdown();
}
+ flow.setLastCheckedTime(System.currentTimeMillis());
+ flow.setSubmitted(true);
logger.debug("Submitted Response: " + response);
}
@@ -404,11 +410,11 @@ public class ExecutorManager {
throw new ExecutorManagerException("Cleaning failed. Resource file " + flowFilename + " parse error.", e);
}
+ logger.info("Deleting resources for " + exflow.getFlowId());
for (String deletable: deletableResources) {
File deleteFile = new File(executionPath, deletable);
if (deleteFile.exists()) {
if (deleteFile.isDirectory()) {
- logger.info("Deleting directory " + deleteFile);
try {
FileUtils.deleteDirectory(deleteFile);
} catch (IOException e) {
@@ -416,7 +422,6 @@ public class ExecutorManager {
}
}
else {
- logger.info("Deleting file " + deleteFile);
if(!deleteFile.delete()) {
logger.error("Deleting of resource file '" + deleteFile + "' failed.");
}
@@ -495,11 +500,15 @@ public class ExecutorManager {
private class ExecutingManagerUpdaterThread extends Thread {
private boolean shutdown = false;
- private int updateTimeMs = 100;
+ private int updateTimeMs = 1000;
public void run() {
while (!shutdown) {
ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>(runningFlows.values());
for(ExecutableFlow exFlow : flows) {
+ if (!exFlow.isSubmitted()) {
+ continue;
+ }
+
File executionDir = new File(exFlow.getExecutionPath());
if (!executionDir.exists()) {
@@ -550,22 +559,33 @@ public class ExecutorManager {
// Cleanup
logger.info("Flow " + exFlow.getExecutionId() + " has succeeded. Cleaning Up.");
try {
- cleanFinishedJob(exFlow);
+ ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, exFlow);
+ cleanFinishedJob(exFlow);
} catch (ExecutorManagerException e) {
e.printStackTrace();
continue;
}
+ exFlow.setLastCheckedTime(System.currentTimeMillis());
}
else {
- logger.error("Flow " + exFlow.getExecutionId() + " has succeeded, but the Executor says its still running");
+ logger.error("Flow " + exFlow.getExecutionId() + " has succeeded, but the Executor says its still running with msg: " + status);
+ if (System.currentTimeMillis() - exFlow.getLastCheckedTime() > ACCESS_ERROR_THRESHOLD) {
+ exFlow.setStatus(Status.FAILED);
+ logger.error("It's been " + ACCESS_ERROR_THRESHOLD + " ms since last update. Auto-failing the job.");
+ }
}
}
else {
// If it's not finished, and not running, we will fail it and clean up.
if (status.equals("notfound")) {
logger.error("Flow " + exFlow.getExecutionId() + " is running, but the Executor can't find it.");
- exFlow.setEndTime(System.currentTimeMillis());
- exFlow.setStatus(Status.FAILED);
+ if (System.currentTimeMillis() - exFlow.getLastCheckedTime() > ACCESS_ERROR_THRESHOLD) {
+ exFlow.setStatus(Status.FAILED);
+ logger.error("It's been " + ACCESS_ERROR_THRESHOLD + " ms since last update. Auto-failing the job.");
+ }
+ }
+ else {
+ exFlow.setLastCheckedTime(System.currentTimeMillis());
}
}
src/java/azkaban/executor/FlowRunner.java 82(+52 -30)
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index 30516ae..0ffb5a1 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -3,9 +3,9 @@ package azkaban.executor;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@@ -39,19 +39,20 @@ public class FlowRunner extends EventHandler implements Runnable {
private BlockingQueue<JobRunner> jobsToRun = new LinkedBlockingQueue<JobRunner>();
private int numThreads = NUM_CONCURRENT_THREADS;
private boolean cancelled = true;
-
- private Map<String, JobRunner> jobRunnersMap;
+
+ private Map<String, JobRunner> runningJobs;
private JobRunnerEventListener listener;
private Map<String, Props> sharedProps = new HashMap<String, Props>();
private Map<String, Props> outputProps = new HashMap<String, Props>();
private File basePath;
private AtomicInteger commitCount = new AtomicInteger(0);
- private HashSet<String> finalNodes = new HashSet<String>();
private Logger logger;
private Layout loggerLayout = DEFAULT_LAYOUT;
private Appender flowAppender;
+ private Thread currentThread;
+
public enum FailedFlowOptions {
FINISH_RUNNING_JOBS,
KILL_ALL
@@ -63,7 +64,7 @@ public class FlowRunner extends EventHandler implements Runnable {
this.flow = flow;
this.basePath = new File(flow.getExecutionPath());
this.executorService = Executors.newFixedThreadPool(numThreads);
- this.jobRunnersMap = new HashMap<String, JobRunner>();
+ this.runningJobs = new ConcurrentHashMap<String, JobRunner>();
this.listener = new JobRunnerEventListener(this);
createLogger();
@@ -99,19 +100,18 @@ public class FlowRunner extends EventHandler implements Runnable {
public void cancel() {
logger.info("Cancel Invoked");
- finalNodes.clear();
cancelled = true;
executorService.shutdownNow();
// Loop through job runners
- for (JobRunner runner: jobRunnersMap.values()) {
+ for (JobRunner runner: runningJobs.values()) {
if (runner.getStatus() == Status.WAITING || runner.getStatus() == Status.RUNNING) {
runner.cancel();
}
}
-
- this.notify();
+
+ flow.setStatus(Status.KILLED);
}
public boolean isCancelled() {
@@ -131,6 +131,8 @@ public class FlowRunner extends EventHandler implements Runnable {
@Override
public void run() {
+ currentThread = Thread.currentThread();
+
flow.setStatus(Status.RUNNING);
flow.setStartTime(System.currentTimeMillis());
logger.info("Starting Flow");
@@ -156,52 +158,58 @@ public class FlowRunner extends EventHandler implements Runnable {
ExecutableNode node = flow.getExecutableNode(startNode);
JobRunner jobRunner = createJobRunner(node, null);
jobsToRun.add(jobRunner);
+ runningJobs.put(startNode, jobRunner);
}
} catch (IOException e) {
logger.error("Starting job queueing failed due to " + e.getMessage());
flow.setStatus(Status.FAILED);
jobsToRun.clear();
+ runningJobs.clear();
logger.error("Exiting Prematurely.");
this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
return;
}
- // When this is empty, we will stop.
- finalNodes.addAll(flow.getEndNodes());
-
// Main loop
- while(!finalNodes.isEmpty()) {
+ while(!runningJobs.isEmpty()) {
JobRunner runner = null;
try {
- runner = jobsToRun.take();
+ runner = jobsToRun.poll(5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
+ logger.info("FlowRunner thread has been interrupted.");
+ if (runningJobs.isEmpty()) {
+ break;
+ }
+ else {
+ continue;
+ }
}
- if (!finalNodes.isEmpty() && runner != null) {
+ if (runner != null) {
try {
ExecutableNode node = runner.getNode();
node.setStatus(Status.WAITING);
executorService.submit(runner);
logger.info("Job Started " + node.getId());
- finalNodes.remove(node.getId());
} catch (RejectedExecutionException e) {
// Should reject if I shutdown executor.
break;
}
- }
-
- // Just to make sure we back off on the flooding.
- synchronized (this) {
- try {
- wait(5);
- } catch (InterruptedException e) {
-
+
+ // Just to make sure we back off so we don't flood.
+ synchronized (this) {
+ try {
+ wait(5);
+ } catch (InterruptedException e) {
+
+ }
}
}
}
logger.info("Finishing up flow. Awaiting Termination");
executorService.shutdown();
+
while (executorService.isTerminated()) {
try {
executorService.awaitTermination(1, TimeUnit.SECONDS);
@@ -216,6 +224,10 @@ public class FlowRunner extends EventHandler implements Runnable {
logger.info("Flow finished successfully in " + (flow.getEndTime() - flow.getStartTime()) + " ms.");
flow.setStatus(Status.SUCCEEDED);
}
+ else if (flow.getStatus() == Status.KILLED) {
+ logger.info("Flow was killed in " + (flow.getEndTime() - flow.getStartTime()) + " ms.");
+ flow.setStatus(Status.KILLED);
+ }
else {
logger.info("Flow finished with failures in " + (flow.getEndTime() - flow.getStartTime()) + " ms.");
flow.setStatus(Status.FAILED);
@@ -245,7 +257,6 @@ public class FlowRunner extends EventHandler implements Runnable {
JobRunner jobRunner = new JobRunner(node, jobProps, basePath);
jobRunner.addListener(listener);
- jobRunnersMap.put(node.getId(), jobRunner);
return jobRunner;
}
@@ -274,6 +285,10 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
+ private void interrupt() {
+ currentThread.interrupt();
+ }
+
private void handleSucceededJob(ExecutableNode node) {
for(String dependent: node.getOutNodes()) {
ExecutableNode dependentNode = flow.getExecutableNode(dependent);
@@ -306,15 +321,18 @@ public class FlowRunner extends EventHandler implements Runnable {
try {
runner = this.createJobRunner(dependentNode, previousOutput);
} catch (IOException e) {
- System.err.println("Failed due to " + e.getMessage());
+ logger.error("JobRunner creation failed due to " + e.getMessage());
dependentNode.setStatus(Status.FAILED);
handleFailedJob(dependentNode);
return;
}
-
+
+ runningJobs.put(dependentNode.getId(), runner);
jobsToRun.add(runner);
}
}
+
+ runningJobs.remove(node.getId());
}
private void handleFailedJob(ExecutableNode node) {
@@ -324,16 +342,16 @@ public class FlowRunner extends EventHandler implements Runnable {
switch (failedOptions) {
// We finish running current jobs and then fail. Do not accept new jobs.
case FINISH_RUNNING_JOBS:
- finalNodes.clear();
+ runningJobs.clear();
executorService.shutdown();
- this.notify();
break;
// We kill all running jobs and fail immediately
case KILL_ALL:
this.cancel();
break;
}
-
+
+ runningJobs.remove(node.getId());
}
private class JobRunnerEventListener implements EventListener {
@@ -364,6 +382,10 @@ public class FlowRunner extends EventHandler implements Runnable {
}
flowRunner.commitFlow();
+ if (runningJobs.isEmpty()) {
+ System.out.println("There are no more running jobs.");
+ flowRunner.interrupt();
+ }
}
}
}
\ No newline at end of file
diff --git a/src/java/azkaban/utils/ExecutableFlowLoader.java b/src/java/azkaban/utils/ExecutableFlowLoader.java
index 3f862b4..3130c55 100644
--- a/src/java/azkaban/utils/ExecutableFlowLoader.java
+++ b/src/java/azkaban/utils/ExecutableFlowLoader.java
@@ -102,6 +102,7 @@ public class ExecutableFlowLoader {
*/
public static boolean updateFlowStatusFromFile(File exDir, ExecutableFlow flow) throws ExecutorManagerException {
File file = getLatestExecutableFlowDir(exDir);
+ System.out.println("Loading from: " + file);
int number = getFlowUpdateNumber(file);
if (flow.getUpdateNumber() >= number) {
return false;
src/web/css/azkaban.css 51(+50 -1)
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 02558c5..aec8164 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -1148,6 +1148,7 @@ tr:hover td {
margin: 4px 5px;
border-bottom: 1px solid #EEE;
cursor: pointer;
+ background-position: 16px 0px;
}
#list ul li:hover{
@@ -1164,6 +1165,30 @@ tr:hover td {
opacity: 0.3;
}
+#list ul li.DISABLED {
+ opacity: 0.3;
+}
+
+#list ul li.DISABLED .icon {
+ background-position: 16px 0px;
+}
+
+#list ul li.READY .icon {
+ background-position: 16px 0px;
+}
+
+#list ul li.RUNNING .icon {
+ background-position: 32px 0px;
+}
+
+#list ul li.SUCCEEDED .icon {
+ background-position: 48px 0px;
+}
+
+#list ul li.FAILED .icon {
+ background-position: 0px 0px;
+}
+
#list ul li a {
font-size: 10pt;
margin-left: 5px;
@@ -1174,6 +1199,14 @@ tr:hover td {
color: black;
}
+#list ul li .icon {
+ float: left;
+ width: 16px;
+ height: 16px;
+ background-image: url("./images/dot-icon.png");
+ background-position: 16px 0px;
+}
+
table.parameters tr td.first {
font-weight: bold;
}
@@ -1237,7 +1270,11 @@ svg .node:hover {
}
svg .node:hover .backboard {
- opacity: 0.6;
+ opacity: 0.7;
+}
+
+svg .selected .backboard {
+ opacity: 0.4;
}
svg .node circle {
@@ -1254,6 +1291,10 @@ svg .node:hover text {
fill: #009FC9;
}
+svg .selected text {
+ fill: #338AB0;
+}
+
svg .selected circle {
stroke: #009FC9;
stroke-width: 4;
@@ -1263,6 +1304,10 @@ svg .READY circle {
fill: #CCC;
}
+svg .RUNNING circle {
+ fill: #009FC9;
+}
+
svg .FAILED circle {
fill: #CC0000;
}
@@ -1271,6 +1316,10 @@ svg .SUCCEEDED circle {
fill: #00CC33;
}
+svg .DISABLED {
+ opacity: 0.3;
+}
+
span.sublabel {
font-size: 8pt;
margin-left: 12px;
src/web/css/images/dot-icon.png 0(+0 -0)
diff --git a/src/web/css/images/dot-icon.png b/src/web/css/images/dot-icon.png
new file mode 100644
index 0000000..d54afd0
Binary files /dev/null and b/src/web/css/images/dot-icon.png differ
src/web/js/azkaban.exflow.view.js 15(+10 -5)
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index d05976e..0bd2f68 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -168,6 +168,11 @@ azkaban.JobListView = Backbone.View.extend({
$(ul).attr("id", "jobs");
for (var i = 0; i < nodeArray.length; ++i) {
var li = document.createElement("li");
+
+ var iconDiv = document.createElement("div");
+ $(iconDiv).addClass("icon");
+ li.appendChild(iconDiv);
+
var a = document.createElement("a");
$(a).text(nodeArray[i].id);
li.appendChild(a);
@@ -364,8 +369,8 @@ azkaban.SvgGraphView = Backbone.View.extend({
var updateNode = updateData.nodes[i];
var g = document.getElementById(updateNode.id);
- for (var i = 0; i < statusList.length; ++i) {
- var status = statusList[i];
+ for (var j = 0; j < statusList.length; ++j) {
+ var status = statusList[j];
removeClass(g, status);
}
@@ -525,7 +530,8 @@ var updaterFunction = function() {
var data = graphModel.get("data");
if (data.status != "SUCCEEDED" && data.status != "FAILED" ) {
- setTimeout(function() {updaterFunction();}, 30000);
+ // 10 sec updates
+ setTimeout(function() {updaterFunction();}, 10000);
}
else {
console.log("Flow finished, so no more updates");
@@ -581,9 +587,8 @@ $(function() {
}
graphModel.set({nodeMap: nodeMap});
+ setTimeout(function() {updaterFunction()}, 2000);
},
"json"
);
-
- setTimeout(function() {updaterFunction()}, 1000);
});