azkaban-aplcache
Changes
src/java/azkaban/executor/FlowRunner.java 73(+68 -5)
src/java/azkaban/executor/JobRunner.java 67(+54 -13)
Details
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 45ad983..2f3fbba 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -70,7 +70,7 @@ public class ExecutableFlow {
private void setFlow(Flow flow) {
for (Node node: flow.getNodes()) {
String id = node.getId();
- ExecutableNode exNode = new ExecutableNode(node);
+ ExecutableNode exNode = new ExecutableNode(node, this);
executableNodes.put(id, exNode);
}
@@ -233,7 +233,7 @@ public class ExecutableFlow {
List<Object> nodes = (List<Object>)flowObj.get("nodes");
for (Object nodeObj: nodes) {
- ExecutableNode node = ExecutableNode.createNodeFromObject(nodeObj);
+ ExecutableNode node = ExecutableNode.createNodeFromObject(nodeObj, exFlow);
exFlow.executableNodes.put(node.getId(), node);
}
@@ -311,17 +311,19 @@ public class ExecutableFlow {
private long startTime = -1;
private long endTime = -1;
private int level = 0;
-
+ private ExecutableFlow flow;
+
private Set<String> inNodes = new HashSet<String>();
private Set<String> outNodes = new HashSet<String>();
- private ExecutableNode(Node node) {
+ private ExecutableNode(Node node, ExecutableFlow flow) {
id = node.getId();
type = node.getType();
jobPropsSource = node.getJobSource();
inheritPropsSource = node.getPropsSource();
status = Status.READY;
level = node.getLevel();
+ this.flow = flow;
}
private ExecutableNode() {
@@ -381,7 +383,7 @@ public class ExecutableFlow {
}
@SuppressWarnings("unchecked")
- public static ExecutableNode createNodeFromObject(Object obj) {
+ public static ExecutableNode createNodeFromObject(Object obj, ExecutableFlow flow) {
ExecutableNode exNode = new ExecutableNode();
HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
@@ -399,6 +401,8 @@ public class ExecutableFlow {
exNode.endTime = getLongFromObject(objMap.get("endTime"));
exNode.level = (Integer)objMap.get("level");
+ exNode.flow = flow;
+
return exNode;
}
@@ -438,5 +442,9 @@ public class ExecutableFlow {
public int getLevel() {
return level;
}
+
+ public ExecutableFlow getFlow() {
+ return flow;
+ }
}
}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 5893857..503efc4 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -147,7 +147,7 @@ public class ExecutorManager {
// Find execution
File executionDir;
String executionId;
- int count = counter.getAndIncrement();
+ int count = counter.getAndIncrement() % 100000;
String countString = String.format("%05d", count);
do {
executionId = String.valueOf(System.currentTimeMillis()) + "." + countString + "." + flowId;
src/java/azkaban/executor/FlowRunner.java 73(+68 -5)
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index 43dd743..8885f1e 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -13,6 +13,12 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Layout;
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+
import azkaban.executor.ExecutableFlow.ExecutableNode;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.event.Event;
@@ -24,6 +30,8 @@ import azkaban.utils.ExecutableFlowLoader;
import azkaban.utils.Props;
public class FlowRunner extends EventHandler implements Runnable {
+ private static final Layout DEFAULT_LAYOUT = new PatternLayout( "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
+
public static final int NUM_CONCURRENT_THREADS = 10;
private ExecutableFlow flow;
@@ -40,6 +48,10 @@ public class FlowRunner extends EventHandler implements Runnable {
private AtomicInteger commitCount = new AtomicInteger(0);
private HashSet<String> finalNodes = new HashSet<String>();
+ private Logger logger;
+ private Layout loggerLayout = DEFAULT_LAYOUT;
+ private Appender flowAppender;
+
public enum FailedFlowOptions {
FINISH_RUNNING_JOBS,
KILL_ALL
@@ -53,13 +65,40 @@ public class FlowRunner extends EventHandler implements Runnable {
this.executorService = Executors.newFixedThreadPool(numThreads);
this.jobRunnersMap = new HashMap<String, JobRunner>();
this.listener = new JobRunnerEventListener(this);
+
+ createLogger();
}
public ExecutableFlow getFlow() {
return flow;
}
+ private void createLogger() {
+ // Create logger
+ String loggerName = System.currentTimeMillis() + "." + flow.getExecutionId();
+ logger = Logger.getLogger(loggerName);
+
+ // Create file appender
+ String logName = "_flow." + flow.getExecutionId() + ".log";
+ File logFile = new File(this.basePath, logName);
+ String absolutePath = logFile.getAbsolutePath();
+
+ flowAppender = null;
+ try {
+ flowAppender = new FileAppender(loggerLayout, absolutePath, false);
+ logger.addAppender(flowAppender);
+ } catch (IOException e) {
+ logger.error("Could not open log file in " + basePath, e);
+ }
+ }
+
+ private void closeLogger() {
+ logger.removeAppender(flowAppender);
+ flowAppender.close();
+ }
+
public void cancel() {
+ logger.info("Cancel Invoked");
finalNodes.clear();
cancelled = true;
@@ -94,30 +133,35 @@ public class FlowRunner extends EventHandler implements Runnable {
public void run() {
flow.setStatus(Status.RUNNING);
flow.setStartTime(System.currentTimeMillis());
+ logger.info("Starting Flow");
this.fireEventListeners(Event.create(this, Type.FLOW_STARTED));
// Load all shared props
try {
+ logger.info("Loading all shared properties");
loadAllProperties(flow);
}
catch (IOException e) {
flow.setStatus(Status.FAILED);
- System.err.println("Failed due to " + e.getMessage());
+ logger.error("Property loading failed due to " + e.getMessage());
+ logger.error("Exiting Prematurely.");
this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
return;
}
// Set up starting nodes
try {
+ logger.info("Queuing starting jobs.");
for (String startNode: flow.getStartNodes()) {
ExecutableNode node = flow.getExecutableNode(startNode);
JobRunner jobRunner = createJobRunner(node, null);
jobsToRun.add(jobRunner);
}
} catch (IOException e) {
- System.err.println("Failed due to " + e.getMessage());
+ logger.error("Starting job queueing failed due to " + e.getMessage());
flow.setStatus(Status.FAILED);
jobsToRun.clear();
+ logger.error("Exiting Prematurely.");
this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
return;
}
@@ -138,14 +182,25 @@ public class FlowRunner extends EventHandler implements Runnable {
ExecutableNode node = runner.getNode();
node.setStatus(Status.WAITING);
executorService.submit(runner);
+ logger.info("Job Started " + node.getId());
finalNodes.remove(node.getId());
} catch (RejectedExecutionException e) {
// Should reject if I shutdown executor.
break;
}
}
+
+ // Just to make sure we back off on the flooding.
+ synchronized (this) {
+ try {
+ wait(10);
+ } catch (InterruptedException e) {
+
+ }
+ }
}
+ logger.info("Finishing up flow. Awaiting Termination");
executorService.shutdown();
while (executorService.isTerminated()) {
try {
@@ -156,15 +211,19 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
+ flow.setEndTime(System.currentTimeMillis());
if (flow.getStatus() == Status.RUNNING) {
+ logger.info("Flow finished successfully in " + (flow.getEndTime() - flow.getStartTime()) + " ms.");
flow.setStatus(Status.SUCCEEDED);
}
else {
+ logger.info("Flow finished with failures in " + (flow.getEndTime() - flow.getStartTime()) + " ms.");
flow.setStatus(Status.FAILED);
}
- flow.setEndTime(System.currentTimeMillis());
+
commitFlow();
this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
+ closeLogger();
}
private JobRunner createJobRunner(ExecutableNode node, Props previousOutput) throws IOException {
@@ -184,7 +243,7 @@ public class FlowRunner extends EventHandler implements Runnable {
File propsFile = new File(basePath, source);
Props jobProps = new Props(parentProps, propsFile);
- JobRunner jobRunner = new JobRunner(node, jobProps, new File(flow.getExecutionPath()));
+ JobRunner jobRunner = new JobRunner(node, jobProps, basePath);
jobRunner.addListener(listener);
jobRunnersMap.put(node.getId(), jobRunner);
@@ -287,16 +346,20 @@ public class FlowRunner extends EventHandler implements Runnable {
@Override
public synchronized void handleEvent(Event event) {
JobRunner runner = (JobRunner)event.getRunner();
- String jobID = runner.getNode().getId();
+ ExecutableNode node = runner.getNode();
+ String jobID = node.getId();
System.out.println("Event " + jobID + " " + event.getType().toString());
// On Job success, we add the output props and then set up the next run.
if (event.getType() == Type.JOB_SUCCEEDED) {
+ logger.info("Job Succeeded " + jobID + " in " + (node.getEndTime() - node.getStartTime()) + " ms");
Props props = runner.getOutputProps();
outputProps.put(jobID, props);
flowRunner.handleSucceededJob(runner.getNode());
}
else if (event.getType() == Type.JOB_FAILED) {
+ logger.info("Job Failed " + jobID + " in " + (node.getEndTime() - node.getStartTime()) + " ms");
+ logger.info(jobID + " FAILED");
flowRunner.handleFailedJob(runner.getNode());
}
src/java/azkaban/executor/JobRunner.java 67(+54 -13)
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index 24615a5..1ac4fb4 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -1,6 +1,14 @@
package azkaban.executor;
import java.io.File;
+import java.io.IOException;
+
+import org.apache.log4j.Appender;
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Layout;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+
import azkaban.executor.ExecutableFlow.ExecutableNode;
import azkaban.executor.ExecutableFlow.Status;
@@ -9,62 +17,95 @@ import azkaban.executor.event.Event.Type;
import azkaban.executor.event.EventHandler;
import azkaban.utils.Props;
-public class JobRunner extends EventHandler implements Runnable {
+public class JobRunner extends EventHandler implements Runnable {
+ private static final Layout DEFAULT_LAYOUT = new PatternLayout(
+ "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
+
private Props props;
private Props outputProps;
private ExecutableNode node;
private File workingDir;
+ private Logger logger;
+ private Layout loggerLayout = DEFAULT_LAYOUT;
+ private Appender jobAppender;
+
public JobRunner(ExecutableNode node, Props props, File workingDir) {
this.props = props;
this.node = node;
this.node.setStatus(Status.WAITING);
this.workingDir = workingDir;
+
+ createLogger();
}
-
+
public ExecutableNode getNode() {
return node;
}
+
+ private void createLogger() {
+ // Create logger
+ String loggerName = System.currentTimeMillis() + "." + node.getFlow().getExecutionId() + "." + node.getId();
+ logger = Logger.getLogger(loggerName);
+
+ // Create file appender
+ String logName = "_job." + node.getFlow().getExecutionId() + "." + node.getId() + ".log";
+ File logFile = new File(workingDir, logName);
+ String absolutePath = logFile.getAbsolutePath();
+
+ jobAppender = null;
+ try {
+ jobAppender = new FileAppender(loggerLayout, absolutePath, false);
+ logger.addAppender(jobAppender);
+ } catch (IOException e) {
+ logger.error("Could not open log file in " + workingDir, e);
+ }
+ }
+
+ private void closeLogger() {
+ logger.removeAppender(jobAppender);
+ jobAppender.close();
+ }
@Override
public void run() {
if (node.getStatus() == Status.DISABLED) {
this.fireEventListeners(Event.create(this, Type.JOB_SUCCEEDED));
return;
- }
- else if (node.getStatus() == Status.KILLED) {
+ } else if (node.getStatus() == Status.KILLED) {
this.fireEventListeners(Event.create(this, Type.JOB_KILLED));
return;
}
node.setStartTime(System.currentTimeMillis());
+ logger.info("Starting job " + node.getId() + " at " + node.getStartTime());
node.setStatus(Status.RUNNING);
this.fireEventListeners(Event.create(this, Type.JOB_STARTED));
-
-
+
// Run Job
boolean succeeded = true;
-
+
node.setEndTime(System.currentTimeMillis());
if (succeeded) {
node.setStatus(Status.SUCCEEDED);
this.fireEventListeners(Event.create(this, Type.JOB_SUCCEEDED));
- }
- else {
+ } else {
node.setStatus(Status.FAILED);
this.fireEventListeners(Event.create(this, Type.JOB_FAILED));
}
+ logger.info("Finishing job " + node.getId() + " at " + node.getEndTime());
+ closeLogger();
}
-
+
public void cancel() {
// Cancel code here
-
+
node.setStatus(Status.KILLED);
}
-
+
public Status getStatus() {
return node.getStatus();
}
-
+
public Props getOutputProps() {
return outputProps;
}