azkaban-developers

Details

diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 2f3fbba..d358b02 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -18,6 +18,7 @@ public class ExecutableFlow {
 	private String flowId;
 	private String projectId;
 	private String executionPath;
+	private long lastCheckedTime;
 	
 	private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
 	private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();;
@@ -31,6 +32,7 @@ public class ExecutableFlow {
 	private int updateNumber = 0;
 	private Status flowStatus = Status.UNKNOWN;
 	private String submitUser;
+	private boolean submitted = false;
 	
 	public enum Status {
 		FAILED, FAILED_FINISHING, SUCCEEDED, RUNNING, WAITING, KILLED, DISABLED, READY, UNKNOWN
@@ -47,6 +49,14 @@ public class ExecutableFlow {
 	public ExecutableFlow() {
 	}
 	
+	public long getLastCheckedTime() {
+		return lastCheckedTime;
+	}
+	
+	public void setLastCheckedTime(long lastCheckedTime) {
+		this.lastCheckedTime = lastCheckedTime;
+	}
+	
 	public List<ExecutableNode> getExecutableNodes() {
 		return new ArrayList<ExecutableNode>(executableNodes.values());
 	}
@@ -300,6 +310,14 @@ public class ExecutableFlow {
 		this.submitUser = submitUser;
 	}
 
+	public boolean isSubmitted() {
+		return submitted;
+	}
+
+	public void setSubmitted(boolean submitted) {
+		this.submitted = submitted;
+	}
+
 	public static class ExecutableNode {
 		private String id;
 
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 503efc4..54c61d5 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -41,6 +41,7 @@ import azkaban.webapp.AzkabanExecutorServer;
  */
 public class ExecutorManager {
 	private static Logger logger = Logger.getLogger(ExecutorManager.class);
+	private static final long ACCESS_ERROR_THRESHOLD = 60000;
 	private File basePath;
 
 	private AtomicInteger counter = new AtomicInteger();
@@ -124,6 +125,8 @@ public class ExecutorManager {
 				ExecutionReference reference = ExecutionReference.readFromDirectory(activeFlowDir);
 				
 				ExecutableFlow flow = this.getFlowFromReference(reference);
+				flow.setLastCheckedTime(System.currentTimeMillis());
+				flow.setSubmitted(true);
 				if (flow != null) {
 					runningFlows.put(flow.getExecutionId(), flow);
 				}
@@ -237,6 +240,7 @@ public class ExecutorManager {
 		writeResourceFile(executionDir, flow);
 		ExecutableFlowLoader.writeExecutableFlowFile(executionDir, flow, null);
 		addActiveExecutionReference(flow);
+		flow.setLastCheckedTime(System.currentTimeMillis());
 		runningFlows.put(flow.getExecutionId(), flow);
 		
 		logger.info("Setting up " + flow.getExecutionId() + " for execution.");
@@ -277,6 +281,8 @@ public class ExecutorManager {
 			httpclient.getConnectionManager().shutdown();
 		}
 		
+		flow.setLastCheckedTime(System.currentTimeMillis());
+		flow.setSubmitted(true);
 		logger.debug("Submitted Response: " + response);
 	}
 	
@@ -404,11 +410,11 @@ public class ExecutorManager {
 			throw new ExecutorManagerException("Cleaning failed. Resource file " + flowFilename + " parse error.", e);
 		}
 		
+		logger.info("Deleting resources for " + exflow.getFlowId());
 		for (String deletable: deletableResources) {
 			File deleteFile = new File(executionPath, deletable);
 			if (deleteFile.exists()) {
 				if (deleteFile.isDirectory()) {
-					logger.info("Deleting directory " + deleteFile);
 					try {
 						FileUtils.deleteDirectory(deleteFile);
 					} catch (IOException e) {
@@ -416,7 +422,6 @@ public class ExecutorManager {
 					}
 				}
 				else {
-					logger.info("Deleting file " + deleteFile);
 					if(!deleteFile.delete()) {
 						logger.error("Deleting of resource file '" + deleteFile + "' failed.");
 					}
@@ -495,11 +500,15 @@ public class ExecutorManager {
 	
 	private class ExecutingManagerUpdaterThread extends Thread {
 		private boolean shutdown = false;
-		private int updateTimeMs = 100;
+		private int updateTimeMs = 1000;
 		public void run() {
 			while (!shutdown) {
 				ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>(runningFlows.values());
 				for(ExecutableFlow exFlow : flows) {
+					if (!exFlow.isSubmitted()) {
+						continue;
+					}
+					
 					File executionDir = new File(exFlow.getExecutionPath());
 					
 					if (!executionDir.exists()) {
@@ -550,22 +559,33 @@ public class ExecutorManager {
 							// Cleanup
 							logger.info("Flow " + exFlow.getExecutionId() + " has succeeded. Cleaning Up.");
 							try {
-								cleanFinishedJob(exFlow);
+								ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, exFlow);
+								cleanFinishedJob(exFlow);						
 							} catch (ExecutorManagerException e) {
 								e.printStackTrace();
 								continue;
 							}
+							exFlow.setLastCheckedTime(System.currentTimeMillis());
 						}
 						else {
-							logger.error("Flow " + exFlow.getExecutionId() + " has succeeded, but the Executor says its still running");
+							logger.error("Flow " + exFlow.getExecutionId() + " has succeeded, but the Executor says its still running with msg: " + status);
+							if (System.currentTimeMillis() - exFlow.getLastCheckedTime() > ACCESS_ERROR_THRESHOLD) {
+								exFlow.setStatus(Status.FAILED);
+								logger.error("It's been " + ACCESS_ERROR_THRESHOLD + " ms since last update. Auto-failing the job.");
+							}
 						}
 					}
 					else {
 						// If it's not finished, and not running, we will fail it and clean up.
 						if (status.equals("notfound")) {
 							logger.error("Flow " + exFlow.getExecutionId() + " is running, but the Executor can't find it.");
-							exFlow.setEndTime(System.currentTimeMillis());
-							exFlow.setStatus(Status.FAILED);
+							if (System.currentTimeMillis() - exFlow.getLastCheckedTime() > ACCESS_ERROR_THRESHOLD) {
+								exFlow.setStatus(Status.FAILED);
+								logger.error("It's been " + ACCESS_ERROR_THRESHOLD + " ms since last update. Auto-failing the job.");
+							}
+						}
+						else {
+							exFlow.setLastCheckedTime(System.currentTimeMillis());
 						}
 					}
 					
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index 30516ae..0ffb5a1 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -3,9 +3,9 @@ package azkaban.executor;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -39,19 +39,20 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private BlockingQueue<JobRunner> jobsToRun = new LinkedBlockingQueue<JobRunner>();
 	private int numThreads = NUM_CONCURRENT_THREADS;
 	private boolean cancelled = true;
-	
-	private Map<String, JobRunner> jobRunnersMap;
+
+	private Map<String, JobRunner> runningJobs;
 	private JobRunnerEventListener listener;
 	private Map<String, Props> sharedProps = new HashMap<String, Props>();
 	private Map<String, Props> outputProps = new HashMap<String, Props>();
 	private File basePath;
 	private AtomicInteger commitCount = new AtomicInteger(0);
-	private HashSet<String> finalNodes = new HashSet<String>();
 
 	private Logger logger;
 	private Layout loggerLayout = DEFAULT_LAYOUT;
 	private Appender flowAppender;
 	
+	private Thread currentThread;
+	
 	public enum FailedFlowOptions {
 		FINISH_RUNNING_JOBS,
 		KILL_ALL
@@ -63,7 +64,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		this.flow = flow;
 		this.basePath = new File(flow.getExecutionPath());
 		this.executorService = Executors.newFixedThreadPool(numThreads);
-		this.jobRunnersMap = new HashMap<String, JobRunner>();
+		this.runningJobs = new ConcurrentHashMap<String, JobRunner>();
 		this.listener = new JobRunnerEventListener(this);
 		
 		createLogger();
@@ -99,19 +100,18 @@ public class FlowRunner extends EventHandler implements Runnable {
 	
 	public void cancel() {
 		logger.info("Cancel Invoked");
-		finalNodes.clear();
 		cancelled = true;
 		
 		executorService.shutdownNow();
 		
 		// Loop through job runners
-		for (JobRunner runner: jobRunnersMap.values()) {
+		for (JobRunner runner: runningJobs.values()) {
 			if (runner.getStatus() == Status.WAITING || runner.getStatus() == Status.RUNNING) {
 				runner.cancel();
 			}
 		}
-		
-		this.notify();
+
+		flow.setStatus(Status.KILLED);
 	}
 	
 	public boolean isCancelled() {
@@ -131,6 +131,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 	
 	@Override
 	public void run() {
+		currentThread = Thread.currentThread();
+		
 		flow.setStatus(Status.RUNNING);
 		flow.setStartTime(System.currentTimeMillis());
 		logger.info("Starting Flow");
@@ -156,52 +158,58 @@ public class FlowRunner extends EventHandler implements Runnable {
 				ExecutableNode node = flow.getExecutableNode(startNode);
 				JobRunner jobRunner = createJobRunner(node, null);
 				jobsToRun.add(jobRunner);
+				runningJobs.put(startNode, jobRunner);
 			}
 		} catch (IOException e) {
 			logger.error("Starting job queueing failed due to " + e.getMessage());
 			flow.setStatus(Status.FAILED);
 			jobsToRun.clear();
+			runningJobs.clear();
 			logger.error("Exiting Prematurely.");
 			this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
 			return;
 		}
 		
-		// When this is empty, we will stop.
-		finalNodes.addAll(flow.getEndNodes());
-		
 		// Main loop
-		while(!finalNodes.isEmpty()) {
+		while(!runningJobs.isEmpty()) {
 			JobRunner runner = null;
 			try {
-				runner = jobsToRun.take();
+				runner = jobsToRun.poll(5, TimeUnit.MINUTES);
 			} catch (InterruptedException e) {
+				logger.info("FlowRunner thread has been interrupted.");
+				if (runningJobs.isEmpty()) {
+					break;
+				}
+				else {
+					continue;
+				}
 			}
 			
-			if (!finalNodes.isEmpty() && runner != null) {
+			if (runner != null) {
 				try {
 					ExecutableNode node = runner.getNode();
 					node.setStatus(Status.WAITING);
 					executorService.submit(runner);
 					logger.info("Job Started " + node.getId());
-					finalNodes.remove(node.getId());
 				} catch (RejectedExecutionException e) {
 					// Should reject if I shutdown executor.
 					break;
 				}
-			}
-			
-			// Just to make sure we back off on the flooding.
-			synchronized (this) {
-				try {
-					wait(5);
-				} catch (InterruptedException e) {
-					
+				
+				// Just to make sure we back off so we don't flood.
+				synchronized (this) {
+					try {
+						wait(5);
+					} catch (InterruptedException e) {
+						
+					}
 				}
 			}
 		}
 		
 		logger.info("Finishing up flow. Awaiting Termination");
 		executorService.shutdown();
+		
 		while (executorService.isTerminated()) {
 			try {
 				executorService.awaitTermination(1, TimeUnit.SECONDS);
@@ -216,6 +224,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 			logger.info("Flow finished successfully in " + (flow.getEndTime() - flow.getStartTime()) + " ms.");
 			flow.setStatus(Status.SUCCEEDED);
 		}
+		else if (flow.getStatus() == Status.KILLED) {
+			logger.info("Flow was killed in " + (flow.getEndTime() - flow.getStartTime()) + " ms.");
+			flow.setStatus(Status.KILLED);
+		}
 		else {
 			logger.info("Flow finished with failures in " + (flow.getEndTime() - flow.getStartTime()) + " ms.");
 			flow.setStatus(Status.FAILED);
@@ -245,7 +257,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 		
 		JobRunner jobRunner = new JobRunner(node, jobProps, basePath);
 		jobRunner.addListener(listener);
-		jobRunnersMap.put(node.getId(), jobRunner);
 		
 		return jobRunner;
 	}
@@ -274,6 +285,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 		}
 	}
 	
+	private void interrupt() {
+		currentThread.interrupt();
+	}
+	
 	private void handleSucceededJob(ExecutableNode node) {
 		for(String dependent: node.getOutNodes()) {
 			ExecutableNode dependentNode = flow.getExecutableNode(dependent);
@@ -306,15 +321,18 @@ public class FlowRunner extends EventHandler implements Runnable {
 				try {
 					runner = this.createJobRunner(dependentNode, previousOutput);
 				} catch (IOException e) {
-					System.err.println("Failed due to " + e.getMessage());
+					logger.error("JobRunner creation failed due to " + e.getMessage());
 					dependentNode.setStatus(Status.FAILED);
 					handleFailedJob(dependentNode);
 					return;
 				}
-				
+			
+				runningJobs.put(dependentNode.getId(), runner);
 				jobsToRun.add(runner);
 			}
 		}
+
+		runningJobs.remove(node.getId());
 	}
 	
 	private void handleFailedJob(ExecutableNode node) {
@@ -324,16 +342,16 @@ public class FlowRunner extends EventHandler implements Runnable {
 		switch (failedOptions) {
 			// We finish running current jobs and then fail. Do not accept new jobs.
 			case FINISH_RUNNING_JOBS:
-				finalNodes.clear();
+				runningJobs.clear();
 				executorService.shutdown();
-				this.notify();
 			break;
 			// We kill all running jobs and fail immediately
 			case KILL_ALL:
 				this.cancel();
 			break;
 		}
-
+		
+		runningJobs.remove(node.getId());
 	}
 	
 	private class JobRunnerEventListener implements EventListener {
@@ -364,6 +382,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 			}
 			
 			flowRunner.commitFlow();
+			if (runningJobs.isEmpty()) {
+				System.out.println("There are no more running jobs.");
+				flowRunner.interrupt();
+			}
 		}
 	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/utils/ExecutableFlowLoader.java b/src/java/azkaban/utils/ExecutableFlowLoader.java
index 3f862b4..3130c55 100644
--- a/src/java/azkaban/utils/ExecutableFlowLoader.java
+++ b/src/java/azkaban/utils/ExecutableFlowLoader.java
@@ -102,6 +102,7 @@ public class ExecutableFlowLoader {
 	 */
 	public static boolean updateFlowStatusFromFile(File exDir, ExecutableFlow flow) throws ExecutorManagerException {
 		File file = getLatestExecutableFlowDir(exDir);
+		System.out.println("Loading from: " + file);
 		int number =  getFlowUpdateNumber(file);
 		if (flow.getUpdateNumber() >= number) {
 			return false;
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 02558c5..aec8164 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -1148,6 +1148,7 @@ tr:hover td {
 	margin: 4px 5px;
 	border-bottom: 1px solid #EEE;
 	cursor: pointer;
+	background-position: 16px 0px;
 }
 
 #list ul li:hover{
@@ -1164,6 +1165,30 @@ tr:hover td {
 	opacity: 0.3;
 }
 
+#list ul li.DISABLED {
+	opacity: 0.3;
+}
+
+#list ul li.DISABLED .icon {
+	background-position: 16px 0px;
+}
+
+#list ul li.READY .icon {
+	background-position: 16px 0px;
+}
+
+#list ul li.RUNNING .icon {
+	background-position: 32px 0px;
+}
+
+#list ul li.SUCCEEDED .icon {
+	background-position: 48px 0px;
+}
+
+#list ul li.FAILED .icon {
+	background-position: 0px 0px;
+}
+
 #list ul li a {
 	font-size: 10pt;
 	margin-left: 5px;
@@ -1174,6 +1199,14 @@ tr:hover td {
 	color: black;
 }
 
+#list ul li .icon {
+	float: left;
+	width: 16px;
+	height: 16px;
+	background-image: url("./images/dot-icon.png");
+	background-position: 16px 0px;
+}
+
 table.parameters tr td.first {
 	font-weight: bold;
 }
@@ -1237,7 +1270,11 @@ svg .node:hover {
 }
 
 svg .node:hover .backboard {
-	opacity: 0.6;
+	opacity: 0.7;
+}
+
+svg .selected .backboard {
+	opacity: 0.4;
 }
 
 svg .node circle {
@@ -1254,6 +1291,10 @@ svg .node:hover text {
 	fill: #009FC9;
 }
 
+svg .selected text {
+	fill: #338AB0;
+}
+
 svg .selected circle {
 	stroke: #009FC9;
 	stroke-width: 4;
@@ -1263,6 +1304,10 @@ svg .READY circle {
 	fill: #CCC;
 }
 
+svg .RUNNING circle {
+	fill: #009FC9;
+}
+
 svg .FAILED circle {
 	fill: #CC0000;
 }
@@ -1271,6 +1316,10 @@ svg .SUCCEEDED circle {
 	fill: #00CC33;
 }
 
+svg .DISABLED {
+	opacity: 0.3;
+}
+
 span.sublabel {
 	font-size: 8pt;
 	margin-left: 12px;
diff --git a/src/web/css/images/dot-icon.png b/src/web/css/images/dot-icon.png
new file mode 100644
index 0000000..d54afd0
Binary files /dev/null and b/src/web/css/images/dot-icon.png differ
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index d05976e..0bd2f68 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -168,6 +168,11 @@ azkaban.JobListView = Backbone.View.extend({
 		$(ul).attr("id", "jobs");
 		for (var i = 0; i < nodeArray.length; ++i) {
 			var li = document.createElement("li");
+			
+			var iconDiv = document.createElement("div");
+			$(iconDiv).addClass("icon");
+			li.appendChild(iconDiv);
+			
 			var a = document.createElement("a");
 			$(a).text(nodeArray[i].id);
 			li.appendChild(a);
@@ -364,8 +369,8 @@ azkaban.SvgGraphView = Backbone.View.extend({
 			var updateNode = updateData.nodes[i];
 			var g = document.getElementById(updateNode.id);
 			
-			for (var i = 0; i < statusList.length; ++i) {
-				var status = statusList[i];
+			for (var j = 0; j < statusList.length; ++j) {
+				var status = statusList[j];
 				removeClass(g, status);
 			}
 			
@@ -525,7 +530,8 @@ var updaterFunction = function() {
 		
 		var data = graphModel.get("data");
 		if (data.status != "SUCCEEDED" && data.status != "FAILED" ) {
-			setTimeout(function() {updaterFunction();}, 30000);
+			// 10 sec updates
+			setTimeout(function() {updaterFunction();}, 10000);
 		}
 		else {
 			console.log("Flow finished, so no more updates");
@@ -581,9 +587,8 @@ $(function() {
 	          }
 	          
 	          graphModel.set({nodeMap: nodeMap});
+	      	  setTimeout(function() {updaterFunction()}, 2000);
 	      },
 	      "json"
 	    );
-
-	setTimeout(function() {updaterFunction()}, 1000);
 });