azkaban-uncached
Changes
src/java/azkaban/execapp/FlowRunner.java 13(+4 -9)
src/java/azkaban/execapp/JobRunner.java 43(+34 -9)
Details
src/java/azkaban/execapp/FlowRunner.java 13(+4 -9)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 74bff38..b2cfd4b 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -66,6 +66,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private Map<String, Props> sharedProps = new HashMap<String, Props>();
private Map<String, Props> jobOutputProps = new HashMap<String, Props>();
+ private final Props azkabanProps;
private Props globalProps;
private final JobTypeManager jobtypeManager;
@@ -86,9 +87,8 @@ public class FlowRunner extends EventHandler implements Runnable {
private HashSet<String> proxyUsers = null;
- private boolean proxyUserLockDown = false;
- public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
+ public FlowRunner(Props props, ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
this.execId = flow.getExecutionId();
this.flow = flow;
this.executorLoader = executorLoader;
@@ -96,14 +96,10 @@ public class FlowRunner extends EventHandler implements Runnable {
this.executorService = Executors.newFixedThreadPool(numThreads);
this.execDir = new File(flow.getExecutionPath());
this.jobtypeManager = jobtypeManager;
-
+ this.azkabanProps = props;
this.proxyUsers = flow.getProxyUsers();
}
- public void setProxyUserLockDown(boolean doLockDown) {
- this.proxyUserLockDown = doLockDown;
- }
-
public FlowRunner setGlobalProps(Props globalProps) {
this.globalProps = globalProps;
return this;
@@ -359,8 +355,7 @@ public class FlowRunner extends EventHandler implements Runnable {
prop.setParent(parentProps);
// should have one prop with system secrets, the other user level props
- JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), proxyUsers, executorLoader, jobtypeManager, logger);
- jobRunner.setUserLockDown(proxyUserLockDown);
+ JobRunner jobRunner = new JobRunner(azkabanProps, node, prop, path.getParentFile(), proxyUsers, executorLoader, jobtypeManager, logger);
jobRunner.addListener(listener);
return jobRunner;
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 3b20d7d..6360197 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -76,18 +76,20 @@ public class FlowRunnerManager implements EventListener {
private Props globalProps;
+ private final Props azkabanProps;
+
private long lastSubmitterThreadCheckTime = -1;
private long lastCleanerThreadCheckTime = -1;
private long executionDirRetention = 7*24*60*60*1000;
private Object executionDirDeletionSync = new Object();
-
- private final boolean proxyUserLockDown;
-
+
public FlowRunnerManager(Props props, ExecutorLoader executorLoader, ProjectLoader projectLoader, ClassLoader parentClassLoader) throws IOException {
executionDirectory = new File(props.getString("azkaban.execution.dir", "executions"));
projectDirectory = new File(props.getString("azkaban.project.dir", "projects"));
+ azkabanProps = props;
+
//JobWrappingFactory.init(props, getClass().getClassLoader());
executionDirRetention = props.getLong("execution.dir.retention", executionDirRetention);
logger.info("Execution dir retention set to " + executionDirRetention + " ms");
@@ -114,8 +116,6 @@ public class FlowRunnerManager implements EventListener {
jobtypeManager = new JobTypeManager(props.getString(AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR, JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR), parentClassLoader);
- proxyUserLockDown = props.getBoolean("proxy.user.lock.down", false);
-
}
public Props getGlobalProps() {
@@ -323,8 +323,7 @@ public class FlowRunnerManager implements EventListener {
setupFlow(flow);
// Setup flow runner
- FlowRunner runner = new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
- runner.setProxyUserLockDown(proxyUserLockDown);
+ FlowRunner runner = new FlowRunner(azkabanProps, flow, executorLoader, projectLoader, jobtypeManager);
runner.setGlobalProps(globalProps);
runner.addListener(this);
src/java/azkaban/execapp/JobRunner.java 43(+34 -9)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 9448dd1..3c61f53 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -16,15 +16,23 @@ package azkaban.execapp;
*/
import java.io.File;
+import java.io.FilenameFilter;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
+import org.apache.commons.collections.comparators.ComparatorChain;
import org.apache.log4j.Appender;
import org.apache.log4j.FileAppender;
import org.apache.log4j.Layout;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
+import org.apache.log4j.RollingFileAppender;
import azkaban.execapp.event.Event;
import azkaban.execapp.event.Event.Type;
@@ -43,7 +51,7 @@ import azkaban.utils.Props;
public class JobRunner extends EventHandler implements Runnable {
private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
-
+
private ExecutorLoader loader;
private Props props;
private Props outputProps;
@@ -67,8 +75,10 @@ public class JobRunner extends EventHandler implements Runnable {
private HashSet<String> proxyUsers = null;
private boolean userLockDown;
+ private String jobLogChunkSize;
+ private int jobLogBackupIndex;
- public JobRunner(ExecutableNode node, Props props, File workingDir, HashSet<String> proxyUsers, ExecutorLoader loader, JobTypeManager jobtypeManager, Logger flowLogger) {
+ public JobRunner(Props azkabanProps, ExecutableNode node, Props props, File workingDir, HashSet<String> proxyUsers, ExecutorLoader loader, JobTypeManager jobtypeManager, Logger flowLogger) {
this.props = props;
this.node = node;
this.workingDir = workingDir;
@@ -77,10 +87,13 @@ public class JobRunner extends EventHandler implements Runnable {
this.jobtypeManager = jobtypeManager;
this.flowLogger = flowLogger;
this.proxyUsers = proxyUsers;
- }
-
- public void setUserLockDown (boolean doLockDown) {
- this.userLockDown = doLockDown;
+
+ // default no lock down but warn
+ this.userLockDown = azkabanProps.getBoolean("proxy.user.lock.down", false);
+ // default 20MB log size rolling over.
+ this.jobLogChunkSize = azkabanProps.getString("job.log.chunk.size", "5MB");
+ this.jobLogBackupIndex = azkabanProps.getInt("job.log.backup.index", 4);
+
}
public ExecutableNode getNode() {
@@ -104,8 +117,9 @@ public class JobRunner extends EventHandler implements Runnable {
jobAppender = null;
try {
- FileAppender fileAppender = new FileAppender(loggerLayout, absolutePath, true);
-
+ RollingFileAppender fileAppender = new RollingFileAppender(loggerLayout, absolutePath, true);
+ fileAppender.setMaxBackupIndex(jobLogBackupIndex);
+ fileAppender.setMaxFileSize(jobLogChunkSize);
jobAppender = fileAppender;
logger.addAppender(jobAppender);
} catch (IOException e) {
@@ -177,7 +191,18 @@ public class JobRunner extends EventHandler implements Runnable {
if (logFile != null) {
try {
- loader.uploadLogFile(executionId, node.getJobId(), node.getAttempt(), logFile);
+ File[] files = logFile.getParentFile().listFiles(new FilenameFilter() {
+
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.startsWith(logFile.getName());
+ }
+ }
+ );
+ Arrays.sort(files, Collections.reverseOrder());
+
+
+ loader.uploadLogFile(executionId, node.getJobId(), node.getAttempt(), files);
} catch (ExecutorManagerException e) {
flowLogger.error("Error writing out logs for job " + node.getJobId(), e);
}
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 9a9d80c..8fa7e1b 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -24,6 +24,7 @@ import azkaban.jobtype.JobTypeManager;
import azkaban.project.ProjectLoader;
import azkaban.test.executor.JavaJob;
import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
public class FlowRunnerTest {
private File workingDir;
@@ -354,7 +355,7 @@ public class FlowRunnerTest {
//MockProjectLoader projectLoader = new MockProjectLoader(new File(flow.getExecutionPath()));
loader.uploadExecutableFlow(flow);
- FlowRunner runner = new FlowRunner(flow, loader, fakeProjectLoader, jobtypeManager);
+ FlowRunner runner = new FlowRunner(new Props(), flow, loader, fakeProjectLoader, jobtypeManager);
runner.addListener(eventCollector);
return runner;
@@ -366,7 +367,7 @@ public class FlowRunnerTest {
//MockProjectLoader projectLoader = new MockProjectLoader(new File(exFlow.getExecutionPath()));
loader.uploadExecutableFlow(exFlow);
- FlowRunner runner = new FlowRunner(exFlow, loader, fakeProjectLoader, jobtypeManager);
+ FlowRunner runner = new FlowRunner(new Props(), exFlow, loader, fakeProjectLoader, jobtypeManager);
runner.addListener(eventCollector);
return runner;
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index e7f15a8..93827b6 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -266,7 +266,7 @@ public class JobRunnerTest {
Props props = createProps(time, fail);
HashSet<String> proxyUsers = new HashSet<String>();
proxyUsers.add(flow.getSubmitUser());
- JobRunner runner = new JobRunner(node, props, workingDir, proxyUsers, loader, jobtypeManager, logger);
+ JobRunner runner = new JobRunner(new Props(), node, props, workingDir, proxyUsers, loader, jobtypeManager, logger);
runner.addListener(listener);
return runner;