azkaban-developers

Changes

Details

diff --git a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
index 0c98ecb..8c208a5 100644
--- a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
+++ b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
@@ -133,6 +133,11 @@ public class CommonJobProperties {
    * hotspot occurs.
    */
   public static final String PROJECT_VERSION = "azkaban.flow.projectversion";
+  
+  /**
+   * Find out who is the submit user, in addition to the user.to.proxy (they may be different)
+   */
+  public static final String SUBMIT_USER = "azkaban.flow.submituser";
 
   /**
    * A uuid assigned to every execution
diff --git a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
index 7b1462f..a85d1a3 100644
--- a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
+++ b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
@@ -54,8 +54,10 @@ public class JobCallbackValidator {
               maxPostBodyLength);
     }
 
-    logger.info("Found " + totalCallbackCount + " job callbacks for job "
-        + jobName);
+    if (logger.isDebugEnabled()) {
+      logger.debug("Found " + totalCallbackCount + " job callbacks for job "
+          + jobName);
+    }
     return totalCallbackCount;
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 4e268a0..3ba6d86 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
 
+import azkaban.flow.CommonJobProperties;
 import azkaban.jobExecutor.utils.process.AzkabanProcess;
 import azkaban.jobExecutor.utils.process.AzkabanProcessBuilder;
 import azkaban.utils.Pair;
@@ -36,15 +37,29 @@ import azkaban.utils.SystemMemoryInfo;
 public class ProcessJob extends AbstractProcessJob {
 
   public static final String COMMAND = "command";
+
   private static final long KILL_TIME_MS = 5000;
+
   private volatile AzkabanProcess process;
+
   private static final String MEMCHECK_ENABLED = "memCheck.enabled";
-  private static final String MEMCHECK_FREEMEMDECRAMT = "memCheck.freeMemDecrAmt";
+
+  private static final String MEMCHECK_FREEMEMDECRAMT =
+      "memCheck.freeMemDecrAmt";
+
   public static final String AZKABAN_MEMORY_CHECK = "azkaban.memory.check";
 
+  public static final String USER_TO_PROXY = "user.to.proxy";
+
+  public static final String KRB5CCNAME = "KRB5CCNAME";
+
   public ProcessJob(final String jobId, final Props sysProps,
       final Props jobProps, final Logger log) {
     super(jobId, sysProps, jobProps, log);
+
+    // this is in line with what other job types (hadoopJava, spark, pig, hive)
+    // is doing
+    jobProps.put(CommonJobProperties.JOB_ID, jobId);
   }
 
   @Override
@@ -55,13 +70,19 @@ public class ProcessJob extends AbstractProcessJob {
       handleError("Bad property definition! " + e.getMessage(), e);
     }
 
-    if (sysProps.getBoolean(MEMCHECK_ENABLED, true) && jobProps.getBoolean(AZKABAN_MEMORY_CHECK, true)) {
+    if (sysProps.getBoolean(MEMCHECK_ENABLED, true)
+        && jobProps.getBoolean(AZKABAN_MEMORY_CHECK, true)) {
       long freeMemDecrAmt = sysProps.getLong(MEMCHECK_FREEMEMDECRAMT, 0);
       Pair<Long, Long> memPair = getProcMemoryRequirement();
-      boolean isMemGranted = SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(), memPair.getSecond(), freeMemDecrAmt);
+      boolean isMemGranted =
+          SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(),
+              memPair.getSecond(), freeMemDecrAmt);
       if (!isMemGranted) {
-        throw new Exception(String.format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
-                memPair.getFirst(), memPair.getSecond(), getId()));
+        throw new Exception(
+            String
+                .format(
+                    "Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
+                    memPair.getFirst(), memPair.getSecond(), getId()));
       }
     }
 
@@ -82,6 +103,9 @@ public class ProcessJob extends AbstractProcessJob {
     File[] propFiles = initPropsFiles();
     Map<String, String> envVars = getEnvironmentVariables();
 
+    // change krb5ccname env var so that each job execution gets its own cache
+    envVars.put(KRB5CCNAME, getKrb5ccname(jobProps));
+
     for (String command : commands) {
       info("Command: " + command);
       AzkabanProcessBuilder builder =
@@ -120,9 +144,61 @@ public class ProcessJob extends AbstractProcessJob {
   }
 
   /**
+   * <pre>
+   * This method extracts the kerberos ticket cache file name from the jobprops.
+   * This method will ensure that each job execution will have its own kerberos ticket cache file
+   * Given that the code only sets an environmental variable, the number of files created corresponds
+   * to the number of processes that are doing kinit in their flow, which should not be an inordinately 
+   * high number.
+   * </pre>
+   * 
+   * @return file name: the kerberos ticket cache file to use
+   */
+  private String getKrb5ccname(Props jobProps) {
+    String effectiveUser = getEffectiveUser(jobProps);
+    String projectName =
+        jobProps.getString(CommonJobProperties.PROJECT_NAME).replace(" ", "_");
+    String flowId =
+        jobProps.getString(CommonJobProperties.FLOW_ID).replace(" ", "_");
+    String jobId =
+        jobProps.getString(CommonJobProperties.JOB_ID).replace(" ", "_");
+    // execId should be an int and should not have space in it, ever
+    String execId = jobProps.getString(CommonJobProperties.EXEC_ID);
+    String krb5ccname =
+        String.format("/tmp/krb5cc__%s__%s__%s__%s__%s", projectName, flowId,
+            jobId, execId, effectiveUser);
+
+    return krb5ccname;
+  }
+
+  /**
+   * <pre>
+   * Determines what user id should the process job run as, in the following order of precedence:
+   * 1. USER_TO_PROXY
+   * 2. SUBMIT_USER
+   * </pre>
+   * 
+   * @param jobProps
+   * @return the user that Azkaban is going to execute as
+   */
+  private String getEffectiveUser(Props jobProps) {
+    String effectiveUser = null;
+    if (jobProps.containsKey(USER_TO_PROXY)) {
+      effectiveUser = jobProps.getString(USER_TO_PROXY);
+    } else if (jobProps.containsKey(CommonJobProperties.SUBMIT_USER)) {
+      effectiveUser = jobProps.getString(CommonJobProperties.SUBMIT_USER);
+    } else {
+      throw new RuntimeException(
+          "Internal Error: No user.to.proxy or submit.user in the jobProps");
+    }
+    info("effective user is: " + effectiveUser);
+    return effectiveUser;
+  }
+
+  /**
    * This is used to get the min/max memory size requirement by processes.
-   * SystemMemoryInfo can use the info to determine if the memory request
-   * can be fulfilled. For Java process, this should be Xms/Xmx setting.
+   * SystemMemoryInfo can use the info to determine if the memory request can be
+   * fulfilled. For Java process, this should be Xms/Xmx setting.
    *
    * @return pair of min/max memory size
    */
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
index 12b72dd..d726216 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -845,7 +845,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
       byte[] stringData = json.getBytes("UTF-8");
       byte[] data = stringData;
 
-      logger.info("UTF-8 size:" + data.length);
       if (defaultEncodingType == EncodingType.GZIP) {
         data = GZIPUtils.gzipBytes(stringData);
       }
@@ -888,7 +887,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
     byte[] stringData = json.getBytes("UTF-8");
     byte[] data = stringData;
 
-    logger.info("UTF-8 size:" + data.length);
     if (encType == EncodingType.GZIP) {
       data = GZIPUtils.gzipBytes(stringData);
     }
@@ -1009,7 +1007,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
 
     String propertyJSON = PropsUtils.toJSONString(props, true);
     byte[] data = propertyJSON.getBytes("UTF-8");
-    logger.info("UTF-8 size:" + data.length);
     if (defaultEncodingType == EncodingType.GZIP) {
       data = GZIPUtils.gzipBytes(data);
     }
@@ -1032,7 +1029,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
 
     String propertyJSON = PropsUtils.toJSONString(props, true);
     byte[] data = propertyJSON.getBytes("UTF-8");
-    logger.info("UTF-8 size:" + data.length);
     if (defaultEncodingType == EncodingType.GZIP) {
       data = GZIPUtils.gzipBytes(data);
     }
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java b/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
index a6b1a39..039ab3a 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
@@ -22,14 +22,14 @@ import azkaban.utils.Props;
 
 /**
  * @author wkang
- * 
+ *
  * This class manages project whitelist defined in xml config file.
  * An single xml config file contains different types of whitelisted
  * projects. For additional type of whitelist, modify WhitelistType enum.
- * 
+ *
  * The xml config file should in the following format. Please note
  * the tag <MemoryCheck> is same as the defined enum MemoryCheck
- * 
+ *
  * <ProjectWhitelist>
  *  <MemoryCheck>
  *      <project projectname="project1" />
@@ -84,7 +84,7 @@ public class ProjectWhitelist {
     Map<WhitelistType, Set<Integer>> projsWhitelisted = new HashMap<WhitelistType, Set<Integer>>();
     NodeList tagList = doc.getChildNodes();
     if (!tagList.item(0).getNodeName().equals(PROJECT_WHITELIST_TAG)) {
-      throw new RuntimeException("Cannot find tag '" +  PROJECT_WHITELIST_TAG + "' in " + xmlFile);      
+      throw new RuntimeException("Cannot find tag '" +  PROJECT_WHITELIST_TAG + "' in " + xmlFile);
     }
 
     NodeList whitelist = tagList.item(0).getChildNodes();
@@ -114,7 +114,7 @@ public class ProjectWhitelist {
     NamedNodeMap projectAttrMap = node.getAttributes();
     Node projectIdAttr = projectAttrMap.getNamedItem(PROJECTID_ATTR);
     if (projectIdAttr == null) {
-      throw new RuntimeException("Error loading project. The '" + PROJECTID_ATTR 
+      throw new RuntimeException("Error loading project. The '" + PROJECTID_ATTR
               + "' attribute doesn't exist");
     }
 
@@ -127,7 +127,7 @@ public class ProjectWhitelist {
     if (projsWhitelisted != null) {
       Set<Integer> projs = projsWhitelisted.get(whitelistType);
       if (projs != null) {
-        return projs.contains(project); 
+        return projs.contains(project);
       }
     }
     return false;
@@ -138,6 +138,7 @@ public class ProjectWhitelist {
    * the defined enums.
    */
   public static enum WhitelistType {
-    MemoryCheck
+    MemoryCheck,
+    NumJobPerFlow
   }
 }
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
index 1141176..27b9ba4 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
@@ -20,7 +20,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.log4j.Logger;
-
 import org.joda.time.DateTime;
 import org.joda.time.ReadablePeriod;
 
