azkaban-aplcache

Ugly way of preventing logs from overwriting each other. Probably

8/18/2012 2:25:41 AM

Details

diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 45ad983..2f3fbba 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -70,7 +70,7 @@ public class ExecutableFlow {
 	private void setFlow(Flow flow) {
 		for (Node node: flow.getNodes()) {
 			String id = node.getId();
-			ExecutableNode exNode = new ExecutableNode(node);
+			ExecutableNode exNode = new ExecutableNode(node, this);
 			executableNodes.put(id, exNode);
 		}
 		
@@ -233,7 +233,7 @@ public class ExecutableFlow {
 				
 		List<Object> nodes = (List<Object>)flowObj.get("nodes");
 		for (Object nodeObj: nodes) {
-			ExecutableNode node = ExecutableNode.createNodeFromObject(nodeObj);
+			ExecutableNode node = ExecutableNode.createNodeFromObject(nodeObj, exFlow);
 			exFlow.executableNodes.put(node.getId(), node);
 		}
 
@@ -311,17 +311,19 @@ public class ExecutableFlow {
 		private long startTime = -1;
 		private long endTime = -1;
 		private int level = 0;
-
+		private ExecutableFlow flow;
+		
 		private Set<String> inNodes = new HashSet<String>();
 		private Set<String> outNodes = new HashSet<String>();
 		
-		private ExecutableNode(Node node) {
+		private ExecutableNode(Node node, ExecutableFlow flow) {
 			id = node.getId();
 			type = node.getType();
 			jobPropsSource = node.getJobSource();
 			inheritPropsSource = node.getPropsSource();
 			status = Status.READY;
 			level = node.getLevel();
+			this.flow = flow;
 		}
 		
 		private ExecutableNode() {
@@ -381,7 +383,7 @@ public class ExecutableFlow {
 		}
 
 		@SuppressWarnings("unchecked")
-		public static ExecutableNode createNodeFromObject(Object obj) {
+		public static ExecutableNode createNodeFromObject(Object obj, ExecutableFlow flow) {
 			ExecutableNode exNode = new ExecutableNode();
 			
 			HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
@@ -399,6 +401,8 @@ public class ExecutableFlow {
 			exNode.endTime = getLongFromObject(objMap.get("endTime"));
 			exNode.level = (Integer)objMap.get("level");
 			
+			exNode.flow = flow;
+			
 			return exNode;
 		}
 		
@@ -438,5 +442,9 @@ public class ExecutableFlow {
 		public int getLevel() {
 			return level;
 		}
+		
+		public ExecutableFlow getFlow() {
+			return flow;
+		}
 	}
 }
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 5893857..503efc4 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -147,7 +147,7 @@ public class ExecutorManager {
 		// Find execution
 		File executionDir;
 		String executionId;
-		int count = counter.getAndIncrement();
+		int count = counter.getAndIncrement() % 100000;
 		String countString = String.format("%05d", count);
 		do {
 			executionId = String.valueOf(System.currentTimeMillis()) + "." + countString + "." + flowId;
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index 43dd743..8885f1e 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -13,6 +13,12 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.log4j.Appender;
+import org.apache.log4j.Layout;
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+
 import azkaban.executor.ExecutableFlow.ExecutableNode;
 import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.event.Event;
@@ -24,6 +30,8 @@ import azkaban.utils.ExecutableFlowLoader;
 import azkaban.utils.Props;
 
 public class FlowRunner extends EventHandler implements Runnable {
+	private static final Layout DEFAULT_LAYOUT = new PatternLayout( "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
+
 	public static final int NUM_CONCURRENT_THREADS = 10;
 
 	private ExecutableFlow flow;
@@ -40,6 +48,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private AtomicInteger commitCount = new AtomicInteger(0);
 	private HashSet<String> finalNodes = new HashSet<String>();
 
+	private Logger logger;
+	private Layout loggerLayout = DEFAULT_LAYOUT;
+	private Appender flowAppender;
+	
 	public enum FailedFlowOptions {
 		FINISH_RUNNING_JOBS,
 		KILL_ALL
@@ -53,13 +65,40 @@ public class FlowRunner extends EventHandler implements Runnable {
 		this.executorService = Executors.newFixedThreadPool(numThreads);
 		this.jobRunnersMap = new HashMap<String, JobRunner>();
 		this.listener = new JobRunnerEventListener(this);
+		
+		createLogger();
 	}
 	
 	public ExecutableFlow getFlow() {
 		return flow;
 	}
 	
+	private void createLogger() {
+		// Create logger
+		String loggerName = System.currentTimeMillis() + "." + flow.getExecutionId();
+		logger = Logger.getLogger(loggerName);
+
+		// Create file appender
+		String logName = "_flow." + flow.getExecutionId() + ".log";
+		File logFile = new File(this.basePath, logName);
+		String absolutePath = logFile.getAbsolutePath();
+
+		flowAppender = null;
+		try {
+			flowAppender = new FileAppender(loggerLayout, absolutePath, false);
+			logger.addAppender(flowAppender);
+		} catch (IOException e) {
+			logger.error("Could not open log file in " + basePath, e);
+		}
+	}
+	
+	private void closeLogger() {
+		logger.removeAppender(flowAppender);
+		flowAppender.close();
+	}
+	
 	public void cancel() {
+		logger.info("Cancel Invoked");
 		finalNodes.clear();
 		cancelled = true;
 		
@@ -94,30 +133,35 @@ public class FlowRunner extends EventHandler implements Runnable {
 	public void run() {
 		flow.setStatus(Status.RUNNING);
 		flow.setStartTime(System.currentTimeMillis());
+		logger.info("Starting Flow");
 		this.fireEventListeners(Event.create(this, Type.FLOW_STARTED));
 		
 		// Load all shared props
 		try {
+			logger.info("Loading all shared properties");
 			loadAllProperties(flow);
 		}
 		catch (IOException e) {
 			flow.setStatus(Status.FAILED);
-			System.err.println("Failed due to " + e.getMessage());
+			logger.error("Property loading failed due to " + e.getMessage());
+			logger.error("Exiting Prematurely.");
 			this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
 			return;
 		}
 
 		// Set up starting nodes
 		try {
+			logger.info("Queuing starting jobs.");
 			for (String startNode: flow.getStartNodes()) {
 				ExecutableNode node = flow.getExecutableNode(startNode);
 				JobRunner jobRunner = createJobRunner(node, null);
 				jobsToRun.add(jobRunner);
 			}
 		} catch (IOException e) {
-			System.err.println("Failed due to " + e.getMessage());
+			logger.error("Starting job queueing failed due to " + e.getMessage());
 			flow.setStatus(Status.FAILED);
 			jobsToRun.clear();
+			logger.error("Exiting Prematurely.");
 			this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
 			return;
 		}
@@ -138,14 +182,25 @@ public class FlowRunner extends EventHandler implements Runnable {
 					ExecutableNode node = runner.getNode();
 					node.setStatus(Status.WAITING);
 					executorService.submit(runner);
+					logger.info("Job Started " + node.getId());
 					finalNodes.remove(node.getId());
 				} catch (RejectedExecutionException e) {
 					// Should reject if I shutdown executor.
 					break;
 				}
 			}
+			
+			// Just to make sure we back off on the flooding.
+			synchronized (this) {
+				try {
+					wait(10);
+				} catch (InterruptedException e) {
+					
+				}
+			}
 		}
 		
+		logger.info("Finishing up flow. Awaiting Termination");
 		executorService.shutdown();
 		while (executorService.isTerminated()) {
 			try {
@@ -156,15 +211,19 @@ public class FlowRunner extends EventHandler implements Runnable {
 			}
 		}
 
+		flow.setEndTime(System.currentTimeMillis());
 		if (flow.getStatus() == Status.RUNNING) {
+			logger.info("Flow finished successfully in " + (flow.getEndTime() - flow.getStartTime()) + " ms.");
 			flow.setStatus(Status.SUCCEEDED);
 		}
 		else {
+			logger.info("Flow finished with failures in " + (flow.getEndTime() - flow.getStartTime()) + " ms.");
 			flow.setStatus(Status.FAILED);
 		}
-		flow.setEndTime(System.currentTimeMillis());
+
 		commitFlow();
 		this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
+		closeLogger();
 	}
 	
 	private JobRunner createJobRunner(ExecutableNode node, Props previousOutput) throws IOException {
@@ -184,7 +243,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		File propsFile = new File(basePath, source);
 		Props jobProps = new Props(parentProps, propsFile);
 		
-		JobRunner jobRunner = new JobRunner(node, jobProps, new File(flow.getExecutionPath()));
+		JobRunner jobRunner = new JobRunner(node, jobProps, basePath);
 		jobRunner.addListener(listener);
 		jobRunnersMap.put(node.getId(), jobRunner);
 		
@@ -287,16 +346,20 @@ public class FlowRunner extends EventHandler implements Runnable {
 		@Override
 		public synchronized void handleEvent(Event event) {
 			JobRunner runner = (JobRunner)event.getRunner();
-			String jobID = runner.getNode().getId();
+			ExecutableNode node = runner.getNode();
+			String jobID = node.getId();
 			System.out.println("Event " + jobID + " " + event.getType().toString());
 
 			// On Job success, we add the output props and then set up the next run.
 			if (event.getType() == Type.JOB_SUCCEEDED) {
+				logger.info("Job Succeeded " + jobID + " in " + (node.getEndTime() - node.getStartTime()) + " ms");
 				Props props = runner.getOutputProps();
 				outputProps.put(jobID, props);
 				flowRunner.handleSucceededJob(runner.getNode());
 			}
 			else if (event.getType() == Type.JOB_FAILED) {
+				logger.info("Job Failed " + jobID + " in " + (node.getEndTime() - node.getStartTime()) + " ms");
+				logger.info(jobID + " FAILED");
 				flowRunner.handleFailedJob(runner.getNode());
 			}
 			
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index 24615a5..1ac4fb4 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -1,6 +1,14 @@
 package azkaban.executor;
 
 import java.io.File;
+import java.io.IOException;
+
+import org.apache.log4j.Appender;
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Layout;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+
 
 import azkaban.executor.ExecutableFlow.ExecutableNode;
 import azkaban.executor.ExecutableFlow.Status;
@@ -9,62 +17,95 @@ import azkaban.executor.event.Event.Type;
 import azkaban.executor.event.EventHandler;
 import azkaban.utils.Props;
 
-public class JobRunner  extends EventHandler implements Runnable {
+public class JobRunner extends EventHandler implements Runnable {
+	private static final Layout DEFAULT_LAYOUT = new PatternLayout(
+			"%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
+
 	private Props props;
 	private Props outputProps;
 	private ExecutableNode node;
 	private File workingDir;
 	
+	private Logger logger;
+	private Layout loggerLayout = DEFAULT_LAYOUT;
+	private Appender jobAppender;
+	
 	public JobRunner(ExecutableNode node, Props props, File workingDir) {
 		this.props = props;
 		this.node = node;
 		this.node.setStatus(Status.WAITING);
 		this.workingDir = workingDir;
+
+		createLogger();
 	}
-	
+
 	public ExecutableNode getNode() {
 		return node;
 	}
+
+	private void createLogger() {
+		// Create logger
+		String loggerName = System.currentTimeMillis() + "." + node.getFlow().getExecutionId() + "." + node.getId();
+		logger = Logger.getLogger(loggerName);
+
+		// Create file appender
+		String logName = "_job." + node.getFlow().getExecutionId() + "." + node.getId() + ".log";
+		File logFile = new File(workingDir, logName);
+		String absolutePath = logFile.getAbsolutePath();
+
+		jobAppender = null;
+		try {
+			jobAppender = new FileAppender(loggerLayout, absolutePath, false);
+			logger.addAppender(jobAppender);
+		} catch (IOException e) {
+			logger.error("Could not open log file in " + workingDir, e);
+		}
+	}
+
+	private void closeLogger() {
+		logger.removeAppender(jobAppender);
+		jobAppender.close();
+	}
 	
 	@Override
 	public void run() {
 		if (node.getStatus() == Status.DISABLED) {
 			this.fireEventListeners(Event.create(this, Type.JOB_SUCCEEDED));
 			return;
-		}
-		else if (node.getStatus() == Status.KILLED) {
+		} else if (node.getStatus() == Status.KILLED) {
 			this.fireEventListeners(Event.create(this, Type.JOB_KILLED));
 			return;
 		}
 		node.setStartTime(System.currentTimeMillis());
+		logger.info("Starting job " + node.getId() + " at " + node.getStartTime());
 		node.setStatus(Status.RUNNING);
 		this.fireEventListeners(Event.create(this, Type.JOB_STARTED));
-		
-		
+
 		// Run Job
 		boolean succeeded = true;
-		
+
 		node.setEndTime(System.currentTimeMillis());
 		if (succeeded) {
 			node.setStatus(Status.SUCCEEDED);
 			this.fireEventListeners(Event.create(this, Type.JOB_SUCCEEDED));
-		}
-		else {
+		} else {
 			node.setStatus(Status.FAILED);
 			this.fireEventListeners(Event.create(this, Type.JOB_FAILED));
 		}
+		logger.info("Finishing job " + node.getId() + " at " + node.getEndTime());
+		closeLogger();
 	}
-	
+
 	public void cancel() {
 		// Cancel code here
-		
+
 		node.setStatus(Status.KILLED);
 	}
-	
+
 	public Status getStatus() {
 		return node.getStatus();
 	}
-	
+
 	public Props getOutputProps() {
 		return outputProps;
 	}