azkaban-uncached

Details

diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 1a91438..c912ad9 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -290,6 +290,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		for (String startNode : flow.getStartNodes()) {
 			ExecutableNode node = flow.getExecutableNode(startNode);
 			JobRunner jobRunner = createJobRunner(node, null);
+			logger.info("Adding initial job " + startNode + " to run queue.");
 			jobsToRun.add(jobRunner);
 		}
 	}
@@ -344,7 +345,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		prop.setParent(parentProps);
 		
 		// should have one prop with system secrets, the other user level props
-		JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), executorLoader, jobtypeManager);
+		JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), executorLoader, jobtypeManager, logger);
 		jobRunner.addListener(listener);
 
 		return jobRunner;
@@ -665,6 +666,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	 * @param node
 	 */
 	private synchronized void queueNextJobs(ExecutableNode finishedNode) {
+		String trigger = finishedNode.getAttempt() > 0 ? finishedNode.getJobId() + "." + finishedNode.getAttempt() : finishedNode.getJobId();
 		for (String dependent : finishedNode.getOutNodes()) {
 			ExecutableNode dependentNode = flow.getExecutableNode(dependent);
 			queueNextJob(dependentNode, finishedNode.getJobId());
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 0ab138b..e34132e 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -49,6 +49,8 @@ public class JobRunner extends EventHandler implements Runnable {
 
 	private Logger logger = null;
 	private Layout loggerLayout = DEFAULT_LAYOUT;
+	private Logger flowLogger = null;
+	
 	private Appender jobAppender;
 	private File logFile;
 	
@@ -60,13 +62,14 @@ public class JobRunner extends EventHandler implements Runnable {
 	
 	private final JobTypeManager jobtypeManager;
 
-	public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
+	public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager, Logger flowLogger) {
 		this.props = props;
 		this.node = node;
 		this.workingDir = workingDir;
 		this.executionId = node.getExecutionId();
 		this.loader = loader;
 		this.jobtypeManager = jobtypeManager;
+		this.flowLogger = flowLogger;
 	}
 	
 	public ExecutableNode getNode() {
@@ -95,7 +98,7 @@ public class JobRunner extends EventHandler implements Runnable {
 				jobAppender = fileAppender;
 				logger.addAppender(jobAppender);
 			} catch (IOException e) {
-				logger.error("Could not open log file in " + workingDir, e);
+				flowLogger.error("Could not open log file in " + workingDir + " for job " + node.getJobId(), e);
 			}
 		}
 	}
@@ -112,7 +115,7 @@ public class JobRunner extends EventHandler implements Runnable {
 			node.setUpdateTime(System.currentTimeMillis());
 			loader.updateExecutableNode(node);
 		} catch (ExecutorManagerException e) {
-			logger.error("Error writing node properties", e);
+			flowLogger.error("Could not update job properties in db for " + node.getJobId(), e);
 		}
 	}
 	
@@ -161,9 +164,12 @@ public class JobRunner extends EventHandler implements Runnable {
 				try {
 					loader.uploadLogFile(executionId, node.getJobId(), node.getAttempt(), logFile);
 				} catch (ExecutorManagerException e) {
-					System.err.println("Error writing out logs for job " + node.getJobId());
+					flowLogger.error("Error writing out logs for job " + node.getJobId(), e);
 				}
 			}
+			else {
+				flowLogger.info("Log file for job " + node.getJobId() + " is null");
+			}
 		}
 		fireEvent(Event.create(this, Type.JOB_FINISHED));
 	}
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index c11b934..41d44f3 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -6,6 +6,7 @@ import java.io.IOException;
 import junit.framework.Assert;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -26,6 +27,7 @@ import azkaban.utils.Props;
 public class JobRunnerTest {
 	private File workingDir;
 	private JobTypeManager jobtypeManager;
+	private Logger logger = Logger.getLogger("JobRunnerTest");
 	
 	public JobRunnerTest() {
 
@@ -260,7 +262,7 @@ public class JobRunnerTest {
 		
 		Props props = createProps(time, fail);
 		
-		JobRunner runner = new JobRunner(node, props, workingDir, loader, jobtypeManager);
+		JobRunner runner = new JobRunner(node, props, workingDir, loader, jobtypeManager, logger);
 
 		runner.addListener(listener);
 		return runner;