@@ -57,7 +56,6 @@ public class SlaChecker implements ConditionChecker {
 
   private Boolean isSlaMissed(ExecutableFlow flow) {
     String type = slaOption.getType();
-    logger.info("flow is " + flow.getStatus());
     if (flow.getStartTime() < 0) {
       return Boolean.FALSE;
     }
@@ -136,7 +134,6 @@ public class SlaChecker implements ConditionChecker {
 
   private Boolean isSlaGood(ExecutableFlow flow) {
     String type = slaOption.getType();
-    logger.info("flow is " + flow.getStatus());
     if (flow.getStartTime() < 0) {
       return Boolean.FALSE;
     }
@@ -218,13 +215,11 @@ public class SlaChecker implements ConditionChecker {
   }
 
   public Object isSlaFailed() {
-    logger.info("Testing if sla failed for execution " + execId);
     ExecutableFlow flow;
     try {
       flow = executorManager.getExecutableFlow(execId);
     } catch (ExecutorManagerException e) {
       logger.error("Can't get executable flow.", e);
-      e.printStackTrace();
       // something wrong, send out alerts
       return Boolean.TRUE;
     }
@@ -232,13 +227,11 @@ public class SlaChecker implements ConditionChecker {
   }
 
   public Object isSlaPassed() {
-    logger.info("Testing if sla is good for execution " + execId);
     ExecutableFlow flow;
     try {
       flow = executorManager.getExecutableFlow(execId);
     } catch (ExecutorManagerException e) {
       logger.error("Can't get executable flow.", e);
-      e.printStackTrace();
       // something wrong, send out alerts
       return Boolean.TRUE;
     }
diff --git a/azkaban-common/src/main/java/azkaban/trigger/Condition.java b/azkaban-common/src/main/java/azkaban/trigger/Condition.java
index 7bb275b..5e8e7b6 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/Condition.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/Condition.java
@@ -25,7 +25,6 @@ import org.apache.commons.jexl2.Expression;
 import org.apache.commons.jexl2.JexlEngine;
 import org.apache.commons.jexl2.MapContext;
 import org.apache.log4j.Logger;
-
 import org.joda.time.DateTime;
 
 public class Condition {
@@ -119,7 +118,9 @@ public class Condition {
   }
 
   public boolean isMet() {
-    logger.info("Testing condition " + expression);
+    if (logger.isDebugEnabled()) {
+      logger.debug("Testing condition " + expression);
+    }
     return expression.evaluate(context).equals(Boolean.TRUE);
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerLoader.java b/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerLoader.java
index 68c0508..99b32f9 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerLoader.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerLoader.java
@@ -194,7 +194,9 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements
 
   @Override
   public void updateTrigger(Trigger t) throws TriggerLoaderException {
-    logger.info("Updating trigger " + t.getTriggerId() + " into db.");
+    if (logger.isDebugEnabled()) {
+      logger.debug("Updating trigger " + t.getTriggerId() + " into db.");
+    }
     t.setLastModifyTime(System.currentTimeMillis());
     Connection connection = getConnection();
     try {
@@ -238,7 +240,9 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements
       if (updates == 0) {
         throw new TriggerLoaderException("No trigger has been updated.");
       } else {
-        logger.info("Updated " + updates + " records.");
+        if (logger.isDebugEnabled()) {
+          logger.debug("Updated " + updates + " records.");
+        }
       }
     } catch (SQLException e) {
       logger.error(UPDATE_TRIGGER + " failed.");
diff --git a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
index fa8d130..97e858c 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
@@ -281,7 +281,9 @@ public class TriggerManager extends EventHandler implements
             logger.info("Skipping trigger" + t.getTriggerId() + " until " + t.getNextCheckTime());
           }
 
-          logger.info("Checking trigger " + t.getTriggerId());
+          if (logger.isDebugEnabled()) {
+            logger.info("Checking trigger " + t.getTriggerId());
+          }
           if (t.getStatus().equals(TriggerStatus.READY)) {
             if (t.triggerConditionMet()) {
               onTriggerTrigger(t);
diff --git a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
index 596e8b3..3412425 100644
--- a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
@@ -292,6 +292,7 @@ public class PropsUtils {
     props.put(CommonJobProperties.FLOW_UUID, UUID.randomUUID().toString());
     props.put(CommonJobProperties.PROJECT_LAST_CHANGED_BY, flow.getLastModifiedByUser());
     props.put(CommonJobProperties.PROJECT_LAST_CHANGED_DATE, flow.getLastModifiedTimestamp());
+    props.put(CommonJobProperties.SUBMIT_USER, flow.getExecutableFlow().getSubmitUser());  
 
     DateTime loadTime = new DateTime();
 
diff --git a/azkaban-common/src/main/java/azkaban/utils/StringUtils.java b/azkaban-common/src/main/java/azkaban/utils/StringUtils.java
index 17792a0..924afd2 100644
--- a/azkaban-common/src/main/java/azkaban/utils/StringUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/StringUtils.java
@@ -18,6 +18,7 @@ package azkaban.utils;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.regex.Pattern;
 
 public class StringUtils {
   public static final char SINGLE_QUOTE = '\'';
@@ -88,4 +89,19 @@ public class StringUtils {
 
     return buffer.toString();
   }
+
+  private static final Pattern BROWSWER_PATTERN = Pattern
+      .compile(".*Gecko.*|.*AppleWebKit.*|.*Trident.*|.*Chrome.*");
+
+  public static boolean isFromBrowser(String userAgent) {
+    if (userAgent == null) {
+      return false;
+    }
+
+    if (BROWSWER_PATTERN.matcher(userAgent).matches()) {
+      return true;
+    } else {
+      return false;
+    }
+  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java b/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java
index 05a0a28..068935f 100644
--- a/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java
@@ -22,7 +22,6 @@ import java.util.Date;
 import java.util.Properties;
 
 import org.apache.log4j.Logger;
-
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -33,6 +32,7 @@ import org.junit.Test;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
+import azkaban.flow.CommonJobProperties;
 import azkaban.utils.Props;
 
 public class JavaProcessJobTest {
@@ -108,6 +108,14 @@ public class JavaProcessJobTest {
     props.put(AbstractProcessJob.WORKING_DIR, workingDir.getCanonicalPath());
     props.put("type", "java");
     props.put("fullPath", ".");
+    
+    props.put(CommonJobProperties.PROJECT_NAME, "test_project");
+    props.put(CommonJobProperties.FLOW_ID, "test_flow");
+    props.put(CommonJobProperties.JOB_ID, "test_job");
+    props.put(CommonJobProperties.EXEC_ID, "123");
+    props.put(CommonJobProperties.SUBMIT_USER, "test_user");
+    
+
     job = new JavaProcessJob("testJavaProcess", props, props, log);
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
index 974895b..57f4505 100644
--- a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
@@ -20,7 +20,6 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.log4j.Logger;
-
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -28,6 +27,7 @@ import org.junit.Test;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
+import azkaban.flow.CommonJobProperties;
 import azkaban.utils.Props;
 
 public class ProcessJobTest {
@@ -46,6 +46,12 @@ public class ProcessJobTest {
     props.put(AbstractProcessJob.WORKING_DIR, workingDir.getCanonicalPath());
     props.put("type", "command");
     props.put("fullPath", ".");
+    
+    props.put(CommonJobProperties.PROJECT_NAME, "test_project");
+    props.put(CommonJobProperties.FLOW_ID, "test_flow");
+    props.put(CommonJobProperties.JOB_ID, "test_job");
+    props.put(CommonJobProperties.EXEC_ID, "123");
+    props.put(CommonJobProperties.SUBMIT_USER, "test_user");
 
     job = new ProcessJob("TestProcess", props, props, log);
   }
@@ -62,6 +68,37 @@ public class ProcessJobTest {
     job.run();
 
   }
+  
+  /**
+   * this job should run fine if the props contain user.to.proxy
+   * @throws Exception
+   */
+  @Test
+  public void testOneUnixCommandWithProxyUserInsteadOfSubmitUser() throws Exception {
+    
+    // Initialize the Props
+    props.removeLocal(CommonJobProperties.SUBMIT_USER);
+    props.put("user.to.proxy", "test_user");
+    props.put(ProcessJob.COMMAND, "ls -al");
+    
+    job.run();
+
+  }
+  
+  /**
+   * this job should fail because there is no user.to.proxy and no CommonJobProperties.SUBMIT_USER
+   * @throws Exception
+   */
+  @Test (expected=RuntimeException.class)
+  public void testOneUnixCommandWithNoUser() throws Exception {
+    
+    // Initialize the Props
+    props.removeLocal(CommonJobProperties.SUBMIT_USER);    
+    props.put(ProcessJob.COMMAND, "ls -al");
+    
+    job.run();
+
+  }
 
   @Test
   public void testFailedUnixCommand() throws Exception {
diff --git a/azkaban-common/src/test/java/azkaban/utils/StringUtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/StringUtilsTest.java
new file mode 100644
index 0000000..71c3d21
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/StringUtilsTest.java
@@ -0,0 +1,80 @@
+package azkaban.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StringUtilsTest {
+
+  private static final String chromeOnMac =
+      "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.155 Safari/537.36";
+  private static final String fireFoxOnMac =
+      "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:40.0) Gecko/20100101 Firefox/40.0";
+  private static final String safariOnMac =
+      "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.78.2 (KHTML, like Gecko) Version/7.0.6 Safari/537.78.2";
+  private static final String chromeOnLinux =
+      "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36";
+  private static final String fireFoxOnLinux =
+      "Mozilla/5.0 (X11; Linux x86_64; rv:39.0) Gecko/20100101 Firefox/39.0";
+
+  private static final String[] browserVariants = { chromeOnMac, fireFoxOnMac,
+      safariOnMac, chromeOnLinux, fireFoxOnLinux };
+
+  private static final String[] BROWSER_NAMES = { "AppleWebKit", "Gecko",
+      "Chrome" };
+
+  @Test
+  public void isBrowser() throws Exception {
+
+    for (String browser : browserVariants) {
+      Assert.assertTrue(browser, StringUtils.isFromBrowser(browser));
+    }
+  }
+
+  @Test
+  public void notBrowserWithLowercase() throws Exception {
+
+    for (String browser : browserVariants) {
+      Assert.assertFalse(browser.toLowerCase(),
+          StringUtils.isFromBrowser(browser.toLowerCase()));
+    }
+  }
+
+  @Test
+  public void notBrowser() throws Exception {
+    String testStr = "curl";
+    Assert.assertFalse(testStr, StringUtils.isFromBrowser(testStr));
+  }
+
+  @Test
+  public void emptyBrowserString() throws Exception {
+
+    Assert.assertFalse("empty string", StringUtils.isFromBrowser(""));
+  }
+
+  @Test
+  public void nullBrowserString() throws Exception {
+
+    Assert.assertFalse("null string", StringUtils.isFromBrowser(null));
+  }
+
+  @Test
+  public void startsWithBrowserName() {
+    for (String name : BROWSER_NAMES) {
+      Assert.assertTrue(StringUtils.isFromBrowser(name + " is awesome"));
+    }
+  }
+
+  @Test
+  public void endsWithBrowserName() {
+    for (String name : BROWSER_NAMES) {
+      Assert.assertTrue(StringUtils.isFromBrowser("awesome is" + name));
+    }
+  }
+
+  @Test
+  public void containsBrowserName() {
+    for (String name : BROWSER_NAMES) {
+      Assert.assertTrue(StringUtils.isFromBrowser("awesome " + name + " is"));
+    }
+  }
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 361db00..01ab373 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -52,6 +52,8 @@ import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypeManagerException;
 import azkaban.metric.MetricReportManager;
 import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectWhitelist;
+import azkaban.project.ProjectWhitelist.WhitelistType;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
@@ -481,7 +483,9 @@ public class FlowRunnerManager implements EventListener,
         int numJobs =
             Integer.valueOf(options.getFlowParameters().get(
                 FLOW_NUM_JOB_THREADS));
-        if (numJobs > 0 && numJobs <= numJobThreads) {
+        if (numJobs > 0 && (numJobs <= numJobThreads || ProjectWhitelist
+                .isProjectWhitelisted(flow.getProjectId(),
+                    WhitelistType.NumJobPerFlow))) {
           numJobThreads = numJobs;
         }
       } catch (Exception e) {
diff --git a/azkaban-execserver/src/main/resources/log4j.properties b/azkaban-execserver/src/main/resources/log4j.properties
index e304ac7..464f36a 100644
--- a/azkaban-execserver/src/main/resources/log4j.properties
+++ b/azkaban-execserver/src/main/resources/log4j.properties
@@ -1,12 +1,14 @@
-log4j.rootLogger=INFO, Console
+log_dir=${log4j.log.dir}
+
+log4j.rootLogger=INFO, ExecServer
 log4j.logger.azkaban.execapp=INFO, ExecServer
 
-log4j.appender.ExecServer=org.apache.log4j.RollingFileAppender
+log4j.appender.ExecServer=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.ExecServer.layout=org.apache.log4j.PatternLayout
-log4j.appender.ExecServer.File=azkaban-execserver.log
+log4j.appender.ExecServer.File=${log_dir}/azkaban-execserver.log
 log4j.appender.ExecServer.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1}] [Azkaban] %m%n
-log4j.appender.ExecServer.MaxFileSize=102400MB
 log4j.appender.ExecServer.MaxBackupIndex=2
+log4j.appender.ExecServer.DatePattern='.'yyyy-MM-dd
 
 log4j.appender.Console=org.apache.log4j.ConsoleAppender
 log4j.appender.Console.layout=org.apache.log4j.PatternLayout
diff --git a/azkaban-execserver/src/package/bin/start-exec.sh b/azkaban-execserver/src/package/bin/start-exec.sh
index fbb7124..d8f0f73 100755
--- a/azkaban-execserver/src/package/bin/start-exec.sh
+++ b/azkaban-execserver/src/package/bin/start-exec.sh
@@ -1,6 +1,5 @@
 #!/bin/bash
 
-base_dir=$(dirname $0)/..
-
-bin/azkaban-executor-start.sh $base_dir 2>&1>logs/executorServerLog__`date +%F+%T`.out &
+# pass along command line arguments to azkaban-executor-start.sh script
+bin/azkaban-executor-start.sh "$@" 2>&1>logs/executorServerLog__`date +%F+%T`.out &
 
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 3171e9a..b5b8d0e 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -39,6 +39,7 @@ import javax.management.ObjectName;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
+import org.apache.log4j.jmx.HierarchyDynamicMBean;
 import org.apache.velocity.app.VelocityEngine;
 import org.apache.velocity.runtime.log.Log4JLogChute;
 import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
@@ -53,8 +54,6 @@ import org.mortbay.jetty.servlet.DefaultServlet;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.mortbay.thread.QueuedThreadPool;
 
-import com.linkedin.restli.server.RestliServlet;
-
 import azkaban.alert.Alerter;
 import azkaban.database.AzkabanDatabaseSetup;
 import azkaban.executor.ExecutorManager;
@@ -88,19 +87,21 @@ import azkaban.utils.FileIOUtils;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
 import azkaban.utils.Utils;
+import azkaban.webapp.plugin.PluginRegistry;
+import azkaban.webapp.plugin.TriggerPlugin;
+import azkaban.webapp.plugin.ViewerPlugin;
 import azkaban.webapp.servlet.AbstractAzkabanServlet;
 import azkaban.webapp.servlet.ExecutorServlet;
+import azkaban.webapp.servlet.HistoryServlet;
 import azkaban.webapp.servlet.IndexRedirectServlet;
 import azkaban.webapp.servlet.JMXHttpServlet;
-import azkaban.webapp.servlet.ScheduleServlet;
-import azkaban.webapp.servlet.HistoryServlet;
-import azkaban.webapp.servlet.ProjectServlet;
 import azkaban.webapp.servlet.ProjectManagerServlet;
+import azkaban.webapp.servlet.ProjectServlet;
+import azkaban.webapp.servlet.ScheduleServlet;
 import azkaban.webapp.servlet.StatsServlet;
 import azkaban.webapp.servlet.TriggerManagerServlet;
-import azkaban.webapp.plugin.TriggerPlugin;
-import azkaban.webapp.plugin.ViewerPlugin;
-import azkaban.webapp.plugin.PluginRegistry;
+
+import com.linkedin.restli.server.RestliServlet;
 
 /**
  * The Azkaban Jetty server class
@@ -123,6 +124,9 @@ import azkaban.webapp.plugin.PluginRegistry;
  * Jetty truststore password
  */
 public class AzkabanWebServer extends AzkabanServer {
+  private static final String AZKABAN_ACCESS_LOGGER_NAME =
+      "azkaban.webapp.servlet.LoginAbstractAzkabanServlet";
+
   private static final Logger logger = Logger.getLogger(AzkabanWebServer.class);
 
   public static final String AZKABAN_HOME = "AZKABAN_HOME";
@@ -823,23 +827,25 @@ public class AzkabanWebServer extends AzkabanServer {
 
       public void logTopMemoryConsumers() throws Exception, IOException {
         if (new File("/bin/bash").exists() && new File("/bin/ps").exists()
-                && new File("/usr/bin/head").exists()) {
+            && new File("/usr/bin/head").exists()) {
           logger.info("logging top memeory consumer");
 
           java.lang.ProcessBuilder processBuilder =
-                  new java.lang.ProcessBuilder("/bin/bash", "-c", "/bin/ps aux --sort -rss | /usr/bin/head");
+              new java.lang.ProcessBuilder("/bin/bash", "-c",
+                  "/bin/ps aux --sort -rss | /usr/bin/head");
           Process p = processBuilder.start();
           p.waitFor();
-  
+
           InputStream is = p.getInputStream();
-          java.io.BufferedReader reader = new java.io.BufferedReader(new InputStreamReader(is));
+          java.io.BufferedReader reader =
+              new java.io.BufferedReader(new InputStreamReader(is));
           String line = null;
           while ((line = reader.readLine()) != null) {
             logger.info(line);
           }
           is.close();
         }
-      }      
+      }
     });
     logger.info("Server running on " + (ssl ? "ssl" : "") + " port " + port
         + ".");
@@ -1233,6 +1239,22 @@ public class AzkabanWebServer extends AzkabanServer {
       registerMbean("executorManager", new JmxExecutorManager(
           (ExecutorManager) executorManager));
     }
+
+    // Register Log4J loggers as JMX beans so the log level can be
+    // updated via JConsole or Java VisualVM
+    HierarchyDynamicMBean log4jMBean = new HierarchyDynamicMBean();
+    registerMbean("log4jmxbean", log4jMBean);
+    ObjectName accessLogLoggerObjName =
+        log4jMBean.addLoggerMBean(AZKABAN_ACCESS_LOGGER_NAME);
+
+    if (accessLogLoggerObjName == null) {
+      System.out
+          .println("************* loginLoggerObjName is null, make sure there is a logger with name "
+              + AZKABAN_ACCESS_LOGGER_NAME);
+    } else {
+      System.out.println("******** loginLoggerObjName: "
+          + accessLogLoggerObjName.getCanonicalName());
+    }
   }
 
   public void close() {
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 4970fce..ef588f7 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -503,7 +503,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
       } else {
         ret.put("length", data.getLength());
         ret.put("offset", data.getOffset());
-        ret.put("data", data.getData());
+        ret.put("data", StringEscapeUtils.escapeHtml(data.getData()));
       }
     } catch (ExecutorManagerException e) {
       throw new ServletException(e);
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index e0b9e38..1d9f315 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -43,6 +43,7 @@ import azkaban.user.Role;
 import azkaban.user.User;
 import azkaban.user.UserManager;
 import azkaban.user.UserManagerException;
+import azkaban.utils.StringUtils;
 
 /**
  * Abstract Servlet that handles auto login when the session hasn't been
@@ -77,11 +78,17 @@ public abstract class LoginAbstractAzkabanServlet extends
 
   private MultipartParser multipartParser;
 
+  private boolean shouldLogRawUserAgent = false;
+
   @Override
   public void init(ServletConfig config) throws ServletException {
     super.init(config);
 
     multipartParser = new MultipartParser(DEFAULT_UPLOAD_DISK_SPOOL_SIZE);
+
+    shouldLogRawUserAgent =
+        getApplication().getServerProps().getBoolean("accesslog.raw.useragent",
+            false);
   }
 
   public void setResourceDirectory(File file) {
@@ -93,6 +100,7 @@ public abstract class LoginAbstractAzkabanServlet extends
       throws ServletException, IOException {
     // Set session id
     Session session = getSessionFromRequest(req);
+    logRequest(req, session);
     if (hasParam(req, "logout")) {
       resp.sendRedirect(req.getContextPath());
       if (session != null) {
@@ -103,7 +111,9 @@ public abstract class LoginAbstractAzkabanServlet extends
     }
 
     if (session != null) {
-      logger.info("Found session " + session.getUser());
+      if (logger.isDebugEnabled()) {
+        logger.debug("Found session " + session.getUser());
+      }
       if (handleFileGet(req, resp)) {
         return;
       }
@@ -120,6 +130,46 @@ public abstract class LoginAbstractAzkabanServlet extends
     }
   }
 
+  /**
+   * Log out request - the format should be close to Apache access log format
+   * 
+   * @param req
+   * @param session
+   */
+  private void logRequest(HttpServletRequest req, Session session) {
+    StringBuilder buf = new StringBuilder();
+    buf.append(req.getRemoteAddr()).append(" ");
+    if (session != null && session.getUser() != null) {
+      buf.append(session.getUser().getUserId()).append(" ");
+    } else {
+      buf.append(" - ").append(" ");
+    }
+
+    buf.append("\"");
+    buf.append(req.getMethod()).append(" ");
+    buf.append(req.getRequestURI()).append(" ");
+    if (req.getQueryString() != null) {
+      buf.append(req.getQueryString()).append(" ");
+    } else {
+      buf.append("-").append(" ");
+    }
+    buf.append(req.getProtocol()).append("\" ");
+
+    String userAgent = req.getHeader("User-Agent");
+    if (shouldLogRawUserAgent) {
+      buf.append(userAgent);
+    } else {
+      // simply log a short string to indicate browser or not
+      if (StringUtils.isFromBrowser(userAgent)) {
+        buf.append("browser");
+      } else {
+        buf.append("not-browser");
+      }
+    }
+
+    logger.info(buf.toString());
+  }
+
   private boolean handleFileGet(HttpServletRequest req, HttpServletResponse resp)
       throws IOException {
     if (webResourceDirectory == null) {
@@ -168,7 +218,6 @@ public abstract class LoginAbstractAzkabanServlet extends
 
     if (cookie != null) {
       sessionId = cookie.getValue();
-      logger.info("Session id " + sessionId);
     }
 
     if (sessionId == null && hasParam(req, "session.id")) {
@@ -211,6 +260,8 @@ public abstract class LoginAbstractAzkabanServlet extends
       throws ServletException, IOException {
     Session session = getSessionFromRequest(req);
 
+    logRequest(req, session);
+
     // Handle Multipart differently from other post messages
     if (ServletFileUpload.isMultipartContent(req)) {
       Map<String, Object> params = multipartParser.parseMultipart(req);
diff --git a/azkaban-webserver/src/main/resources/log4j.properties b/azkaban-webserver/src/main/resources/log4j.properties
index b9fe746..4001d2c 100644
--- a/azkaban-webserver/src/main/resources/log4j.properties
+++ b/azkaban-webserver/src/main/resources/log4j.properties
@@ -1,21 +1,22 @@
-log4j.rootLogger=INFO, Console
+log_dir=${log4j.log.dir}
+
+log4j.rootLogger=INFO, WebServer
 log4j.logger.azkaban.webapp=INFO, WebServer
-log4j.logger.azkaban.webapp.servlet.AbstractAzkabanServlet=INFO, R
-log4j.logger.azkaban.webapp.servlet.LoginAbstractAzkabanServlet=INFO, R
+log4j.logger.azkaban.webapp.servlet.AbstractAzkabanServlet=INFO, Access
+log4j.logger.azkaban.webapp.servlet.LoginAbstractAzkabanServlet=INFO, Access
+log4j.additivity.azkaban.webapp.servlet.LoginAbstractAzkabanServlet=false
 
-log4j.appender.R=org.apache.log4j.RollingFileAppender
-log4j.appender.R.layout=org.apache.log4j.PatternLayout
-log4j.appender.R.File=azkaban-access.log
-log4j.appender.R.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1}] [Azkaban] %m%n
-log4j.appender.R.MaxFileSize=102400MB
-log4j.appender.R.MaxBackupIndex=2
+log4j.appender.Access=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.Access.layout=org.apache.log4j.PatternLayout
+log4j.appender.Access.File=${log_dir}/azkaban-access.log
+log4j.appender.Access.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1}] [Azkaban] %m%n
+log4j.appender.Access.DatePattern='.'yyyy-MM-dd
 
-log4j.appender.WebServer=org.apache.log4j.RollingFileAppender
+log4j.appender.WebServer=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.WebServer.layout=org.apache.log4j.PatternLayout
-log4j.appender.WebServer.File=azkaban-webserver.log
+log4j.appender.WebServer.File=${log_dir}/azkaban-webserver.log
 log4j.appender.WebServer.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1}] [Azkaban] %m%n
-log4j.appender.WebServer.MaxFileSize=102400MB
-log4j.appender.WebServer.MaxBackupIndex=2
+log4j.appender.WebServer.DatePattern='.'yyyy-MM-dd
 
 log4j.appender.Console=org.apache.log4j.ConsoleAppender
 log4j.appender.Console.layout=org.apache.log4j.PatternLayout