azkaban-uncached
Changes
src/java/azkaban/execapp/JobRunner.java 14(+10 -4)
Details
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 1a91438..c912ad9 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -290,6 +290,7 @@ public class FlowRunner extends EventHandler implements Runnable {
for (String startNode : flow.getStartNodes()) {
ExecutableNode node = flow.getExecutableNode(startNode);
JobRunner jobRunner = createJobRunner(node, null);
+ logger.info("Adding initial job " + startNode + " to run queue.");
jobsToRun.add(jobRunner);
}
}
@@ -344,7 +345,7 @@ public class FlowRunner extends EventHandler implements Runnable {
prop.setParent(parentProps);
// should have one prop with system secrets, the other user level props
- JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), executorLoader, jobtypeManager);
+ JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), executorLoader, jobtypeManager, logger);
jobRunner.addListener(listener);
return jobRunner;
@@ -665,6 +666,7 @@ public class FlowRunner extends EventHandler implements Runnable {
* @param node
*/
private synchronized void queueNextJobs(ExecutableNode finishedNode) {
+ String trigger = finishedNode.getAttempt() > 0 ? finishedNode.getJobId() + "." + finishedNode.getAttempt() : finishedNode.getJobId();
for (String dependent : finishedNode.getOutNodes()) {
ExecutableNode dependentNode = flow.getExecutableNode(dependent);
queueNextJob(dependentNode, finishedNode.getJobId());
src/java/azkaban/execapp/JobRunner.java 14(+10 -4)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 0ab138b..e34132e 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -49,6 +49,8 @@ public class JobRunner extends EventHandler implements Runnable {
private Logger logger = null;
private Layout loggerLayout = DEFAULT_LAYOUT;
+ private Logger flowLogger = null;
+
private Appender jobAppender;
private File logFile;
@@ -60,13 +62,14 @@ public class JobRunner extends EventHandler implements Runnable {
private final JobTypeManager jobtypeManager;
- public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
+ public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager, Logger flowLogger) {
this.props = props;
this.node = node;
this.workingDir = workingDir;
this.executionId = node.getExecutionId();
this.loader = loader;
this.jobtypeManager = jobtypeManager;
+ this.flowLogger = flowLogger;
}
public ExecutableNode getNode() {
@@ -95,7 +98,7 @@ public class JobRunner extends EventHandler implements Runnable {
jobAppender = fileAppender;
logger.addAppender(jobAppender);
} catch (IOException e) {
- logger.error("Could not open log file in " + workingDir, e);
+ flowLogger.error("Could not open log file in " + workingDir + " for job " + node.getJobId(), e);
}
}
}
@@ -112,7 +115,7 @@ public class JobRunner extends EventHandler implements Runnable {
node.setUpdateTime(System.currentTimeMillis());
loader.updateExecutableNode(node);
} catch (ExecutorManagerException e) {
- logger.error("Error writing node properties", e);
+ flowLogger.error("Could not update job properties in db for " + node.getJobId(), e);
}
}
@@ -161,9 +164,12 @@ public class JobRunner extends EventHandler implements Runnable {
try {
loader.uploadLogFile(executionId, node.getJobId(), node.getAttempt(), logFile);
} catch (ExecutorManagerException e) {
- System.err.println("Error writing out logs for job " + node.getJobId());
+ flowLogger.error("Error writing out logs for job " + node.getJobId(), e);
}
}
+ else {
+ flowLogger.info("Log file for job " + node.getJobId() + " is null");
+ }
}
fireEvent(Event.create(this, Type.JOB_FINISHED));
}
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index c11b934..41d44f3 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -6,6 +6,7 @@ import java.io.IOException;
import junit.framework.Assert;
import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -26,6 +27,7 @@ import azkaban.utils.Props;
public class JobRunnerTest {
private File workingDir;
private JobTypeManager jobtypeManager;
+ private Logger logger = Logger.getLogger("JobRunnerTest");
public JobRunnerTest() {
@@ -260,7 +262,7 @@ public class JobRunnerTest {
Props props = createProps(time, fail);
- JobRunner runner = new JobRunner(node, props, workingDir, loader, jobtypeManager);
+ JobRunner runner = new JobRunner(node, props, workingDir, loader, jobtypeManager, logger);
runner.addListener(listener);
return runner;