azkaban-uncached

user job log capping

3/5/2013 11:54:19 PM

Details

diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 74bff38..b2cfd4b 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -66,6 +66,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private Map<String, Props> sharedProps = new HashMap<String, Props>();
 	private Map<String, Props> jobOutputProps = new HashMap<String, Props>();
 	
+	private final Props azkabanProps;
 	private Props globalProps;
 	private final JobTypeManager jobtypeManager;
 	
@@ -86,9 +87,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 	
 	private HashSet<String> proxyUsers = null;
 	
-	private boolean proxyUserLockDown = false;
 	
-	public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
+	public FlowRunner(Props props, ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
 		this.execId = flow.getExecutionId();
 		this.flow = flow;
 		this.executorLoader = executorLoader;
@@ -96,14 +96,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 		this.executorService = Executors.newFixedThreadPool(numThreads);
 		this.execDir = new File(flow.getExecutionPath());
 		this.jobtypeManager = jobtypeManager;
-		
+		this.azkabanProps = props;
 		this.proxyUsers = flow.getProxyUsers();
 	}
 
-	public void setProxyUserLockDown(boolean doLockDown) {
-		this.proxyUserLockDown = doLockDown;
-	}
-
 	public FlowRunner setGlobalProps(Props globalProps) {
 		this.globalProps = globalProps;
 		return this;
@@ -359,8 +355,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		prop.setParent(parentProps);
 		
 		// should have one prop with system secrets, the other user level props
-		JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), proxyUsers, executorLoader, jobtypeManager, logger);
-		jobRunner.setUserLockDown(proxyUserLockDown);
+		JobRunner jobRunner = new JobRunner(azkabanProps, node, prop, path.getParentFile(), proxyUsers, executorLoader, jobtypeManager, logger);
 		jobRunner.addListener(listener);
 
 		return jobRunner;
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 3b20d7d..6360197 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -76,18 +76,20 @@ public class FlowRunnerManager implements EventListener {
 	
 	private Props globalProps;
 	
+	private final Props azkabanProps;
+	
 	private long lastSubmitterThreadCheckTime = -1;
 	private long lastCleanerThreadCheckTime = -1;
 	private long executionDirRetention = 7*24*60*60*1000;
 	
 	private Object executionDirDeletionSync = new Object();
-	
-	private final boolean proxyUserLockDown;
-	
+		
 	public FlowRunnerManager(Props props, ExecutorLoader executorLoader, ProjectLoader projectLoader, ClassLoader parentClassLoader) throws IOException {
 		executionDirectory = new File(props.getString("azkaban.execution.dir", "executions"));
 		projectDirectory = new File(props.getString("azkaban.project.dir", "projects"));
 		
+		azkabanProps = props;
+		
 		//JobWrappingFactory.init(props, getClass().getClassLoader());
 		executionDirRetention = props.getLong("execution.dir.retention", executionDirRetention);
 		logger.info("Execution dir retention set to " + executionDirRetention + " ms");
@@ -114,8 +116,6 @@ public class FlowRunnerManager implements EventListener {
 		
 		jobtypeManager = new JobTypeManager(props.getString(AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR, JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR), parentClassLoader);
 		
-		proxyUserLockDown = props.getBoolean("proxy.user.lock.down", false);
-		
 	}
 
 	public Props getGlobalProps() {
@@ -323,8 +323,7 @@ public class FlowRunnerManager implements EventListener {
 		setupFlow(flow);
 		
 		// Setup flow runner
-		FlowRunner runner = new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
-		runner.setProxyUserLockDown(proxyUserLockDown);
+		FlowRunner runner = new FlowRunner(azkabanProps, flow, executorLoader, projectLoader, jobtypeManager);
 		runner.setGlobalProps(globalProps);
 		runner.addListener(this);
 		
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 9448dd1..3c61f53 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -16,15 +16,23 @@ package azkaban.execapp;
  */
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 
+import org.apache.commons.collections.comparators.ComparatorChain;
 import org.apache.log4j.Appender;
 import org.apache.log4j.FileAppender;
 import org.apache.log4j.Layout;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
+import org.apache.log4j.RollingFileAppender;
 
 import azkaban.execapp.event.Event;
 import azkaban.execapp.event.Event.Type;
@@ -43,7 +51,7 @@ import azkaban.utils.Props;
 
 public class JobRunner extends EventHandler implements Runnable {
 	private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
-
+	
 	private ExecutorLoader loader;
 	private Props props;
 	private Props outputProps;
@@ -67,8 +75,10 @@ public class JobRunner extends EventHandler implements Runnable {
 	private HashSet<String> proxyUsers = null;
 	
 	private boolean userLockDown;
+	private String jobLogChunkSize;
+	private int jobLogBackupIndex;
 
-	public JobRunner(ExecutableNode node, Props props, File workingDir, HashSet<String> proxyUsers, ExecutorLoader loader, JobTypeManager jobtypeManager, Logger flowLogger) {
+	public JobRunner(Props azkabanProps, ExecutableNode node, Props props, File workingDir, HashSet<String> proxyUsers, ExecutorLoader loader, JobTypeManager jobtypeManager, Logger flowLogger) {
 		this.props = props;
 		this.node = node;
 		this.workingDir = workingDir;
@@ -77,10 +87,13 @@ public class JobRunner extends EventHandler implements Runnable {
 		this.jobtypeManager = jobtypeManager;
 		this.flowLogger = flowLogger;
 		this.proxyUsers = proxyUsers;
-	}
-	
-	public void setUserLockDown (boolean doLockDown) {
-		this.userLockDown = doLockDown;
+		
+		// default no lock down but warn
+		this.userLockDown = azkabanProps.getBoolean("proxy.user.lock.down", false);
+		// default 20MB log size rolling over.
+		this.jobLogChunkSize = azkabanProps.getString("job.log.chunk.size", "5MB");
+		this.jobLogBackupIndex = azkabanProps.getInt("job.log.backup.index", 4);
+		
 	}
 	
 	public ExecutableNode getNode() {
@@ -104,8 +117,9 @@ public class JobRunner extends EventHandler implements Runnable {
 
 			jobAppender = null;
 			try {
-				FileAppender fileAppender = new FileAppender(loggerLayout, absolutePath, true);
-				
+				RollingFileAppender fileAppender = new RollingFileAppender(loggerLayout, absolutePath, true);
+				fileAppender.setMaxBackupIndex(jobLogBackupIndex);
+				fileAppender.setMaxFileSize(jobLogChunkSize);
 				jobAppender = fileAppender;
 				logger.addAppender(jobAppender);
 			} catch (IOException e) {
@@ -177,7 +191,18 @@ public class JobRunner extends EventHandler implements Runnable {
 			
 			if (logFile != null) {
 				try {
-					loader.uploadLogFile(executionId, node.getJobId(), node.getAttempt(), logFile);
+					File[] files = logFile.getParentFile().listFiles(new FilenameFilter() {
+						
+						@Override
+						public boolean accept(File dir, String name) {
+							return name.startsWith(logFile.getName());
+						}
+					} 
+					);
+					Arrays.sort(files, Collections.reverseOrder());
+					
+					
+					loader.uploadLogFile(executionId, node.getJobId(), node.getAttempt(), files);
 				} catch (ExecutorManagerException e) {
 					flowLogger.error("Error writing out logs for job " + node.getJobId(), e);
 				}
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 9a9d80c..8fa7e1b 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -24,6 +24,7 @@ import azkaban.jobtype.JobTypeManager;
 import azkaban.project.ProjectLoader;
 import azkaban.test.executor.JavaJob;
 import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
 
 public class FlowRunnerTest {
 	private File workingDir;
@@ -354,7 +355,7 @@ public class FlowRunnerTest {
 		//MockProjectLoader projectLoader = new MockProjectLoader(new File(flow.getExecutionPath()));
 		
 		loader.uploadExecutableFlow(flow);
-		FlowRunner runner = new FlowRunner(flow, loader, fakeProjectLoader, jobtypeManager);
+		FlowRunner runner = new FlowRunner(new Props(), flow, loader, fakeProjectLoader, jobtypeManager);
 		runner.addListener(eventCollector);
 		
 		return runner;
@@ -366,7 +367,7 @@ public class FlowRunnerTest {
 		//MockProjectLoader projectLoader = new MockProjectLoader(new File(exFlow.getExecutionPath()));
 		
 		loader.uploadExecutableFlow(exFlow);
-		FlowRunner runner = new FlowRunner(exFlow, loader, fakeProjectLoader, jobtypeManager);
+		FlowRunner runner = new FlowRunner(new Props(), exFlow, loader, fakeProjectLoader, jobtypeManager);
 		runner.addListener(eventCollector);
 		
 		return runner;
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index e7f15a8..93827b6 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -266,7 +266,7 @@ public class JobRunnerTest {
 		Props props = createProps(time, fail);
 		HashSet<String> proxyUsers = new HashSet<String>();
 		proxyUsers.add(flow.getSubmitUser());
-		JobRunner runner = new JobRunner(node, props, workingDir, proxyUsers, loader, jobtypeManager, logger);
+		JobRunner runner = new JobRunner(new Props(), node, props, workingDir, proxyUsers, loader, jobtypeManager, logger);
 
 		runner.addListener(listener);
 		return runner;