Details
diff --git a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
index 0c98ecb..8c208a5 100644
--- a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
+++ b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
@@ -133,6 +133,11 @@ public class CommonJobProperties {
* hotspot occurs.
*/
public static final String PROJECT_VERSION = "azkaban.flow.projectversion";
+
+ /**
+ * Find out who is the submit user, in addition to the user.to.proxy (they may be different)
+ */
+ public static final String SUBMIT_USER = "azkaban.flow.submituser";
/**
* A uuid assigned to every execution
diff --git a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
index 7b1462f..a85d1a3 100644
--- a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
+++ b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
@@ -54,8 +54,10 @@ public class JobCallbackValidator {
maxPostBodyLength);
}
- logger.info("Found " + totalCallbackCount + " job callbacks for job "
- + jobName);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Found " + totalCallbackCount + " job callbacks for job "
+ + jobName);
+ }
return totalCallbackCount;
}
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 4e268a0..3ba6d86 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
+import azkaban.flow.CommonJobProperties;
import azkaban.jobExecutor.utils.process.AzkabanProcess;
import azkaban.jobExecutor.utils.process.AzkabanProcessBuilder;
import azkaban.utils.Pair;
@@ -36,15 +37,29 @@ import azkaban.utils.SystemMemoryInfo;
public class ProcessJob extends AbstractProcessJob {
public static final String COMMAND = "command";
+
private static final long KILL_TIME_MS = 5000;
+
private volatile AzkabanProcess process;
+
private static final String MEMCHECK_ENABLED = "memCheck.enabled";
- private static final String MEMCHECK_FREEMEMDECRAMT = "memCheck.freeMemDecrAmt";
+
+ private static final String MEMCHECK_FREEMEMDECRAMT =
+ "memCheck.freeMemDecrAmt";
+
public static final String AZKABAN_MEMORY_CHECK = "azkaban.memory.check";
+ public static final String USER_TO_PROXY = "user.to.proxy";
+
+ public static final String KRB5CCNAME = "KRB5CCNAME";
+
public ProcessJob(final String jobId, final Props sysProps,
final Props jobProps, final Logger log) {
super(jobId, sysProps, jobProps, log);
+
+ // this is in line with what other job types (hadoopJava, spark, pig, hive)
+ // is doing
+ jobProps.put(CommonJobProperties.JOB_ID, jobId);
}
@Override
@@ -55,13 +70,19 @@ public class ProcessJob extends AbstractProcessJob {
handleError("Bad property definition! " + e.getMessage(), e);
}
- if (sysProps.getBoolean(MEMCHECK_ENABLED, true) && jobProps.getBoolean(AZKABAN_MEMORY_CHECK, true)) {
+ if (sysProps.getBoolean(MEMCHECK_ENABLED, true)
+ && jobProps.getBoolean(AZKABAN_MEMORY_CHECK, true)) {
long freeMemDecrAmt = sysProps.getLong(MEMCHECK_FREEMEMDECRAMT, 0);
Pair<Long, Long> memPair = getProcMemoryRequirement();
- boolean isMemGranted = SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(), memPair.getSecond(), freeMemDecrAmt);
+ boolean isMemGranted =
+ SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(),
+ memPair.getSecond(), freeMemDecrAmt);
if (!isMemGranted) {
- throw new Exception(String.format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
- memPair.getFirst(), memPair.getSecond(), getId()));
+ throw new Exception(
+ String
+ .format(
+ "Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
+ memPair.getFirst(), memPair.getSecond(), getId()));
}
}
@@ -82,6 +103,9 @@ public class ProcessJob extends AbstractProcessJob {
File[] propFiles = initPropsFiles();
Map<String, String> envVars = getEnvironmentVariables();
+ // change krb5ccname env var so that each job execution gets its own cache
+ envVars.put(KRB5CCNAME, getKrb5ccname(jobProps));
+
for (String command : commands) {
info("Command: " + command);
AzkabanProcessBuilder builder =
@@ -120,9 +144,61 @@ public class ProcessJob extends AbstractProcessJob {
}
/**
+ * <pre>
+ * This method extracts the kerberos ticket cache file name from the jobprops.
+ * This method will ensure that each job execution will have its own kerberos ticket cache file
+ * Given that the code only sets an environmental variable, the number of files created corresponds
+ * to the number of processes that are doing kinit in their flow, which should not be an inordinately
+ * high number.
+ * </pre>
+ *
+ * @return file name: the kerberos ticket cache file to use
+ */
+ private String getKrb5ccname(Props jobProps) {
+ String effectiveUser = getEffectiveUser(jobProps);
+ String projectName =
+ jobProps.getString(CommonJobProperties.PROJECT_NAME).replace(" ", "_");
+ String flowId =
+ jobProps.getString(CommonJobProperties.FLOW_ID).replace(" ", "_");
+ String jobId =
+ jobProps.getString(CommonJobProperties.JOB_ID).replace(" ", "_");
+ // execId should be an int and should not have space in it, ever
+ String execId = jobProps.getString(CommonJobProperties.EXEC_ID);
+ String krb5ccname =
+ String.format("/tmp/krb5cc__%s__%s__%s__%s__%s", projectName, flowId,
+ jobId, execId, effectiveUser);
+
+ return krb5ccname;
+ }
+
+ /**
+ * <pre>
+ * Determines what user id should the process job run as, in the following order of precedence:
+ * 1. USER_TO_PROXY
+ * 2. SUBMIT_USER
+ * </pre>
+ *
+ * @param jobProps
+ * @return the user that Azkaban is going to execute as
+ */
+ private String getEffectiveUser(Props jobProps) {
+ String effectiveUser = null;
+ if (jobProps.containsKey(USER_TO_PROXY)) {
+ effectiveUser = jobProps.getString(USER_TO_PROXY);
+ } else if (jobProps.containsKey(CommonJobProperties.SUBMIT_USER)) {
+ effectiveUser = jobProps.getString(CommonJobProperties.SUBMIT_USER);
+ } else {
+ throw new RuntimeException(
+ "Internal Error: No user.to.proxy or submit.user in the jobProps");
+ }
+ info("effective user is: " + effectiveUser);
+ return effectiveUser;
+ }
+
+ /**
* This is used to get the min/max memory size requirement by processes.
- * SystemMemoryInfo can use the info to determine if the memory request
- * can be fulfilled. For Java process, this should be Xms/Xmx setting.
+ * SystemMemoryInfo can use the info to determine if the memory request can be
+ * fulfilled. For Java process, this should be Xms/Xmx setting.
*
* @return pair of min/max memory size
*/
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
index 12b72dd..d726216 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -845,7 +845,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
byte[] stringData = json.getBytes("UTF-8");
byte[] data = stringData;
- logger.info("UTF-8 size:" + data.length);
if (defaultEncodingType == EncodingType.GZIP) {
data = GZIPUtils.gzipBytes(stringData);
}
@@ -888,7 +887,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
byte[] stringData = json.getBytes("UTF-8");
byte[] data = stringData;
- logger.info("UTF-8 size:" + data.length);
if (encType == EncodingType.GZIP) {
data = GZIPUtils.gzipBytes(stringData);
}
@@ -1009,7 +1007,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
String propertyJSON = PropsUtils.toJSONString(props, true);
byte[] data = propertyJSON.getBytes("UTF-8");
- logger.info("UTF-8 size:" + data.length);
if (defaultEncodingType == EncodingType.GZIP) {
data = GZIPUtils.gzipBytes(data);
}
@@ -1032,7 +1029,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
String propertyJSON = PropsUtils.toJSONString(props, true);
byte[] data = propertyJSON.getBytes("UTF-8");
- logger.info("UTF-8 size:" + data.length);
if (defaultEncodingType == EncodingType.GZIP) {
data = GZIPUtils.gzipBytes(data);
}
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java b/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
index a6b1a39..039ab3a 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
@@ -22,14 +22,14 @@ import azkaban.utils.Props;
/**
* @author wkang
- *
+ *
* This class manages project whitelist defined in xml config file.
* An single xml config file contains different types of whitelisted
* projects. For additional type of whitelist, modify WhitelistType enum.
- *
+ *
* The xml config file should in the following format. Please note
* the tag <MemoryCheck> is same as the defined enum MemoryCheck
- *
+ *
* <ProjectWhitelist>
* <MemoryCheck>
* <project projectname="project1" />
@@ -84,7 +84,7 @@ public class ProjectWhitelist {
Map<WhitelistType, Set<Integer>> projsWhitelisted = new HashMap<WhitelistType, Set<Integer>>();
NodeList tagList = doc.getChildNodes();
if (!tagList.item(0).getNodeName().equals(PROJECT_WHITELIST_TAG)) {
- throw new RuntimeException("Cannot find tag '" + PROJECT_WHITELIST_TAG + "' in " + xmlFile);
+ throw new RuntimeException("Cannot find tag '" + PROJECT_WHITELIST_TAG + "' in " + xmlFile);
}
NodeList whitelist = tagList.item(0).getChildNodes();
@@ -114,7 +114,7 @@ public class ProjectWhitelist {
NamedNodeMap projectAttrMap = node.getAttributes();
Node projectIdAttr = projectAttrMap.getNamedItem(PROJECTID_ATTR);
if (projectIdAttr == null) {
- throw new RuntimeException("Error loading project. The '" + PROJECTID_ATTR
+ throw new RuntimeException("Error loading project. The '" + PROJECTID_ATTR
+ "' attribute doesn't exist");
}
@@ -127,7 +127,7 @@ public class ProjectWhitelist {
if (projsWhitelisted != null) {
Set<Integer> projs = projsWhitelisted.get(whitelistType);
if (projs != null) {
- return projs.contains(project);
+ return projs.contains(project);
}
}
return false;
@@ -138,6 +138,7 @@ public class ProjectWhitelist {
* the defined enums.
*/
public static enum WhitelistType {
- MemoryCheck
+ MemoryCheck,
+ NumJobPerFlow
}
}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
index 1141176..27b9ba4 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
@@ -20,7 +20,6 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;
-
import org.joda.time.DateTime;
import org.joda.time.ReadablePeriod;
@@ -57,7 +56,6 @@ public class SlaChecker implements ConditionChecker {
private Boolean isSlaMissed(ExecutableFlow flow) {
String type = slaOption.getType();
- logger.info("flow is " + flow.getStatus());
if (flow.getStartTime() < 0) {
return Boolean.FALSE;
}
@@ -136,7 +134,6 @@ public class SlaChecker implements ConditionChecker {
private Boolean isSlaGood(ExecutableFlow flow) {
String type = slaOption.getType();
- logger.info("flow is " + flow.getStatus());
if (flow.getStartTime() < 0) {
return Boolean.FALSE;
}
@@ -218,13 +215,11 @@ public class SlaChecker implements ConditionChecker {
}
public Object isSlaFailed() {
- logger.info("Testing if sla failed for execution " + execId);
ExecutableFlow flow;
try {
flow = executorManager.getExecutableFlow(execId);
} catch (ExecutorManagerException e) {
logger.error("Can't get executable flow.", e);
- e.printStackTrace();
// something wrong, send out alerts
return Boolean.TRUE;
}
@@ -232,13 +227,11 @@ public class SlaChecker implements ConditionChecker {
}
public Object isSlaPassed() {
- logger.info("Testing if sla is good for execution " + execId);
ExecutableFlow flow;
try {
flow = executorManager.getExecutableFlow(execId);
} catch (ExecutorManagerException e) {
logger.error("Can't get executable flow.", e);
- e.printStackTrace();
// something wrong, send out alerts
return Boolean.TRUE;
}
diff --git a/azkaban-common/src/main/java/azkaban/trigger/Condition.java b/azkaban-common/src/main/java/azkaban/trigger/Condition.java
index 7bb275b..5e8e7b6 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/Condition.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/Condition.java
@@ -25,7 +25,6 @@ import org.apache.commons.jexl2.Expression;
import org.apache.commons.jexl2.JexlEngine;
import org.apache.commons.jexl2.MapContext;
import org.apache.log4j.Logger;
-
import org.joda.time.DateTime;
public class Condition {
@@ -119,7 +118,9 @@ public class Condition {
}
public boolean isMet() {
- logger.info("Testing condition " + expression);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Testing condition " + expression);
+ }
return expression.evaluate(context).equals(Boolean.TRUE);
}
diff --git a/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerLoader.java b/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerLoader.java
index 68c0508..99b32f9 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerLoader.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerLoader.java
@@ -194,7 +194,9 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements
@Override
public void updateTrigger(Trigger t) throws TriggerLoaderException {
- logger.info("Updating trigger " + t.getTriggerId() + " into db.");
+ if (logger.isDebugEnabled()) {
+ logger.debug("Updating trigger " + t.getTriggerId() + " into db.");
+ }
t.setLastModifyTime(System.currentTimeMillis());
Connection connection = getConnection();
try {
@@ -238,7 +240,9 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements
if (updates == 0) {
throw new TriggerLoaderException("No trigger has been updated.");
} else {
- logger.info("Updated " + updates + " records.");
+ if (logger.isDebugEnabled()) {
+ logger.debug("Updated " + updates + " records.");
+ }
}
} catch (SQLException e) {
logger.error(UPDATE_TRIGGER + " failed.");
diff --git a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
index fa8d130..97e858c 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
@@ -281,7 +281,9 @@ public class TriggerManager extends EventHandler implements
logger.info("Skipping trigger" + t.getTriggerId() + " until " + t.getNextCheckTime());
}
- logger.info("Checking trigger " + t.getTriggerId());
+ if (logger.isDebugEnabled()) {
+ logger.info("Checking trigger " + t.getTriggerId());
+ }
if (t.getStatus().equals(TriggerStatus.READY)) {
if (t.triggerConditionMet()) {
onTriggerTrigger(t);
diff --git a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
index 596e8b3..3412425 100644
--- a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
@@ -292,6 +292,7 @@ public class PropsUtils {
props.put(CommonJobProperties.FLOW_UUID, UUID.randomUUID().toString());
props.put(CommonJobProperties.PROJECT_LAST_CHANGED_BY, flow.getLastModifiedByUser());
props.put(CommonJobProperties.PROJECT_LAST_CHANGED_DATE, flow.getLastModifiedTimestamp());
+ props.put(CommonJobProperties.SUBMIT_USER, flow.getExecutableFlow().getSubmitUser());
DateTime loadTime = new DateTime();
diff --git a/azkaban-common/src/main/java/azkaban/utils/StringUtils.java b/azkaban-common/src/main/java/azkaban/utils/StringUtils.java
index 17792a0..924afd2 100644
--- a/azkaban-common/src/main/java/azkaban/utils/StringUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/StringUtils.java
@@ -18,6 +18,7 @@ package azkaban.utils;
import java.util.Collection;
import java.util.List;
+import java.util.regex.Pattern;
public class StringUtils {
public static final char SINGLE_QUOTE = '\'';
@@ -88,4 +89,19 @@ public class StringUtils {
return buffer.toString();
}
+
+ private static final Pattern BROWSWER_PATTERN = Pattern
+ .compile(".*Gecko.*|.*AppleWebKit.*|.*Trident.*|.*Chrome.*");
+
+ public static boolean isFromBrowser(String userAgent) {
+ if (userAgent == null) {
+ return false;
+ }
+
+ if (BROWSWER_PATTERN.matcher(userAgent).matches()) {
+ return true;
+ } else {
+ return false;
+ }
+ }
}
diff --git a/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java b/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java
index 05a0a28..068935f 100644
--- a/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java
@@ -22,7 +22,6 @@ import java.util.Date;
import java.util.Properties;
import org.apache.log4j.Logger;
-
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -33,6 +32,7 @@ import org.junit.Test;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
+import azkaban.flow.CommonJobProperties;
import azkaban.utils.Props;
public class JavaProcessJobTest {
@@ -108,6 +108,14 @@ public class JavaProcessJobTest {
props.put(AbstractProcessJob.WORKING_DIR, workingDir.getCanonicalPath());
props.put("type", "java");
props.put("fullPath", ".");
+
+ props.put(CommonJobProperties.PROJECT_NAME, "test_project");
+ props.put(CommonJobProperties.FLOW_ID, "test_flow");
+ props.put(CommonJobProperties.JOB_ID, "test_job");
+ props.put(CommonJobProperties.EXEC_ID, "123");
+ props.put(CommonJobProperties.SUBMIT_USER, "test_user");
+
+
job = new JavaProcessJob("testJavaProcess", props, props, log);
}
diff --git a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
index 974895b..57f4505 100644
--- a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
@@ -20,7 +20,6 @@ import java.io.File;
import java.io.IOException;
import org.apache.log4j.Logger;
-
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -28,6 +27,7 @@ import org.junit.Test;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
+import azkaban.flow.CommonJobProperties;
import azkaban.utils.Props;
public class ProcessJobTest {
@@ -46,6 +46,12 @@ public class ProcessJobTest {
props.put(AbstractProcessJob.WORKING_DIR, workingDir.getCanonicalPath());
props.put("type", "command");
props.put("fullPath", ".");
+
+ props.put(CommonJobProperties.PROJECT_NAME, "test_project");
+ props.put(CommonJobProperties.FLOW_ID, "test_flow");
+ props.put(CommonJobProperties.JOB_ID, "test_job");
+ props.put(CommonJobProperties.EXEC_ID, "123");
+ props.put(CommonJobProperties.SUBMIT_USER, "test_user");
job = new ProcessJob("TestProcess", props, props, log);
}
@@ -62,6 +68,37 @@ public class ProcessJobTest {
job.run();
}
+
+ /**
+ * this job should run fine if the props contain user.to.proxy
+ * @throws Exception
+ */
+ @Test
+ public void testOneUnixCommandWithProxyUserInsteadOfSubmitUser() throws Exception {
+
+ // Initialize the Props
+ props.removeLocal(CommonJobProperties.SUBMIT_USER);
+ props.put("user.to.proxy", "test_user");
+ props.put(ProcessJob.COMMAND, "ls -al");
+
+ job.run();
+
+ }
+
+ /**
+ * this job should fail because there is no user.to.proxy and no CommonJobProperties.SUBMIT_USER
+ * @throws Exception
+ */
+ @Test (expected=RuntimeException.class)
+ public void testOneUnixCommandWithNoUser() throws Exception {
+
+ // Initialize the Props
+ props.removeLocal(CommonJobProperties.SUBMIT_USER);
+ props.put(ProcessJob.COMMAND, "ls -al");
+
+ job.run();
+
+ }
@Test
public void testFailedUnixCommand() throws Exception {
diff --git a/azkaban-common/src/test/java/azkaban/utils/StringUtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/StringUtilsTest.java
new file mode 100644
index 0000000..71c3d21
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/StringUtilsTest.java
@@ -0,0 +1,80 @@
+package azkaban.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StringUtilsTest {
+
+ private static final String chromeOnMac =
+ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.155 Safari/537.36";
+ private static final String fireFoxOnMac =
+ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:40.0) Gecko/20100101 Firefox/40.0";
+ private static final String safariOnMac =
+ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.78.2 (KHTML, like Gecko) Version/7.0.6 Safari/537.78.2";
+ private static final String chromeOnLinux =
+ "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36";
+ private static final String fireFoxOnLinux =
+ "Mozilla/5.0 (X11; Linux x86_64; rv:39.0) Gecko/20100101 Firefox/39.0";
+
+ private static final String[] browserVariants = { chromeOnMac, fireFoxOnMac,
+ safariOnMac, chromeOnLinux, fireFoxOnLinux };
+
+ private static final String[] BROWSER_NAMES = { "AppleWebKit", "Gecko",
+ "Chrome" };
+
+ @Test
+ public void isBrowser() throws Exception {
+
+ for (String browser : browserVariants) {
+ Assert.assertTrue(browser, StringUtils.isFromBrowser(browser));
+ }
+ }
+
+ @Test
+ public void notBrowserWithLowercase() throws Exception {
+
+ for (String browser : browserVariants) {
+ Assert.assertFalse(browser.toLowerCase(),
+ StringUtils.isFromBrowser(browser.toLowerCase()));
+ }
+ }
+
+ @Test
+ public void notBrowser() throws Exception {
+ String testStr = "curl";
+ Assert.assertFalse(testStr, StringUtils.isFromBrowser(testStr));
+ }
+
+ @Test
+ public void emptyBrowserString() throws Exception {
+
+ Assert.assertFalse("empty string", StringUtils.isFromBrowser(""));
+ }
+
+ @Test
+ public void nullBrowserString() throws Exception {
+
+ Assert.assertFalse("null string", StringUtils.isFromBrowser(null));
+ }
+
+ @Test
+ public void startsWithBrowserName() {
+ for (String name : BROWSER_NAMES) {
+ Assert.assertTrue(StringUtils.isFromBrowser(name + " is awesome"));
+ }
+ }
+
+ @Test
+ public void endsWithBrowserName() {
+ for (String name : BROWSER_NAMES) {
+ Assert.assertTrue(StringUtils.isFromBrowser("awesome is" + name));
+ }
+ }
+
+ @Test
+ public void containsBrowserName() {
+ for (String name : BROWSER_NAMES) {
+ Assert.assertTrue(StringUtils.isFromBrowser("awesome " + name + " is"));
+ }
+ }
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 361db00..01ab373 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -52,6 +52,8 @@ import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypeManagerException;
import azkaban.metric.MetricReportManager;
import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectWhitelist;
+import azkaban.project.ProjectWhitelist.WhitelistType;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
@@ -481,7 +483,9 @@ public class FlowRunnerManager implements EventListener,
int numJobs =
Integer.valueOf(options.getFlowParameters().get(
FLOW_NUM_JOB_THREADS));
- if (numJobs > 0 && numJobs <= numJobThreads) {
+ if (numJobs > 0 && (numJobs <= numJobThreads || ProjectWhitelist
+ .isProjectWhitelisted(flow.getProjectId(),
+ WhitelistType.NumJobPerFlow))) {
numJobThreads = numJobs;
}
} catch (Exception e) {
diff --git a/azkaban-execserver/src/main/resources/log4j.properties b/azkaban-execserver/src/main/resources/log4j.properties
index e304ac7..464f36a 100644
--- a/azkaban-execserver/src/main/resources/log4j.properties
+++ b/azkaban-execserver/src/main/resources/log4j.properties
@@ -1,12 +1,14 @@
-log4j.rootLogger=INFO, Console
+log_dir=${log4j.log.dir}
+
+log4j.rootLogger=INFO, ExecServer
log4j.logger.azkaban.execapp=INFO, ExecServer
-log4j.appender.ExecServer=org.apache.log4j.RollingFileAppender
+log4j.appender.ExecServer=org.apache.log4j.DailyRollingFileAppender
log4j.appender.ExecServer.layout=org.apache.log4j.PatternLayout
-log4j.appender.ExecServer.File=azkaban-execserver.log
+log4j.appender.ExecServer.File=${log_dir}/azkaban-execserver.log
log4j.appender.ExecServer.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1}] [Azkaban] %m%n
-log4j.appender.ExecServer.MaxFileSize=102400MB
log4j.appender.ExecServer.MaxBackupIndex=2
+log4j.appender.ExecServer.DatePattern='.'yyyy-MM-dd
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
diff --git a/azkaban-execserver/src/package/bin/start-exec.sh b/azkaban-execserver/src/package/bin/start-exec.sh
index fbb7124..d8f0f73 100755
--- a/azkaban-execserver/src/package/bin/start-exec.sh
+++ b/azkaban-execserver/src/package/bin/start-exec.sh
@@ -1,6 +1,5 @@
#!/bin/bash
-base_dir=$(dirname $0)/..
-
-bin/azkaban-executor-start.sh $base_dir 2>&1>logs/executorServerLog__`date +%F+%T`.out &
+# pass along command line arguments to azkaban-executor-start.sh script
+bin/azkaban-executor-start.sh "$@" 2>&1>logs/executorServerLog__`date +%F+%T`.out &
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 3171e9a..b5b8d0e 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -39,6 +39,7 @@ import javax.management.ObjectName;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
+import org.apache.log4j.jmx.HierarchyDynamicMBean;
import org.apache.velocity.app.VelocityEngine;
import org.apache.velocity.runtime.log.Log4JLogChute;
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
@@ -53,8 +54,6 @@ import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.thread.QueuedThreadPool;
-import com.linkedin.restli.server.RestliServlet;
-
import azkaban.alert.Alerter;
import azkaban.database.AzkabanDatabaseSetup;
import azkaban.executor.ExecutorManager;
@@ -88,19 +87,21 @@ import azkaban.utils.FileIOUtils;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import azkaban.utils.Utils;
+import azkaban.webapp.plugin.PluginRegistry;
+import azkaban.webapp.plugin.TriggerPlugin;
+import azkaban.webapp.plugin.ViewerPlugin;
import azkaban.webapp.servlet.AbstractAzkabanServlet;
import azkaban.webapp.servlet.ExecutorServlet;
+import azkaban.webapp.servlet.HistoryServlet;
import azkaban.webapp.servlet.IndexRedirectServlet;
import azkaban.webapp.servlet.JMXHttpServlet;
-import azkaban.webapp.servlet.ScheduleServlet;
-import azkaban.webapp.servlet.HistoryServlet;
-import azkaban.webapp.servlet.ProjectServlet;
import azkaban.webapp.servlet.ProjectManagerServlet;
+import azkaban.webapp.servlet.ProjectServlet;
+import azkaban.webapp.servlet.ScheduleServlet;
import azkaban.webapp.servlet.StatsServlet;
import azkaban.webapp.servlet.TriggerManagerServlet;
-import azkaban.webapp.plugin.TriggerPlugin;
-import azkaban.webapp.plugin.ViewerPlugin;
-import azkaban.webapp.plugin.PluginRegistry;
+
+import com.linkedin.restli.server.RestliServlet;
/**
* The Azkaban Jetty server class
@@ -123,6 +124,9 @@ import azkaban.webapp.plugin.PluginRegistry;
* Jetty truststore password
*/
public class AzkabanWebServer extends AzkabanServer {
+ private static final String AZKABAN_ACCESS_LOGGER_NAME =
+ "azkaban.webapp.servlet.LoginAbstractAzkabanServlet";
+
private static final Logger logger = Logger.getLogger(AzkabanWebServer.class);
public static final String AZKABAN_HOME = "AZKABAN_HOME";
@@ -823,23 +827,25 @@ public class AzkabanWebServer extends AzkabanServer {
public void logTopMemoryConsumers() throws Exception, IOException {
if (new File("/bin/bash").exists() && new File("/bin/ps").exists()
- && new File("/usr/bin/head").exists()) {
+ && new File("/usr/bin/head").exists()) {
logger.info("logging top memeory consumer");
java.lang.ProcessBuilder processBuilder =
- new java.lang.ProcessBuilder("/bin/bash", "-c", "/bin/ps aux --sort -rss | /usr/bin/head");
+ new java.lang.ProcessBuilder("/bin/bash", "-c",
+ "/bin/ps aux --sort -rss | /usr/bin/head");
Process p = processBuilder.start();
p.waitFor();
-
+
InputStream is = p.getInputStream();
- java.io.BufferedReader reader = new java.io.BufferedReader(new InputStreamReader(is));
+ java.io.BufferedReader reader =
+ new java.io.BufferedReader(new InputStreamReader(is));
String line = null;
while ((line = reader.readLine()) != null) {
logger.info(line);
}
is.close();
}
- }
+ }
});
logger.info("Server running on " + (ssl ? "ssl" : "") + " port " + port
+ ".");
@@ -1233,6 +1239,22 @@ public class AzkabanWebServer extends AzkabanServer {
registerMbean("executorManager", new JmxExecutorManager(
(ExecutorManager) executorManager));
}
+
+ // Register Log4J loggers as JMX beans so the log level can be
+ // updated via JConsole or Java VisualVM
+ HierarchyDynamicMBean log4jMBean = new HierarchyDynamicMBean();
+ registerMbean("log4jmxbean", log4jMBean);
+ ObjectName accessLogLoggerObjName =
+ log4jMBean.addLoggerMBean(AZKABAN_ACCESS_LOGGER_NAME);
+
+ if (accessLogLoggerObjName == null) {
+ System.out
+ .println("************* loginLoggerObjName is null, make sure there is a logger with name "
+ + AZKABAN_ACCESS_LOGGER_NAME);
+ } else {
+ System.out.println("******** loginLoggerObjName: "
+ + accessLogLoggerObjName.getCanonicalName());
+ }
}
public void close() {
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 4970fce..ef588f7 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -503,7 +503,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
} else {
ret.put("length", data.getLength());
ret.put("offset", data.getOffset());
- ret.put("data", data.getData());
+ ret.put("data", StringEscapeUtils.escapeHtml(data.getData()));
}
} catch (ExecutorManagerException e) {
throw new ServletException(e);
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index e0b9e38..1d9f315 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -43,6 +43,7 @@ import azkaban.user.Role;
import azkaban.user.User;
import azkaban.user.UserManager;
import azkaban.user.UserManagerException;
+import azkaban.utils.StringUtils;
/**
* Abstract Servlet that handles auto login when the session hasn't been
@@ -77,11 +78,17 @@ public abstract class LoginAbstractAzkabanServlet extends
private MultipartParser multipartParser;
+ private boolean shouldLogRawUserAgent = false;
+
@Override
public void init(ServletConfig config) throws ServletException {
super.init(config);
multipartParser = new MultipartParser(DEFAULT_UPLOAD_DISK_SPOOL_SIZE);
+
+ shouldLogRawUserAgent =
+ getApplication().getServerProps().getBoolean("accesslog.raw.useragent",
+ false);
}
public void setResourceDirectory(File file) {
@@ -93,6 +100,7 @@ public abstract class LoginAbstractAzkabanServlet extends
throws ServletException, IOException {
// Set session id
Session session = getSessionFromRequest(req);
+ logRequest(req, session);
if (hasParam(req, "logout")) {
resp.sendRedirect(req.getContextPath());
if (session != null) {
@@ -103,7 +111,9 @@ public abstract class LoginAbstractAzkabanServlet extends
}
if (session != null) {
- logger.info("Found session " + session.getUser());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Found session " + session.getUser());
+ }
if (handleFileGet(req, resp)) {
return;
}
@@ -120,6 +130,46 @@ public abstract class LoginAbstractAzkabanServlet extends
}
}
+ /**
+ * Log out request - the format should be close to Apache access log format
+ *
+ * @param req
+ * @param session
+ */
+ private void logRequest(HttpServletRequest req, Session session) {
+ StringBuilder buf = new StringBuilder();
+ buf.append(req.getRemoteAddr()).append(" ");
+ if (session != null && session.getUser() != null) {
+ buf.append(session.getUser().getUserId()).append(" ");
+ } else {
+ buf.append(" - ").append(" ");
+ }
+
+ buf.append("\"");
+ buf.append(req.getMethod()).append(" ");
+ buf.append(req.getRequestURI()).append(" ");
+ if (req.getQueryString() != null) {
+ buf.append(req.getQueryString()).append(" ");
+ } else {
+ buf.append("-").append(" ");
+ }
+ buf.append(req.getProtocol()).append("\" ");
+
+ String userAgent = req.getHeader("User-Agent");
+ if (shouldLogRawUserAgent) {
+ buf.append(userAgent);
+ } else {
+ // simply log a short string to indicate browser or not
+ if (StringUtils.isFromBrowser(userAgent)) {
+ buf.append("browser");
+ } else {
+ buf.append("not-browser");
+ }
+ }
+
+ logger.info(buf.toString());
+ }
+
private boolean handleFileGet(HttpServletRequest req, HttpServletResponse resp)
throws IOException {
if (webResourceDirectory == null) {
@@ -168,7 +218,6 @@ public abstract class LoginAbstractAzkabanServlet extends
if (cookie != null) {
sessionId = cookie.getValue();
- logger.info("Session id " + sessionId);
}
if (sessionId == null && hasParam(req, "session.id")) {
@@ -211,6 +260,8 @@ public abstract class LoginAbstractAzkabanServlet extends
throws ServletException, IOException {
Session session = getSessionFromRequest(req);
+ logRequest(req, session);
+
// Handle Multipart differently from other post messages
if (ServletFileUpload.isMultipartContent(req)) {
Map<String, Object> params = multipartParser.parseMultipart(req);
diff --git a/azkaban-webserver/src/main/resources/log4j.properties b/azkaban-webserver/src/main/resources/log4j.properties
index b9fe746..4001d2c 100644
--- a/azkaban-webserver/src/main/resources/log4j.properties
+++ b/azkaban-webserver/src/main/resources/log4j.properties
@@ -1,21 +1,22 @@
-log4j.rootLogger=INFO, Console
+log_dir=${log4j.log.dir}
+
+log4j.rootLogger=INFO, WebServer
log4j.logger.azkaban.webapp=INFO, WebServer
-log4j.logger.azkaban.webapp.servlet.AbstractAzkabanServlet=INFO, R
-log4j.logger.azkaban.webapp.servlet.LoginAbstractAzkabanServlet=INFO, R
+log4j.logger.azkaban.webapp.servlet.AbstractAzkabanServlet=INFO, Access
+log4j.logger.azkaban.webapp.servlet.LoginAbstractAzkabanServlet=INFO, Access
+log4j.additivity.azkaban.webapp.servlet.LoginAbstractAzkabanServlet=false
-log4j.appender.R=org.apache.log4j.RollingFileAppender
-log4j.appender.R.layout=org.apache.log4j.PatternLayout
-log4j.appender.R.File=azkaban-access.log
-log4j.appender.R.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1}] [Azkaban] %m%n
-log4j.appender.R.MaxFileSize=102400MB
-log4j.appender.R.MaxBackupIndex=2
+log4j.appender.Access=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.Access.layout=org.apache.log4j.PatternLayout
+log4j.appender.Access.File=${log_dir}/azkaban-access.log
+log4j.appender.Access.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1}] [Azkaban] %m%n
+log4j.appender.Access.DatePattern='.'yyyy-MM-dd
-log4j.appender.WebServer=org.apache.log4j.RollingFileAppender
+log4j.appender.WebServer=org.apache.log4j.DailyRollingFileAppender
log4j.appender.WebServer.layout=org.apache.log4j.PatternLayout
-log4j.appender.WebServer.File=azkaban-webserver.log
+log4j.appender.WebServer.File=${log_dir}/azkaban-webserver.log
log4j.appender.WebServer.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1}] [Azkaban] %m%n
-log4j.appender.WebServer.MaxFileSize=102400MB
-log4j.appender.WebServer.MaxBackupIndex=2
+log4j.appender.WebServer.DatePattern='.'yyyy-MM-dd
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout