azkaban-developers

Changes

.travis.yml 6(+6 -0)

azkaban-common/src/main/java/azkaban/utils/cache/Cache.java 193(+0 -193)

azkaban-common/src/main/java/azkaban/utils/cache/CacheManager.java 132(+0 -132)

azkaban-common/src/main/java/azkaban/utils/cache/Element.java 48(+0 -48)

azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java 159(+0 -159)

build.gradle 53(+47 -6)

gradlew 6(+1 -5)

Details

.travis.yml 6(+6 -0)

diff --git a/.travis.yml b/.travis.yml
index 09aad73..d7354f9 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,2 +1,8 @@
 languages: java
+# TODO(dzc): Add openjdk8 once it is available in Travis.
+jdk:
+  - openjdk7
+  - oraclejdk7
+  - oraclejdk8
+sudo: false
 script: ./gradlew distTar
diff --git a/azkaban-common/src/main/c/execute-as-user.c b/azkaban-common/src/main/c/execute-as-user.c
new file mode 100644
index 0000000..f27d5a1
--- /dev/null
+++ b/azkaban-common/src/main/c/execute-as-user.c
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+#include <dirent.h>
+#include <fcntl.h>
+#include <fts.h>
+#include <errno.h>
+#include <grp.h>
+#include <unistd.h>
+#include <signal.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <limits.h>
+#include <sys/stat.h>
+#include <sys/mount.h>
+#include <sys/types.h>
+#include <pwd.h>
+
+FILE *LOGFILE = NULL;
+FILE *ERRORFILE = NULL;
+int SETUID_OPER_FAILED = 10;
+int USER_NOT_FOUND = 20;
+int INVALID_INPUT = 30;
+
+/*
+ *  Change the real and effective user and group from super user to the specified user
+ *  
+ *  Adapted from:
+ *  ./hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
+ *  
+ */
+
+int change_user(uid_t user, gid_t group) {
+    if (user == getuid() && user == geteuid() &&
+            group == getgid() && group == getegid()) {
+        return 0;
+    }
+
+    if (seteuid(0) != 0) {
+        fprintf(LOGFILE, "unable to reacquire root - %s\n", strerror(errno));
+        fprintf(LOGFILE, "Real: %d:%d; Effective: %d:%d\n",
+                getuid(), getgid(), geteuid(), getegid());
+        return SETUID_OPER_FAILED;
+    }
+    if (setgid(group) != 0) {
+        fprintf(LOGFILE, "unable to set group to %d - %s\n", group,
+                strerror(errno));
+        fprintf(LOGFILE, "Real: %d:%d; Effective: %d:%d\n",
+                getuid(), getgid(), geteuid(), getegid());
+        return SETUID_OPER_FAILED;
+    }
+    if (setuid(user) != 0) {
+        fprintf(LOGFILE, "unable to set user to %d - %s\n", user, strerror(errno));
+        fprintf(LOGFILE, "Real: %d:%d; Effective: %d:%d\n",
+                getuid(), getgid(), geteuid(), getegid());
+        return SETUID_OPER_FAILED;
+    }
+
+    return 0;
+}
+
+int main(int argc, char **argv){
+
+    // set up the logging stream
+    if (!LOGFILE){
+        LOGFILE=stdout;
+    }
+    if (!ERRORFILE){
+        ERRORFILE=stderr;
+    }
+
+    if (argc < 3) {
+        fprintf(ERRORFILE, "Requires at least 3 variables: ./execute-as-user uid command [args]");
+        return INVALID_INPUT;
+    }
+
+    char *uid = argv[1];
+
+    // gather information about user
+    struct passwd *user_info = getpwnam(uid);
+    if (user_info == NULL){
+        fprintf(LOGFILE, "user does not exist: %s", uid);
+        return USER_NOT_FOUND;
+    }
+
+    // try to change user
+    fprintf(LOGFILE, "Changing user: user: %s, uid: %d, gid: %d\n", uid, user_info->pw_uid, user_info->pw_gid);
+    int retval = change_user(user_info->pw_uid, user_info->pw_gid);
+    if (retval != 0){
+        fprintf(LOGFILE, "Error changing user to %s\n", uid);
+        return SETUID_OPER_FAILED;
+    }
+
+    // execute the command
+    char **user_argv = &argv[2];
+    fprintf(LOGFILE, "user command starting from: %s\n", user_argv[0]);
+    fflush(LOGFILE);
+    retval = execvp(*user_argv, user_argv);
+    fprintf(LOGFILE, "system call return value: %d\n", retval);
+
+    // execvp only returns on failure; normalize the return value so a failed
+    // exec is reported as a non-zero exit and never mistaken for success.
+    if (retval != 0){
+        return 1;
+    }
+    else{
+        return 0;
+    }
+
+}
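
For orientation, here is a minimal, hypothetical sketch of invoking the new wrapper binary from Java. The install path, user name, and wrapped command are placeholders; Azkaban's real invocation goes through ProcessJob and AzkabanProcess, further down in this diff. Note the binary must run with root privileges (it calls seteuid(0)) for the user switch to succeed.

    import java.io.IOException;

    public class ExecuteAsUserDemo {
      public static void main(String[] args) throws IOException, InterruptedException {
        // Assumed install location; in Azkaban this comes from the
        // "azkaban.native.lib" property (see the ProcessJob changes below).
        String binary = "/opt/azkaban/native/execute-as-user";
        // Wrap an arbitrary command so it runs as the target user.
        ProcessBuilder pb = new ProcessBuilder(binary, "someuser", "whoami");
        pb.inheritIO();
        int exit = pb.start().waitFor();
        System.out.println("exit code: " + exit);
      }
    }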
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 6120002..80e8167 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -171,6 +171,20 @@ public interface ExecutorLoader {
   public void removeActiveExecutableReference(int execId)
       throws ExecutorManagerException;
 
+
+  /**
+   * <pre>
+   * Unset executor Id for an execution
+   * Note:-
+   * throws an Exception in case of a SQL issue
+   * </pre>
+   *
+   * @param executionId
+   *          the execution whose executor assignment should be cleared
+   * @throws ExecutorManagerException
+   */
+  public void unassignExecutor(int executionId) throws ExecutorManagerException;
+
   /**
    * <pre>
    * Set an executor Id to an execution
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 39dd831..cffabe7 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -74,12 +74,14 @@ public class ExecutorManager extends EventHandler implements
     "azkaban.use.multiple.executors";
   private static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
     "azkaban.webserver.queue.size";
-  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS =
+  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS =
     "azkaban.activeexecutor.refresh.milisecinterval";
-  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW =
+  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW =
     "azkaban.activeexecutor.refresh.flowinterval";
   private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS =
       "azkaban.executorinfo.refresh.maxThreads";
+  private static final String AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED =
+    "azkaban.maxDispatchingErrors";
 
   private static Logger logger = Logger.getLogger(ExecutorManager.class);
   private ExecutorLoader executorLoader;
@@ -99,7 +101,7 @@ public class ExecutorManager extends EventHandler implements
   private ExecutingManagerUpdaterThread executingManager;
   // 12 weeks
   private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
-      * 24 * 60 * 60 * 1000l;
+      * 24 * 60 * 60 * 1000L;
   private long lastCleanerThreadCheckTime = -1;
 
   private long lastThreadCheckTime = -1;
@@ -115,20 +117,19 @@ public class ExecutorManager extends EventHandler implements
   private long lastSuccessfulExecutorInfoRefresh;
   private ExecutorService executorInforRefresherService;
 
-  public ExecutorManager(Props props, ExecutorLoader loader,
-      Map<String, Alerter> alters) throws ExecutorManagerException {
-    azkProps = props;
+  public ExecutorManager(Props azkProps, ExecutorLoader loader,
+      Map<String, Alerter> alerters) throws ExecutorManagerException {
+    this.alerters = alerters;
+    this.azkProps = azkProps;
     this.executorLoader = loader;
     this.setupExecutors();
     this.loadRunningFlows();
 
     queuedFlows =
-        new QueuedExecutions(props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
+        new QueuedExecutions(azkProps.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
     this.loadQueuedFlows();
 
-    alerters = alters;
-
-    cacheDir = new File(props.getString("cache.directory", "cache"));
+    cacheDir = new File(azkProps.getString("cache.directory", "cache"));
 
     executingManager = new ExecutingManagerUpdaterThread();
     executingManager.start();
@@ -138,7 +139,7 @@ public class ExecutorManager extends EventHandler implements
     }
 
     long executionLogsRetentionMs =
-      props.getLong("execution.logs.retention.ms",
+        azkProps.getLong("execution.logs.retention.ms",
         DEFAULT_EXECUTION_LOGS_RETENTION_MS);
 
     cleanerThread = new CleanerThread(executionLogsRetentionMs);
@@ -172,8 +173,9 @@ public class ExecutorManager extends EventHandler implements
     queueProcessor =
       new QueueProcessorThread(azkProps.getBoolean(
         AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
-        AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS, 1000), azkProps.getInt(
-        AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW, 1000));
+        AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000), azkProps.getInt(
+        AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5), azkProps.getInt(
+        AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED, activeExecutors.size()));
 
     queueProcessor.start();
   }
@@ -241,7 +243,7 @@ public class ExecutorManager extends EventHandler implements
             @Override
             public String call() throws Exception {
               return callExecutorForJsonString(executor.getHost(),
-                executor.getPort(), "/serverstastics", null);
+                executor.getPort(), "/serverStatistics", null);
             }
           });
         futures.add(new Pair<Executor, Future<String>>(executor,
@@ -251,12 +253,14 @@ public class ExecutorManager extends EventHandler implements
       boolean wasSuccess = true;
       for (Pair<Executor, Future<String>> refreshPair : futures) {
         Executor executor = refreshPair.getFirst();
+        executor.setExecutorInfo(null); // invalidate cached ExecutorInfo
         try {
           // max 5 secs
           String jsonString = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
           executor.setExecutorInfo(ExecutorInfo.fromJSONString(jsonString));
-          logger.info("Successfully refreshed ExecutorInfo for executor: "
-            + executor);
+          logger.info(String.format(
+            "Successfully refreshed executor: %s with executor info : %s",
+            executor, jsonString));
         } catch (TimeoutException e) {
           wasSuccess = false;
           logger.error("Timed out while waiting for ExecutorInfo refresh"
@@ -425,13 +429,6 @@ public class ExecutorManager extends EventHandler implements
 
   private void loadRunningFlows() throws ExecutorManagerException {
     runningFlows.putAll(executorLoader.fetchActiveFlows());
-    // Finalize all flows which were running on an executor which is now
-    // inactive
-    for (Pair<ExecutionReference, ExecutableFlow> pair : runningFlows.values()) {
-      if (!activeExecutors.contains(pair.getFirst().getExecutor())) {
-        finalizeFlows(pair.getSecond());
-      }
-    }
   }
 
   /*
@@ -450,7 +447,8 @@ public class ExecutorManager extends EventHandler implements
 
   /**
    * Gets a list of all the active (running flows and non-dispatched flows)
-   * executions for a given project and flow {@inheritDoc}
+   * executions for a given project and flow {@inheritDoc}. Results are
+   * sorted, since setting up pipelined executions relies on that ordering.
    *
    * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows(int,
    *      java.lang.String)
@@ -462,6 +460,7 @@ public class ExecutorManager extends EventHandler implements
       queuedFlows.getAllEntries()));
     executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
       runningFlows.values()));
+    Collections.sort(executionIds);
     return executionIds;
   }
 
@@ -1035,14 +1034,11 @@ public class ExecutorManager extends EventHandler implements
           executorLoader.addActiveExecutableReference(reference);
           queuedFlows.enqueue(exflow, reference);
         } else {
-          Executor executor = activeExecutors.iterator().next();
           // assign only local executor we have
-          reference.setExecutor(executor);
+          Executor choosenExecutor = activeExecutors.iterator().next();
           executorLoader.addActiveExecutableReference(reference);
           try {
-            callExecutorServer(exflow, executor, ConnectorParams.EXECUTE_ACTION);
-            runningFlows.put(exflow.getExecutionId(),
-              new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+            dispatch(reference, exflow, choosenExecutor);
           } catch (ExecutorManagerException e) {
             executorLoader.removeActiveExecutableReference(reference
               .getExecId());
@@ -1729,13 +1725,43 @@ public class ExecutorManager extends EventHandler implements
     }
   }
 
+  /**
+   * Calls the executor to dispatch the flow, updates the DB executor
+   * assignment, and updates the in-memory state of the executable flow.
+   */
+  private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
+    Executor choosenExecutor) throws ExecutorManagerException {
+    exflow.setUpdateTime(System.currentTimeMillis());
+
+    executorLoader.assignExecutor(choosenExecutor.getId(),
+      exflow.getExecutionId());
+    try {
+      callExecutorServer(exflow, choosenExecutor,
+        ConnectorParams.EXECUTE_ACTION);
+    } catch (ExecutorManagerException ex) {
+      logger.error("Rolling back executor assignment for execution id:"
+        + exflow.getExecutionId(), ex);
+      executorLoader.unassignExecutor(exflow.getExecutionId());
+      throw new ExecutorManagerException(ex);
+    }
+    reference.setExecutor(choosenExecutor);
+
+    // move from flow to running flows
+    runningFlows.put(exflow.getExecutionId(),
+      new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+
+    logger.info(String.format(
+      "Successfully dispatched exec %d with error count %d",
+      exflow.getExecutionId(), reference.getNumErrors()));
+  }
+
   /*
    * This thread is responsible for processing queued flows using dispatcher and
    * making rest api calls to executor server
    */
   private class QueueProcessorThread extends Thread {
     private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
-    private static final int MAX_DISPATCHING_ERRORS_PERMITTED = 5;
+    private final int maxDispatchingErrors;
     private final long activeExecutorRefreshWindowInMilisec;
     private final int activeExecutorRefreshWindowInFlows;
 
@@ -1744,8 +1770,10 @@ public class ExecutorManager extends EventHandler implements
 
     public QueueProcessorThread(boolean isActive,
       long activeExecutorRefreshWindowInTime,
-      int activeExecutorRefreshWindowInFlows) {
+      int activeExecutorRefreshWindowInFlows,
+      int maxDispatchingErrors) {
       setActive(isActive);
+      this.maxDispatchingErrors = maxDispatchingErrors;
       this.activeExecutorRefreshWindowInFlows =
         activeExecutorRefreshWindowInFlows;
       this.activeExecutorRefreshWindowInMilisec =
@@ -1790,7 +1818,7 @@ public class ExecutorManager extends EventHandler implements
     private void processQueuedFlows(long activeExecutorsRefreshWindow,
       int maxContinuousFlowProcessed) throws InterruptedException,
       ExecutorManagerException {
-      long lastExecutorRefreshTime = System.currentTimeMillis();
+      long lastExecutorRefreshTime = 0;
       Pair<ExecutionReference, ExecutableFlow> runningCandidate;
       int currentContinuousFlowProcessed = 0;
 
@@ -1811,15 +1839,42 @@ public class ExecutorManager extends EventHandler implements
           currentContinuousFlowProcessed = 0;
         }
 
-        exflow.setUpdateTime(currentTime);
-        // process flow with current snapshot of activeExecutors
-        processFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
-        currentContinuousFlowProcessed++;
+        /**
+         * <pre>
+         *  TODO: Workaround until Filters gain a notion of GlobalSystemState.
+         *        Currently we try each queued flow once to infer a global busy state.
+         * Possible improvements:-
+         *   1. Move system-level filters into refreshExecutors and sleep if all executors are still busy after a refresh
+         *   2. Implement GlobalSystemState in the selector, or in a third place that manages system filters. Basically,
+         *      take out all the filters which do not depend on the flow but are still part of the Selector.
+         * Assumptions:-
+         *   1. nothing except the QueueProcessor updates the ExecutableFlow update time
+         *   2. re-attempting a flow (which has been tried before) is taken to mean all executors are busy
+         * </pre>
+         */
+        if(exflow.getUpdateTime() > lastExecutorRefreshTime) {
+          // put back in the queue
+          queuedFlows.enqueue(exflow, reference);
+          long sleepInterval =
+            activeExecutorsRefreshWindow
+              - (currentTime - lastExecutorRefreshTime);
+          // wait till next executor refresh
+          sleep(sleepInterval);
+        } else {
+          exflow.setUpdateTime(currentTime);
+          // process flow with current snapshot of activeExecutors
+          selectExecutorAndDispatchFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
+        }
+
+        // do not count failed flow processing (flows still in queue)
+        if(queuedFlows.getFlow(exflow.getExecutionId()) == null) {
+          currentContinuousFlowProcessed++;
+        }
       }
     }
 
     /* process flow with a snapshot of available Executors */
-    private void processFlow(ExecutionReference reference,
+    private void selectExecutorAndDispatchFlow(ExecutionReference reference,
       ExecutableFlow exflow, Set<Executor> availableExecutors)
       throws ExecutorManagerException {
       synchronized (exflow) {
@@ -1889,7 +1944,7 @@ public class ExecutorManager extends EventHandler implements
         logger.info("Using dispatcher for execution id :"
           + exflow.getExecutionId());
         ExecutorSelector selector = new ExecutorSelector(filterList, comparatorWeightsMap);
-        choosenExecutor = selector.getBest(activeExecutors, exflow);
+        choosenExecutor = selector.getBest(availableExecutors, exflow);
       }
       return choosenExecutor;
     }
@@ -1903,14 +1958,14 @@ public class ExecutorManager extends EventHandler implements
             "Reached handleDispatchExceptionCase stage for exec %d with error count %d",
             exflow.getExecutionId(), reference.getNumErrors()));
       reference.setNumErrors(reference.getNumErrors() + 1);
-      if (reference.getNumErrors() >= MAX_DISPATCHING_ERRORS_PERMITTED
+      if (reference.getNumErrors() > this.maxDispatchingErrors
         || remainingExecutors.size() <= 1) {
         logger.error("Failed to process queued flow");
         finalizeFlows(exflow);
       } else {
         remainingExecutors.remove(lastSelectedExecutor);
         // try other executors except chosenExecutor
-        processFlow(reference, exflow, remainingExecutors);
+        selectExecutorAndDispatchFlow(reference, exflow, remainingExecutors);
       }
     }
 
@@ -1925,23 +1980,5 @@ public class ExecutorManager extends EventHandler implements
       // schedule can starve all others
       queuedFlows.enqueue(exflow, reference);
     }
-
-    private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
-      Executor choosenExecutor) throws ExecutorManagerException {
-      exflow.setUpdateTime(System.currentTimeMillis());
-      callExecutorServer(exflow, choosenExecutor,
-        ConnectorParams.EXECUTE_ACTION);
-      executorLoader.assignExecutor(choosenExecutor.getId(),
-        exflow.getExecutionId());
-      reference.setExecutor(choosenExecutor);
-
-      // move from flow to running flows
-      runningFlows.put(exflow.getExecutionId(),
-        new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
-
-      logger.info(String.format(
-        "Successfully dispatched exec %d with error count %d",
-        exflow.getExecutionId(), reference.getNumErrors()));
-    }
   }
-}
\ No newline at end of file
+}
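
The dispatch() method above now persists the executor assignment before the REST call and rolls it back via the new unassignExecutor() if the call fails. A simplified, self-contained sketch of that pattern (the interfaces are stand-ins, not Azkaban's API):

    interface Store {
      void assignExecutor(int executorId, int execId) throws Exception;
      void unassignExecutor(int execId) throws Exception;
    }

    interface ExecutorClient {
      void execute(int executorId, int execId) throws Exception;
    }

    class Dispatcher {
      private final Store store;
      private final ExecutorClient client;

      Dispatcher(Store store, ExecutorClient client) {
        this.store = store;
        this.client = client;
      }

      void dispatch(int executorId, int execId) throws Exception {
        // Persist the assignment first so the web server knows who owns the execution.
        store.assignExecutor(executorId, execId);
        try {
          // Ask the executor server to start the flow.
          client.execute(executorId, execId);
        } catch (Exception e) {
          // Roll back the DB assignment so the flow can be re-dispatched elsewhere.
          store.unassignExecutor(execId);
          throw e;
        }
      }
    }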
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 7740163..a588cca 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -104,7 +104,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
           runner.query(connection, LastInsertID.LAST_INSERT_ID,
               new LastInsertID());
 
-      if (id == -1l) {
+      if (id == -1L) {
         throw new ExecutorManagerException(
             "Execution id is not properly created.");
       }
@@ -1061,7 +1061,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     @Override
     public Long handle(ResultSet rs) throws SQLException {
       if (!rs.next()) {
-        return -1l;
+        return -1L;
       }
       long id = rs.getLong(1);
       return id;
@@ -1559,4 +1559,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       return events;
     }
   }
+
+  /**
+   *
+   * {@inheritDoc}
+   * @see azkaban.executor.ExecutorLoader#unassignExecutor(int)
+   */
+  @Override
+  public void unassignExecutor(int executionId) throws ExecutorManagerException {
+    final String UPDATE =
+      "UPDATE execution_flows SET executor_id=NULL where exec_id=?";
+
+    QueryRunner runner = createQueryRunner();
+    try {
+      int rows = runner.update(UPDATE, executionId);
+      if (rows == 0) {
+        throw new ExecutorManagerException(String.format(
+          "Failed to unassign executor for execution : %d  ", executionId));
+      }
+    } catch (SQLException e) {
+      throw new ExecutorManagerException("Error updating execution id "
+        + executionId, e);
+    }
+  }
 }
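
For readers unfamiliar with commons-dbutils, the new unassignExecutor() boils down to the following plain-JDBC update (a sketch only; Azkaban itself goes through QueryRunner and AbstractJdbcLoader's connection handling):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    public class UnassignSketch {
      public static void unassignExecutor(Connection conn, int executionId) throws SQLException {
        String sql = "UPDATE execution_flows SET executor_id=NULL WHERE exec_id=?";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
          ps.setInt(1, executionId);
          // Zero updated rows means the execution id was not found.
          if (ps.executeUpdate() == 0) {
            throw new SQLException("No execution_flows row for exec_id " + executionId);
          }
        }
      }
    }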
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
index 2f89129..f83788f 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
@@ -66,7 +66,7 @@ public abstract class CandidateComparator<T> implements Comparator<T> {
 
       // add or replace the Comparator.
       this.factorComparatorList.put(comparator.getFactorName(),comparator);
-      logger.info(String.format("Factor comparator added for '%s'. Weight = '%s'",
+      logger.debug(String.format("Factor comparator added for '%s'. Weight = '%s'",
           comparator.getFactorName(), comparator.getWeight()));
   }
 
@@ -104,7 +104,7 @@ public abstract class CandidateComparator<T> implements Comparator<T> {
    * @return a pair structure contains the score for both sides.
    * */
   public Pair<Integer,Integer> getComparisonScore(T object1, T object2){
-    logger.info(String.format("start comparing '%s' with '%s',  total weight = %s ",
+    logger.debug(String.format("start comparing '%s' with '%s',  total weight = %s ",
         object1 == null ? "(null)" : object1.toString(),
         object2 == null ? "(null)" : object2.toString(),
         this.getTotalWeight()));
@@ -114,16 +114,16 @@ public abstract class CandidateComparator<T> implements Comparator<T> {
 
     // short cut if object equals.
     if (object1 ==  object2){
-      logger.info("[Comparator] same object.");
+      logger.debug("[Comparator] same object.");
     } else
     // left side is null.
     if (object1 == null){
-      logger.info("[Comparator] left side is null, right side gets total weight.");
+      logger.debug("[Comparator] left side is null, right side gets total weight.");
       result2 = this.getTotalWeight();
     } else
     // right side is null.
     if (object2 == null){
-      logger.info("[Comparator] right side is null, left side gets total weight.");
+      logger.debug("[Comparator] right side is null, left side gets total weight.");
       result1 = this.getTotalWeight();
     } else
     // both side is not null,put them thru the full loop
@@ -133,20 +133,20 @@ public abstract class CandidateComparator<T> implements Comparator<T> {
         int result = comparator.compare(object1, object2);
         result1  = result1 + (result > 0 ? comparator.getWeight() : 0);
         result2  = result2 + (result < 0 ? comparator.getWeight() : 0);
-        logger.info(String.format("[Factor: %s] compare result : %s (current score %s vs %s)",
+        logger.debug(String.format("[Factor: %s] compare result : %s (current score %s vs %s)",
             comparator.getFactorName(), result, result1, result2));
       }
     }
     // in case of same score, use tie-breaker to stabilize the result.
     if (result1 == result2){
       boolean result = this.tieBreak(object1, object2);
-      logger.info("[TieBreaker] TieBreaker chose " +
+      logger.debug("[TieBreaker] TieBreaker chose " +
       (result? String.format("left side (%s)",  null== object1 ? "null": object1.toString()) :
                String.format("right side (%s)", null== object2 ? "null": object2.toString()) ));
       if (result) result1++; else result2++;
     }
 
-    logger.info(String.format("Result : %s vs %s ",result1,result2));
+    logger.debug(String.format("Result : %s vs %s ",result1,result2));
     return new Pair<Integer,Integer>(result1,result2);
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
index 94b8dfa..f927a2a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
@@ -51,7 +51,7 @@ public abstract class CandidateFilter<T,V>  {
 
       // add or replace the filter.
       this.factorFilterList.put(filter.getFactorName(),filter);
-      logger.info(String.format("Factor filter added for '%s'.",
+      logger.debug(String.format("Factor filter added for '%s'.",
           filter.getFactorName()));
   }
 
@@ -62,7 +62,7 @@ public abstract class CandidateFilter<T,V>  {
    * @return true if the check passed, false if check failed, which means the item need to be filtered.
    * */
   public boolean filterTarget(T filteringTarget, V referencingObject){
-    logger.info(String.format("start filtering '%s' with factor filter for '%s'",
+    logger.debug(String.format("start filtering '%s' with factor filter for '%s'",
         filteringTarget == null ? "(null)" : filteringTarget.toString(),
         this.getName()));
 
@@ -70,13 +70,13 @@ public abstract class CandidateFilter<T,V>  {
     boolean result = true;
     for (FactorFilter<T,V> filter : filterList){
       result &= filter.filterTarget(filteringTarget,referencingObject);
-      logger.info(String.format("[Factor: %s] filter result : %s ",
+      logger.debug(String.format("[Factor: %s] filter result : %s ",
           filter.getFactorName(), result));
       if (!result){
         break;
       }
     }
-    logger.info(String.format("Final filtering result : %s ",result));
+    logger.debug(String.format("Final filtering result : %s ",result));
     return result;
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
index 5cae9ef..8fa91d0 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
@@ -50,8 +50,8 @@ public class CandidateSelector<K extends Comparable<K>, V> implements Selector<K
        return null;
      }
 
-     logger.info("start candidate selection logic.");
-     logger.info(String.format("candidate count before filtering: %s", candidateList.size()));
+     logger.debug("start candidate selection logic.");
+     logger.debug(String.format("candidate count before filtering: %s", candidateList.size()));
 
      // to keep the input untouched, we will form up a new list based off the filtering result.
      Collection<K> filteredList = new ArrayList<K>();
@@ -64,22 +64,22 @@ public class CandidateSelector<K extends Comparable<K>, V> implements Selector<K
        }
      } else{
        filteredList = candidateList;
-       logger.info("skipping the candidate filtering as the filter object is not specifed.");
+       logger.debug("skipping the candidate filtering as the filter object is not specifed.");
      }
 
-     logger.info(String.format("candidate count after filtering: %s", filteredList.size()));
+     logger.debug(String.format("candidate count after filtering: %s", filteredList.size()));
      if (filteredList.size() == 0){
-       logger.info("failed to select candidate as the filtered candidate list is empty.");
+       logger.debug("failed to select candidate as the filtered candidate list is empty.");
        return null;
      }
 
      if (null == comparator){
-       logger.info("candidate comparator is not specified, default hash code comparator class will be used.");
+       logger.debug("candidate comparator is not specified, default hash code comparator class will be used.");
      }
 
      // final work - find the best candidate from the filtered list.
      K executor = Collections.max(filteredList, comparator);
-     logger.info(String.format("candidate selected %s",
+     logger.debug(String.format("candidate selected %s",
          null == executor ? "(null)" : executor.toString()));
      return executor;
   }
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
index eafab67..978bcb9 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
@@ -122,14 +122,14 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
     result = 0 ;
     // both doesn't expose the info
     if (null == statisticsObj1 && null == statisticsObj2){
-      logger.info(String.format("%s : neither of the executors exposed statistics info.",
+      logger.debug(String.format("%s : neither of the executors exposed statistics info.",
           caller));
       return true;
     }
 
     //right side doesn't expose the info.
     if (null == statisticsObj2 ){
-        logger.info(String.format("%s : choosing left side and the right side executor doesn't expose statistics info",
+        logger.debug(String.format("%s : choosing left side and the right side executor doesn't expose statistics info",
             caller));
         result = 1;
         return true;
@@ -137,7 +137,7 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
 
     //left side doesn't expose the info.
     if (null == statisticsObj1 ){
-      logger.info(String.format("%s : choosing right side and the left side executor doesn't expose statistics info",
+      logger.debug(String.format("%s : choosing right side and the left side executor doesn't expose statistics info",
           caller));
       result = -1;
       return true;
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
index 19baa10..a47bc16 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
@@ -42,7 +42,7 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
 
   // factor filter names.
   private static final String STATICREMAININGFLOWSIZE_FILTER_NAME = "StaticRemainingFlowSize";
-  private static final String MINIMUMFREEMEMORY_FILTER_NAME = "MinimunFreeMemory";
+  private static final String MINIMUMFREEMEMORY_FILTER_NAME = "MinimumFreeMemory";
   private static final String CPUSTATUS_FILTER_NAME = "CpuStatus";
 
   /**<pre>
@@ -98,13 +98,13 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
     return FactorFilter.create(STATICREMAININGFLOWSIZE_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
       public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
         if (null == filteringTarget){
-          logger.info(String.format("%s : filtering out the target as it is null.", STATICREMAININGFLOWSIZE_FILTER_NAME));
+          logger.debug(String.format("%s : filtering out the target as it is null.", STATICREMAININGFLOWSIZE_FILTER_NAME));
           return false;
         }
 
         ExecutorInfo stats = filteringTarget.getExecutorInfo();
         if (null == stats) {
-          logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
+          logger.debug(String.format("%s : filtering out %s as it's stats is unavailable.",
               STATICREMAININGFLOWSIZE_FILTER_NAME,
               filteringTarget.toString()));
           return false;
@@ -126,13 +126,13 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
       private static final int MINIMUM_FREE_MEMORY = 6 * 1024;
       public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
         if (null == filteringTarget){
-          logger.info(String.format("%s : filtering out the target as it is null.", MINIMUMFREEMEMORY_FILTER_NAME));
+          logger.debug(String.format("%s : filtering out the target as it is null.", MINIMUMFREEMEMORY_FILTER_NAME));
           return false;
         }
 
         ExecutorInfo stats = filteringTarget.getExecutorInfo();
         if (null == stats) {
-          logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
+          logger.debug(String.format("%s : filtering out %s as it's stats is unavailable.",
               MINIMUMFREEMEMORY_FILTER_NAME,
               filteringTarget.toString()));
           return false;
@@ -156,13 +156,13 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
       private static final int MAX_CPU_CURRENT_USAGE = 95;
       public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
         if (null == filteringTarget){
-          logger.info(String.format("%s : filtering out the target as it is null.", CPUSTATUS_FILTER_NAME));
+          logger.debug(String.format("%s : filtering out the target as it is null.", CPUSTATUS_FILTER_NAME));
           return false;
         }
 
         ExecutorInfo stats = filteringTarget.getExecutorInfo();
         if (null == stats) {
-          logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
+          logger.debug(String.format("%s : filtering out %s as it's stats is unavailable.",
               MINIMUMFREEMEMORY_FILTER_NAME,
               filteringTarget.toString()));
           return false;
diff --git a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
index 0c98ecb..8c208a5 100644
--- a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
+++ b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
@@ -133,6 +133,11 @@ public class CommonJobProperties {
    * hotspot occurs.
    */
   public static final String PROJECT_VERSION = "azkaban.flow.projectversion";
+  
+  /**
+   * Find out who is the submit user, in addition to the user.to.proxy (they may be different)
+   */
+  public static final String SUBMIT_USER = "azkaban.flow.submituser";
 
   /**
    * A uuid assigned to every execution
diff --git a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
index 7b1462f..a85d1a3 100644
--- a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
+++ b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
@@ -54,8 +54,10 @@ public class JobCallbackValidator {
               maxPostBodyLength);
     }
 
-    logger.info("Found " + totalCallbackCount + " job callbacks for job "
-        + jobName);
+    if (logger.isDebugEnabled()) {
+      logger.debug("Found " + totalCallbackCount + " job callbacks for job "
+          + jobName);
+    }
     return totalCallbackCount;
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
index e8e2dac..d3ab3f1 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
@@ -22,8 +22,8 @@ import java.util.List;
 
 import org.apache.log4j.Logger;
 
+import azkaban.project.DirectoryFlowLoader;
 import azkaban.server.AzkabanServer;
-import azkaban.utils.DirectoryFlowLoader;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 4e268a0..a6e4f30 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
 
+import azkaban.flow.CommonJobProperties;
 import azkaban.jobExecutor.utils.process.AzkabanProcess;
 import azkaban.jobExecutor.utils.process.AzkabanProcessBuilder;
 import azkaban.utils.Pair;
@@ -36,15 +37,32 @@ import azkaban.utils.SystemMemoryInfo;
 public class ProcessJob extends AbstractProcessJob {
 
   public static final String COMMAND = "command";
+
   private static final long KILL_TIME_MS = 5000;
+
   private volatile AzkabanProcess process;
+
   private static final String MEMCHECK_ENABLED = "memCheck.enabled";
-  private static final String MEMCHECK_FREEMEMDECRAMT = "memCheck.freeMemDecrAmt";
+
+  private static final String MEMCHECK_FREEMEMDECRAMT =
+      "memCheck.freeMemDecrAmt";
+
   public static final String AZKABAN_MEMORY_CHECK = "azkaban.memory.check";
 
+  public static final String NATIVE_LIB_FOLDER = "azkaban.native.lib";
+  public static final String EXECUTE_AS_USER = "execute.as.user";
+  public static final String EXECUTE_AS_USER_OVERRIDE =
+      "execute.as.user.override";
+  public static final String USER_TO_PROXY = "user.to.proxy";
+  public static final String KRB5CCNAME = "KRB5CCNAME";
+
   public ProcessJob(final String jobId, final Props sysProps,
       final Props jobProps, final Logger log) {
     super(jobId, sysProps, jobProps, log);
+
+    // this is in line with what other job types
+    // (hadoopJava, spark, pig, hive) do
+    jobProps.put(CommonJobProperties.JOB_ID, jobId);
   }
 
   @Override
@@ -55,13 +73,19 @@ public class ProcessJob extends AbstractProcessJob {
       handleError("Bad property definition! " + e.getMessage(), e);
     }
 
-    if (sysProps.getBoolean(MEMCHECK_ENABLED, true) && jobProps.getBoolean(AZKABAN_MEMORY_CHECK, true)) {
+    if (sysProps.getBoolean(MEMCHECK_ENABLED, true)
+        && jobProps.getBoolean(AZKABAN_MEMORY_CHECK, true)) {
       long freeMemDecrAmt = sysProps.getLong(MEMCHECK_FREEMEMDECRAMT, 0);
       Pair<Long, Long> memPair = getProcMemoryRequirement();
-      boolean isMemGranted = SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(), memPair.getSecond(), freeMemDecrAmt);
+      boolean isMemGranted =
+          SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(),
+              memPair.getSecond(), freeMemDecrAmt);
       if (!isMemGranted) {
-        throw new Exception(String.format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
-                memPair.getFirst(), memPair.getSecond(), getId()));
+        throw new Exception(
+            String
+                .format(
+                    "Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
+                    memPair.getFirst(), memPair.getSecond(), getId()));
       }
     }
 
@@ -80,13 +104,45 @@ public class ProcessJob extends AbstractProcessJob {
 
     info(commands.size() + " commands to execute.");
     File[] propFiles = initPropsFiles();
+
+    // change krb5ccname env var so that each job execution gets its own cache
     Map<String, String> envVars = getEnvironmentVariables();
+    envVars.put(KRB5CCNAME, getKrb5ccname(jobProps));
+
+    // determine whether to run as Azkaban or run as effectiveUser
+    String executeAsUserBinaryPath = null;
+    String effectiveUser = null;
+    boolean isExecuteAsUser = determineExecuteAsUser(sysProps, jobProps);
+
+    if (isExecuteAsUser) {
+      String nativeLibFolder = sysProps.getString(NATIVE_LIB_FOLDER);
+      executeAsUserBinaryPath =
+          String.format("%s/%s", nativeLibFolder, "execute-as-user");
+      effectiveUser = getEffectiveUser(jobProps);
+      if ("root".equals(effectiveUser)) {
+        throw new RuntimeException(
+            "Not permitted to proxy as root through Azkaban");
+      }
+    }
 
     for (String command : commands) {
-      info("Command: " + command);
-      AzkabanProcessBuilder builder =
-          new AzkabanProcessBuilder(partitionCommandLine(command))
-              .setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog());
+      AzkabanProcessBuilder builder = null;
+      if (isExecuteAsUser) {
+        command =
+            String.format("%s %s %s", executeAsUserBinaryPath, effectiveUser,
+                command);
+        info("Command: " + command);
+        builder =
+            new AzkabanProcessBuilder(partitionCommandLine(command))
+                .setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog())
+                .enableExecuteAsUser().setExecuteAsUserBinaryPath(executeAsUserBinaryPath)
+                .setEffectiveUser(effectiveUser);
+      } else {
+        info("Command: " + command);
+        builder =
+            new AzkabanProcessBuilder(partitionCommandLine(command))
+                .setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog());
+      }
 
       if (builder.getEnv().size() > 0) {
         info("Environment variables: " + builder.getEnv());
@@ -119,10 +175,71 @@ public class ProcessJob extends AbstractProcessJob {
     generateProperties(propFiles[1]);
   }
 
+  private boolean determineExecuteAsUser(Props sysProps, Props jobProps) {
+    boolean isExecuteAsUser = sysProps.getBoolean(EXECUTE_AS_USER, false);
+    // allow a per-job override of the system-level setting; a temporary escape hatch
+    if (jobProps.containsKey(EXECUTE_AS_USER_OVERRIDE))
+      isExecuteAsUser = jobProps.getBoolean(EXECUTE_AS_USER_OVERRIDE, false);
+
+    return isExecuteAsUser;
+  }
+
+  /**
+   * <pre>
+   * This method derives the kerberos ticket cache file name from the job props,
+   * ensuring that each job execution gets its own kerberos ticket cache file.
+   * Since the code only sets an environment variable, the number of files created
+   * corresponds to the number of processes doing kinit in their flow, which
+   * should not be an inordinately high number.
+   * </pre>
+   * 
+   * @return file name: the kerberos ticket cache file to use
+   */
+  private String getKrb5ccname(Props jobProps) {
+    String effectiveUser = getEffectiveUser(jobProps);
+    String projectName =
+        jobProps.getString(CommonJobProperties.PROJECT_NAME).replace(" ", "_");
+    String flowId =
+        jobProps.getString(CommonJobProperties.FLOW_ID).replace(" ", "_");
+    String jobId =
+        jobProps.getString(CommonJobProperties.JOB_ID).replace(" ", "_");
+    // execId should be an int and should not have space in it, ever
+    String execId = jobProps.getString(CommonJobProperties.EXEC_ID);
+    String krb5ccname =
+        String.format("/tmp/krb5cc__%s__%s__%s__%s__%s", projectName, flowId,
+            jobId, execId, effectiveUser);
+
+    return krb5ccname;
+  }
+
+  /**
+   * <pre>
+   * Determines which user the process job should run as, in the following order of precedence:
+   * 1. USER_TO_PROXY
+   * 2. SUBMIT_USER
+   * </pre>
+   * 
+   * @param jobProps
+   * @return the user that Azkaban is going to execute as
+   */
+  private String getEffectiveUser(Props jobProps) {
+    String effectiveUser = null;
+    if (jobProps.containsKey(USER_TO_PROXY)) {
+      effectiveUser = jobProps.getString(USER_TO_PROXY);
+    } else if (jobProps.containsKey(CommonJobProperties.SUBMIT_USER)) {
+      effectiveUser = jobProps.getString(CommonJobProperties.SUBMIT_USER);
+    } else {
+      throw new RuntimeException(
+          "Internal Error: No user.to.proxy or submit.user in the jobProps");
+    }
+    info("effective user is: " + effectiveUser);
+    return effectiveUser;
+  }
+
   /**
    * This is used to get the min/max memory size requirement by processes.
-   * SystemMemoryInfo can use the info to determine if the memory request
-   * can be fulfilled. For Java process, this should be Xms/Xmx setting.
+   * SystemMemoryInfo can use the info to determine if the memory request can be
+   * fulfilled. For Java process, this should be Xms/Xmx setting.
    *
    * @return pair of min/max memory size
    */
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
index 9c3f092..9ad88d3 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
@@ -40,6 +40,9 @@ import com.google.common.base.Joiner;
  * loggers.
  */
 public class AzkabanProcess {
+  
+  public static String KILL_COMMAND = "kill";
+  
   private final String workingDir;
   private final List<String> cmd;
   private final Map<String, String> env;
@@ -50,6 +53,10 @@ public class AzkabanProcess {
   private volatile int processId;
   private volatile Process process;
 
+  private boolean isExecuteAsUser = false;
+  private String executeAsUserBinary = null;
+  private String effectiveUser = null;
+
   public AzkabanProcess(final List<String> cmd, final Map<String, String> env,
       final String workingDir, final Logger logger) {
     this.cmd = cmd;
@@ -61,6 +68,15 @@ public class AzkabanProcess {
     this.logger = logger;
   }
 
+  public AzkabanProcess(List<String> cmd, Map<String, String> env,
+      String workingDir, Logger logger, String executeAsUserBinary,
+      String effectiveUser) {
+    this(cmd, env, workingDir, logger);
+    this.isExecuteAsUser = true;
+    this.executeAsUserBinary = executeAsUserBinary;
+    this.effectiveUser = effectiveUser;
+  }
+
   /**
    * Execute this process, blocking until it has completed.
    */
@@ -101,13 +117,20 @@ public class AzkabanProcess {
       }
 
       completeLatch.countDown();
-      if (exitCode != 0) {
-        throw new ProcessFailureException(exitCode, errorGobbler.getRecentLog());
-      }
 
       // try to wait for everything to get logged out before exiting
       outputGobbler.awaitCompletion(5000);
       errorGobbler.awaitCompletion(5000);
+
+      if (exitCode != 0) {
+        String output =
+            new StringBuilder().append("Stdout:\n")
+                .append(outputGobbler.getRecentLog()).append("\n\n")
+                .append("Stderr:\n").append(errorGobbler.getRecentLog())
+                .append("\n").toString();
+        throw new ProcessFailureException(exitCode, output);
+      }
+
     } finally {
       IOUtils.closeQuietly(process.getInputStream());
       IOUtils.closeQuietly(process.getOutputStream());
@@ -155,7 +178,15 @@ public class AzkabanProcess {
     checkStarted();
     if (processId != 0 && isStarted()) {
       try {
-        Runtime.getRuntime().exec("kill " + processId);
+        if (isExecuteAsUser) {
+          String cmd =
+              String.format("%s %s %s %d", executeAsUserBinary,
+                  effectiveUser, KILL_COMMAND, processId);
+          Runtime.getRuntime().exec(cmd);
+        } else {
+          String cmd = String.format("%s %d", KILL_COMMAND, processId);
+          Runtime.getRuntime().exec(cmd);
+        }
         return completeLatch.await(time, unit);
       } catch (IOException e) {
         logger.error("Kill attempt failed.", e);
@@ -173,7 +204,15 @@ public class AzkabanProcess {
     if (isRunning()) {
       if (processId != 0) {
         try {
-          Runtime.getRuntime().exec("kill -9 " + processId);
+          if (isExecuteAsUser) {
+            String cmd =
+                String.format("%s %s %s -9 %d", executeAsUserBinary,
+                    effectiveUser, KILL_COMMAND, processId);
+            Runtime.getRuntime().exec(cmd);
+          } else {
+            String cmd = String.format("%s -9 %d", KILL_COMMAND, processId);
+            Runtime.getRuntime().exec(cmd);
+          }
         } catch (IOException e) {
           logger.error("Kill attempt failed.", e);
         }
@@ -234,4 +273,12 @@ public class AzkabanProcess {
     return "Process(cmd = " + Joiner.on(" ").join(cmd) + ", env = " + env
         + ", cwd = " + workingDir + ")";
   }
+
+  public boolean isExecuteAsUser() {
+    return isExecuteAsUser;
+  }
+
+  public String getEffectiveUser() {
+    return effectiveUser;
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcessBuilder.java b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcessBuilder.java
index 8832195..9e2c2f7 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcessBuilder.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcessBuilder.java
@@ -35,6 +35,9 @@ public class AzkabanProcessBuilder {
   private Map<String, String> env = new HashMap<String, String>();
   private String workingDir = System.getProperty("user.dir");
   private Logger logger = Logger.getLogger(AzkabanProcess.class);
+  private boolean isExecuteAsUser = false;
+  private String executeAsUserBinaryPath = null;
+  private String effectiveUser = null;
 
   private int stdErrSnippetSize = 30;
   private int stdOutSnippetSize = 30;
@@ -100,7 +103,12 @@ public class AzkabanProcessBuilder {
   }
 
   public AzkabanProcess build() {
-    return new AzkabanProcess(cmd, env, workingDir, logger);
+    if (isExecuteAsUser) {
+      return new AzkabanProcess(cmd, env, workingDir, logger,
+          executeAsUserBinaryPath, effectiveUser);
+    } else {
+      return new AzkabanProcess(cmd, env, workingDir, logger);
+    }
   }
 
   public List<String> getCommand() {
@@ -116,4 +124,19 @@ public class AzkabanProcessBuilder {
     return "ProcessBuilder(cmd = " + Joiner.on(" ").join(cmd) + ", env = "
         + env + ", cwd = " + workingDir + ")";
   }
+
+  public AzkabanProcessBuilder enableExecuteAsUser() {
+    this.isExecuteAsUser = true;
+    return this;
+  }
+
+  public AzkabanProcessBuilder setExecuteAsUserBinaryPath(String executeAsUserBinaryPath) {
+    this.executeAsUserBinaryPath = executeAsUserBinaryPath;
+    return this;
+  }
+
+  public AzkabanProcessBuilder setEffectiveUser(String effectiveUser) {
+    this.effectiveUser = effectiveUser;
+    return this;
+  }
 }
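
A usage sketch of the new builder options, mirroring what ProcessJob does earlier in this diff. The constructor call is assumed from ProcessJob's usage (it passes a tokenized command); the binary path, user, and environment values are placeholders:

    Map<String, String> envVars = new HashMap<String, String>();
    envVars.put("KRB5CCNAME", "/tmp/krb5cc__myproject__myflow__myjob__123__someuser");

    AzkabanProcess process = new AzkabanProcessBuilder("echo", "hello")
        .setEnv(envVars)
        .setWorkingDir("/tmp")
        .enableExecuteAsUser()
        .setExecuteAsUserBinaryPath("/opt/azkaban/native/execute-as-user")
        .setEffectiveUser("someuser")
        .build();
    // The process is then run as in the existing AzkabanProcess flow, which blocks
    // until the wrapped command completes (the run method itself is outside this diff).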
diff --git a/azkaban-common/src/main/java/azkaban/jobtype/JobTypeManager.java b/azkaban-common/src/main/java/azkaban/jobtype/JobTypeManager.java
index 212f1a9..f1cf351 100644
--- a/azkaban-common/src/main/java/azkaban/jobtype/JobTypeManager.java
+++ b/azkaban-common/src/main/java/azkaban/jobtype/JobTypeManager.java
@@ -340,6 +340,7 @@ public class JobTypeManager {
             jobProps, jobType));
       }
 
+      // TODO: should the logic below mirror the logic for PluginLoadProps?
       Props pluginJobProps = pluginSet.getPluginJobProps(jobType);
       if (pluginJobProps != null) {
         for (String k : pluginJobProps.getKeySet()) {
@@ -354,7 +355,11 @@ public class JobTypeManager {
       if (pluginLoadProps != null) {
         pluginLoadProps = PropsUtils.resolveProps(pluginLoadProps);
       } else {
-        pluginLoadProps = new Props();
+        // pluginSet.getCommonPluginLoadProps() will return null if there is no plugins directory.
+        // hence assigning default Props() if that's the case
+        pluginLoadProps = pluginSet.getCommonPluginLoadProps();
+        if(pluginJobProps == null)
+          pluginJobProps = new Props();
       }
 
       job =
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
index 12b72dd..d726216 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -845,7 +845,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
       byte[] stringData = json.getBytes("UTF-8");
       byte[] data = stringData;
 
-      logger.info("UTF-8 size:" + data.length);
       if (defaultEncodingType == EncodingType.GZIP) {
         data = GZIPUtils.gzipBytes(stringData);
       }
@@ -888,7 +887,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
     byte[] stringData = json.getBytes("UTF-8");
     byte[] data = stringData;
 
-    logger.info("UTF-8 size:" + data.length);
     if (encType == EncodingType.GZIP) {
       data = GZIPUtils.gzipBytes(stringData);
     }
@@ -1009,7 +1007,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
 
     String propertyJSON = PropsUtils.toJSONString(props, true);
     byte[] data = propertyJSON.getBytes("UTF-8");
-    logger.info("UTF-8 size:" + data.length);
     if (defaultEncodingType == EncodingType.GZIP) {
       data = GZIPUtils.gzipBytes(data);
     }
@@ -1032,7 +1029,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
 
     String propertyJSON = PropsUtils.toJSONString(props, true);
     byte[] data = propertyJSON.getBytes("UTF-8");
-    logger.info("UTF-8 size:" + data.length);
     if (defaultEncodingType == EncodingType.GZIP) {
       data = GZIPUtils.gzipBytes(data);
     }
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
index 0f09b9c..52024f1 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
@@ -32,6 +32,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
 
 import azkaban.flow.Flow;
+import azkaban.project.DirectoryFlowLoader;
 import azkaban.project.ProjectLogEvent.EventType;
 import azkaban.project.ProjectWhitelist.WhitelistType;
 import azkaban.project.validator.ValidationReport;
@@ -42,7 +43,6 @@ import azkaban.project.validator.XmlValidatorManager;
 import azkaban.user.Permission;
 import azkaban.user.Permission.Type;
 import azkaban.user.User;
-import azkaban.utils.DirectoryFlowLoader;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
 
@@ -269,7 +269,7 @@ public class ProjectManager {
           "Project names must start with a letter, followed by any number of letters, digits, '-' or '_'.");
     }
 
-    if (projectsByName.contains(projectName)) {
+    if (projectsByName.containsKey(projectName)) {
       throw new ProjectManagerException("Project already exists.");
     }
 
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java b/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
index a6b1a39..039ab3a 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
@@ -22,14 +22,14 @@ import azkaban.utils.Props;
 
 /**
  * @author wkang
- * 
+ *
  * This class manages project whitelist defined in xml config file.
  * An single xml config file contains different types of whitelisted
  * projects. For additional type of whitelist, modify WhitelistType enum.
- * 
+ *
  * The xml config file should in the following format. Please note
  * the tag <MemoryCheck> is same as the defined enum MemoryCheck
- * 
+ *
  * <ProjectWhitelist>
  *  <MemoryCheck>
  *      <project projectname="project1" />
@@ -84,7 +84,7 @@ public class ProjectWhitelist {
     Map<WhitelistType, Set<Integer>> projsWhitelisted = new HashMap<WhitelistType, Set<Integer>>();
     NodeList tagList = doc.getChildNodes();
     if (!tagList.item(0).getNodeName().equals(PROJECT_WHITELIST_TAG)) {
-      throw new RuntimeException("Cannot find tag '" +  PROJECT_WHITELIST_TAG + "' in " + xmlFile);      
+      throw new RuntimeException("Cannot find tag '" +  PROJECT_WHITELIST_TAG + "' in " + xmlFile);
     }
 
     NodeList whitelist = tagList.item(0).getChildNodes();
@@ -114,7 +114,7 @@ public class ProjectWhitelist {
     NamedNodeMap projectAttrMap = node.getAttributes();
     Node projectIdAttr = projectAttrMap.getNamedItem(PROJECTID_ATTR);
     if (projectIdAttr == null) {
-      throw new RuntimeException("Error loading project. The '" + PROJECTID_ATTR 
+      throw new RuntimeException("Error loading project. The '" + PROJECTID_ATTR
               + "' attribute doesn't exist");
     }
 
@@ -127,7 +127,7 @@ public class ProjectWhitelist {
     if (projsWhitelisted != null) {
       Set<Integer> projs = projsWhitelisted.get(whitelistType);
       if (projs != null) {
-        return projs.contains(project); 
+        return projs.contains(project);
       }
     }
     return false;
@@ -138,6 +138,7 @@ public class ProjectWhitelist {
    * the defined enums.
    */
   public static enum WhitelistType {
-    MemoryCheck
+    MemoryCheck,
+    NumJobPerFlow
   }
 }
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java b/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
index 0f240da..cc6d005 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
@@ -24,7 +24,7 @@ import org.w3c.dom.NodeList;
 import org.xml.sax.SAXException;
 
 import azkaban.project.Project;
-import azkaban.utils.DirectoryFlowLoader;
+import azkaban.project.DirectoryFlowLoader;
 import azkaban.utils.Props;
 
 /**
diff --git a/azkaban-common/src/main/java/azkaban/server/session/SessionCache.java b/azkaban-common/src/main/java/azkaban/server/session/SessionCache.java
index fc4d64f..20f4496 100644
--- a/azkaban-common/src/main/java/azkaban/server/session/SessionCache.java
+++ b/azkaban-common/src/main/java/azkaban/server/session/SessionCache.java
@@ -16,10 +16,12 @@
 
 package azkaban.server.session;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.Cache;
+
+import java.util.concurrent.TimeUnit;
+
 import azkaban.utils.Props;
-import azkaban.utils.cache.Cache;
-import azkaban.utils.cache.CacheManager;
-import azkaban.utils.cache.Cache.EjectionPolicy;
 
 /**
  * Cache for web session.
@@ -34,7 +36,7 @@ public class SessionCache {
   private static final long SESSION_TIME_TO_LIVE = 24 * 60 * 60 * 1000L;
 
   // private CacheManager manager = CacheManager.create();
-  private Cache cache;
+  private Cache<String, Session> cache;
 
   /**
    * Constructor taking global props.
@@ -42,13 +44,12 @@ public class SessionCache {
    * @param props
    */
   public SessionCache(Props props) {
-    CacheManager manager = CacheManager.getInstance();
-
-    cache = manager.createCache();
-    cache.setEjectionPolicy(EjectionPolicy.LRU);
-    cache.setMaxCacheSize(props.getInt("max.num.sessions", MAX_NUM_SESSIONS));
-    cache.setExpiryTimeToLiveMs(props.getLong("session.time.to.live",
-        SESSION_TIME_TO_LIVE));
+    cache = CacheBuilder.newBuilder()
+        .maximumSize(props.getInt("max.num.sessions", MAX_NUM_SESSIONS))
+        .expireAfterAccess(
+            props.getLong("session.time.to.live", SESSION_TIME_TO_LIVE),
+            TimeUnit.MILLISECONDS)
+        .build();
   }
 
   /**
@@ -58,8 +59,7 @@ public class SessionCache {
    * @return
    */
   public Session getSession(String sessionId) {
-    Session elem = cache.<Session> get(sessionId);
-
+    Session elem = cache.getIfPresent(sessionId);
     return elem;
   }
 
@@ -79,7 +79,7 @@ public class SessionCache {
    * @param id
    * @return
    */
-  public boolean removeSession(String id) {
-    return cache.remove(id);
+  public void removeSession(String id) {
+    cache.invalidate(id);
   }
 }
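
For context on the SessionCache change above: a minimal standalone sketch, assuming only that Guava is on the classpath, of the CacheBuilder pattern the new implementation adopts. The key/value types and sizes below are illustrative, not the real session objects.

    import java.util.concurrent.TimeUnit;

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    public class GuavaCacheSketch {
      public static void main(String[] args) {
        // Size- and time-bounded cache, mirroring how the new SessionCache is built.
        Cache<String, String> cache = CacheBuilder.newBuilder()
            .maximumSize(10000)                    // analogous to max.num.sessions
            .expireAfterAccess(24, TimeUnit.HOURS) // analogous to session.time.to.live
            .build();

        cache.put("session-id", "user-1");
        // getIfPresent returns null on a miss instead of throwing.
        System.out.println(cache.getIfPresent("session-id"));
        cache.invalidate("session-id");
        System.out.println(cache.getIfPresent("session-id")); // null after invalidation
      }
    }

Note that expireAfterAccess resets the timer on every read, so an actively used session keeps its entry alive until it has been idle for the full time-to-live.
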
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
index 1141176..27b9ba4 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
@@ -20,7 +20,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.log4j.Logger;
-
 import org.joda.time.DateTime;
 import org.joda.time.ReadablePeriod;
 
@@ -57,7 +56,6 @@ public class SlaChecker implements ConditionChecker {
 
   private Boolean isSlaMissed(ExecutableFlow flow) {
     String type = slaOption.getType();
-    logger.info("flow is " + flow.getStatus());
     if (flow.getStartTime() < 0) {
       return Boolean.FALSE;
     }
@@ -136,7 +134,6 @@ public class SlaChecker implements ConditionChecker {
 
   private Boolean isSlaGood(ExecutableFlow flow) {
     String type = slaOption.getType();
-    logger.info("flow is " + flow.getStatus());
     if (flow.getStartTime() < 0) {
       return Boolean.FALSE;
     }
@@ -218,13 +215,11 @@ public class SlaChecker implements ConditionChecker {
   }
 
   public Object isSlaFailed() {
-    logger.info("Testing if sla failed for execution " + execId);
     ExecutableFlow flow;
     try {
       flow = executorManager.getExecutableFlow(execId);
     } catch (ExecutorManagerException e) {
       logger.error("Can't get executable flow.", e);
-      e.printStackTrace();
       // something wrong, send out alerts
       return Boolean.TRUE;
     }
@@ -232,13 +227,11 @@ public class SlaChecker implements ConditionChecker {
   }
 
   public Object isSlaPassed() {
-    logger.info("Testing if sla is good for execution " + execId);
     ExecutableFlow flow;
     try {
       flow = executorManager.getExecutableFlow(execId);
     } catch (ExecutorManagerException e) {
       logger.error("Can't get executable flow.", e);
-      e.printStackTrace();
       // something wrong, send out alerts
       return Boolean.TRUE;
     }
diff --git a/azkaban-common/src/main/java/azkaban/trigger/Condition.java b/azkaban-common/src/main/java/azkaban/trigger/Condition.java
index 7bb275b..5e8e7b6 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/Condition.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/Condition.java
@@ -25,7 +25,6 @@ import org.apache.commons.jexl2.Expression;
 import org.apache.commons.jexl2.JexlEngine;
 import org.apache.commons.jexl2.MapContext;
 import org.apache.log4j.Logger;
-
 import org.joda.time.DateTime;
 
 public class Condition {
@@ -119,7 +118,9 @@ public class Condition {
   }
 
   public boolean isMet() {
-    logger.info("Testing condition " + expression);
+    if (logger.isDebugEnabled()) {
+      logger.debug("Testing condition " + expression);
+    }
     return expression.evaluate(context).equals(Boolean.TRUE);
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerLoader.java b/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerLoader.java
index 68c0508..007d872 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerLoader.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerLoader.java
@@ -178,7 +178,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements
           runner.query(connection, LastInsertID.LAST_INSERT_ID,
               new LastInsertID());
 
-      if (id == -1l) {
+      if (id == -1L) {
         logger.error("trigger id is not properly created.");
         throw new TriggerLoaderException("trigger id is not properly created.");
       }
@@ -194,7 +194,9 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements
 
   @Override
   public void updateTrigger(Trigger t) throws TriggerLoaderException {
-    logger.info("Updating trigger " + t.getTriggerId() + " into db.");
+    if (logger.isDebugEnabled()) {
+      logger.debug("Updating trigger " + t.getTriggerId() + " into db.");
+    }
     t.setLastModifyTime(System.currentTimeMillis());
     Connection connection = getConnection();
     try {
@@ -238,7 +240,9 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements
       if (updates == 0) {
         throw new TriggerLoaderException("No trigger has been updated.");
       } else {
-        logger.info("Updated " + updates + " records.");
+        if (logger.isDebugEnabled()) {
+          logger.debug("Updated " + updates + " records.");
+        }
       }
     } catch (SQLException e) {
       logger.error(UPDATE_TRIGGER + " failed.");
@@ -253,7 +257,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements
     @Override
     public Long handle(ResultSet rs) throws SQLException {
       if (!rs.next()) {
-        return -1l;
+        return -1L;
       }
 
       long id = rs.getLong(1);
diff --git a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
index fa8d130..97e858c 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
@@ -281,7 +281,9 @@ public class TriggerManager extends EventHandler implements
             logger.info("Skipping trigger" + t.getTriggerId() + " until " + t.getNextCheckTime());
           }
 
-          logger.info("Checking trigger " + t.getTriggerId());
+          if (logger.isDebugEnabled()) {
+            logger.info("Checking trigger " + t.getTriggerId());
+          }
           if (t.getStatus().equals(TriggerStatus.READY)) {
             if (t.triggerConditionMet()) {
               onTriggerTrigger(t);
diff --git a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
index 596e8b3..3412425 100644
--- a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
@@ -292,6 +292,7 @@ public class PropsUtils {
     props.put(CommonJobProperties.FLOW_UUID, UUID.randomUUID().toString());
     props.put(CommonJobProperties.PROJECT_LAST_CHANGED_BY, flow.getLastModifiedByUser());
     props.put(CommonJobProperties.PROJECT_LAST_CHANGED_DATE, flow.getLastModifiedTimestamp());
+    props.put(CommonJobProperties.SUBMIT_USER, flow.getExecutableFlow().getSubmitUser());
 
     DateTime loadTime = new DateTime();
 
diff --git a/azkaban-common/src/main/java/azkaban/utils/StringUtils.java b/azkaban-common/src/main/java/azkaban/utils/StringUtils.java
index 17792a0..924afd2 100644
--- a/azkaban-common/src/main/java/azkaban/utils/StringUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/StringUtils.java
@@ -18,6 +18,7 @@ package azkaban.utils;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.regex.Pattern;
 
 public class StringUtils {
   public static final char SINGLE_QUOTE = '\'';
@@ -88,4 +89,19 @@ public class StringUtils {
 
     return buffer.toString();
   }
+
+  private static final Pattern BROWSER_PATTERN = Pattern
+      .compile(".*Gecko.*|.*AppleWebKit.*|.*Trident.*|.*Chrome.*");
+
+  public static boolean isFromBrowser(String userAgent) {
+    if (userAgent == null) {
+      return false;
+    }
+
+    if (BROWSER_PATTERN.matcher(userAgent).matches()) {
+      return true;
+    } else {
+      return false;
+    }
+  }
 }
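
A brief, hypothetical usage sketch of the new StringUtils.isFromBrowser helper; the content-type decision below is only an illustration, not code from this change.

    import azkaban.utils.StringUtils;

    public class UserAgentCheckExample {
      // Hypothetical: pick a response format based on the caller's User-Agent header.
      public static String responseTypeFor(String userAgent) {
        return StringUtils.isFromBrowser(userAgent) ? "text/html" : "application/json";
      }

      public static void main(String[] args) {
        System.out.println(responseTypeFor("curl/7.43.0"));                      // application/json
        System.out.println(responseTypeFor("Mozilla/5.0 ... Chrome/44.0 ..."));  // text/html
      }
    }
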
diff --git a/azkaban-common/src/main/java/azkaban/utils/TypedMapWrapper.java b/azkaban-common/src/main/java/azkaban/utils/TypedMapWrapper.java
index 7118e8c..62208f3 100644
--- a/azkaban-common/src/main/java/azkaban/utils/TypedMapWrapper.java
+++ b/azkaban-common/src/main/java/azkaban/utils/TypedMapWrapper.java
@@ -71,7 +71,7 @@ public class TypedMapWrapper<K, V> {
   }
 
   public Long getLong(K key) {
-    return getLong(key, -1l);
+    return getLong(key, -1L);
   }
 
   public Long getLong(K key, Long defaultVal) {
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
index 91a4290..c6400f8 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
@@ -28,13 +28,13 @@ import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import azkaban.executor.ExecutionOptions.FailureAction;
 import azkaban.flow.Flow;
+import azkaban.project.DirectoryFlowLoader;
 import azkaban.project.Project;
-import azkaban.utils.DirectoryFlowLoader;
+import azkaban.test.executions.TestExecutions;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
 
@@ -47,7 +47,8 @@ public class ExecutableFlowTest {
 
     Logger logger = Logger.getLogger(this.getClass());
     DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
-    loader.loadProjectFlow(project, new File("unit/executions/embedded"));
+
+    loader.loadProjectFlow(project, TestExecutions.getFlowDir("embedded"));
     Assert.assertEquals(0, loader.getErrors().size());
 
     project.setFlows(loader.getFlowMap());
@@ -58,7 +59,7 @@ public class ExecutableFlowTest {
   public void tearDown() throws Exception {
   }
 
-  @Ignore @Test
+  @Test
   public void testExecutorFlowCreation() throws Exception {
     Flow flow = project.getFlow("jobe");
     Assert.assertNotNull(flow);
@@ -96,7 +97,7 @@ public class ExecutableFlowTest {
     Assert.assertEquals(4, jobdFlow.getExecutableNodes().size());
   }
 
-  @Ignore @Test
+  @Test
   public void testExecutorFlowJson() throws Exception {
     Flow flow = project.getFlow("jobe");
     Assert.assertNotNull(flow);
@@ -114,7 +115,7 @@ public class ExecutableFlowTest {
     testEquals(exFlow, parsedExFlow);
   }
 
-  @Ignore @Test
+  @Test
   public void testExecutorFlowJson2() throws Exception {
     Flow flow = project.getFlow("jobe");
     Assert.assertNotNull(flow);
@@ -154,7 +155,7 @@ public class ExecutableFlowTest {
   }
 
   @SuppressWarnings("rawtypes")
-  @Ignore @Test
+  @Test
   public void testExecutorFlowUpdates() throws Exception {
     Flow flow = project.getFlow("jobe");
     ExecutableFlow exFlow = new ExecutableFlow(project, flow);
@@ -250,7 +251,7 @@ public class ExecutableFlowTest {
     }
   }
 
-  public static void testEquals(ExecutableNode a, ExecutableNode b) {
+  private static void testEquals(ExecutableNode a, ExecutableNode b) {
     if (a instanceof ExecutableFlow) {
       if (b instanceof ExecutableFlow) {
         ExecutableFlow exA = (ExecutableFlow) a;
@@ -304,7 +305,7 @@ public class ExecutableFlowTest {
     Assert.assertEquals(a.getOutNodes(), a.getOutNodes());
   }
 
-  public static void testEquals(ExecutionOptions optionsA,
+  private static void testEquals(ExecutionOptions optionsA,
       ExecutionOptions optionsB) {
     Assert.assertEquals(optionsA.getConcurrentOption(),
         optionsB.getConcurrentOption());
@@ -329,7 +330,7 @@ public class ExecutableFlowTest {
     testEquals(optionsA.getFlowParameters(), optionsB.getFlowParameters());
   }
 
-  public static void testEquals(Set<String> a, Set<String> b) {
+  private static void testEquals(Set<String> a, Set<String> b) {
     if (a == b) {
       return;
     }
@@ -348,7 +349,7 @@ public class ExecutableFlowTest {
     }
   }
 
-  public static void testEquals(List<String> a, List<String> b) {
+  private static void testEquals(List<String> a, List<String> b) {
     if (a == b) {
       return;
     }
@@ -370,7 +371,7 @@ public class ExecutableFlowTest {
   }
 
   @SuppressWarnings("unchecked")
-  public static void testDisabledEquals(List<Object> a, List<Object> b) {
+  private static void testDisabledEquals(List<Object> a, List<Object> b) {
     if (a == b) {
       return;
     }
@@ -401,7 +402,7 @@ public class ExecutableFlowTest {
     }
   }
 
-  public static void testEquals(Map<String, String> a, Map<String, String> b) {
+  private static void testEquals(Map<String, String> a, Map<String, String> b) {
     if (a == b) {
       return;
     }
diff --git a/azkaban-common/src/test/java/azkaban/executor/JavaJobRunnerMain.java b/azkaban-common/src/test/java/azkaban/executor/JavaJobRunnerMain.java
index 67d0284..cf46637 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JavaJobRunnerMain.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JavaJobRunnerMain.java
@@ -196,7 +196,7 @@ public class JavaJobRunnerMain {
       }
       writer.write("}".getBytes());
     } catch (Exception e) {
-      new RuntimeException("Unable to store output properties to: "
+      throw new RuntimeException("Unable to store output properties to: "
           + outputFileStr);
     } finally {
       try {
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index a7e2b5f..dd32302 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -335,6 +335,43 @@ public class JdbcExecutorLoaderTest {
 
   }
 
+  /* Test exception when unassigning a missing execution */
+  @Test
+  public void testUnassignExecutorException() throws ExecutorManagerException,
+    IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    try {
+      loader.unassignExecutor(2);
+      Assert.fail("Expecting exception, but didn't get one");
+    } catch (ExecutorManagerException ex) {
+      System.out.println("Test true");
+    }
+  }
+
+  /* Test happy case when unassigning executor for a flow execution */
+  @Test
+  public void testUnassignExecutor() throws ExecutorManagerException,
+    IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    String host = "localhost";
+    int port = 12345;
+    Executor executor = loader.addExecutor(host, port);
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    loader.uploadExecutableFlow(flow);
+    loader.assignExecutor(executor.getId(), flow.getExecutionId());
+    Assert.assertEquals(executor,
+      loader.fetchExecutorByExecutionId(flow.getExecutionId()));
+    loader.unassignExecutor(flow.getExecutionId());
+    Assert.assertNull(
+      loader.fetchExecutorByExecutionId(flow.getExecutionId()));
+  }
+
   /* Test exception when assigning a non-existent executor to a flow */
   @Test
   public void testAssignExecutorInvalidExecutor()
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 833e0c6..0db2de5 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -368,4 +368,9 @@ public class MockExecutorLoader implements ExecutorLoader {
     }
     return queuedFlows;
   }
+
+  @Override
+  public void unassignExecutor(int executionId) throws ExecutorManagerException {
+    executionExecutorMapping.remove(executionId);
+  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java b/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
index 94bbd7e..b938b56 100644
--- a/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
@@ -13,14 +13,12 @@ import azkaban.flow.Flow;
 import azkaban.project.Project;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
+import azkaban.utils.TestUtils;
 
 public class QueuedExecutionsTest {
-  /* Directory with serialized description of test flows */
-  private static final String UNIT_BASE_DIR =
-    "../azkaban-test/src/test/resources/executions/exectest1/";
 
   private File getFlowDir(String flow) {
-    return new File(UNIT_BASE_DIR + flow + ".flow");
+    return TestUtils.getFlowDir("exectest1", flow);
   }
 
   /*
@@ -205,4 +203,4 @@ public class QueuedExecutionsTest {
         queue.getReference(pair.getFirst().getExecId()));
     }
   }
-}
\ No newline at end of file
+}
diff --git a/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java b/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java
index 05a0a28..068935f 100644
--- a/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java
@@ -22,7 +22,6 @@ import java.util.Date;
 import java.util.Properties;
 
 import org.apache.log4j.Logger;
-
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -33,6 +32,7 @@ import org.junit.Test;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
+import azkaban.flow.CommonJobProperties;
 import azkaban.utils.Props;
 
 public class JavaProcessJobTest {
@@ -108,6 +108,14 @@ public class JavaProcessJobTest {
     props.put(AbstractProcessJob.WORKING_DIR, workingDir.getCanonicalPath());
     props.put("type", "java");
     props.put("fullPath", ".");
+    
+    props.put(CommonJobProperties.PROJECT_NAME, "test_project");
+    props.put(CommonJobProperties.FLOW_ID, "test_flow");
+    props.put(CommonJobProperties.JOB_ID, "test_job");
+    props.put(CommonJobProperties.EXEC_ID, "123");
+    props.put(CommonJobProperties.SUBMIT_USER, "test_user");
+    
+
     job = new JavaProcessJob("testJavaProcess", props, props, log);
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
index 974895b..57f4505 100644
--- a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
@@ -20,7 +20,6 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.log4j.Logger;
-
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -28,6 +27,7 @@ import org.junit.Test;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
+import azkaban.flow.CommonJobProperties;
 import azkaban.utils.Props;
 
 public class ProcessJobTest {
@@ -46,6 +46,12 @@ public class ProcessJobTest {
     props.put(AbstractProcessJob.WORKING_DIR, workingDir.getCanonicalPath());
     props.put("type", "command");
     props.put("fullPath", ".");
+    
+    props.put(CommonJobProperties.PROJECT_NAME, "test_project");
+    props.put(CommonJobProperties.FLOW_ID, "test_flow");
+    props.put(CommonJobProperties.JOB_ID, "test_job");
+    props.put(CommonJobProperties.EXEC_ID, "123");
+    props.put(CommonJobProperties.SUBMIT_USER, "test_user");
 
     job = new ProcessJob("TestProcess", props, props, log);
   }
@@ -62,6 +68,37 @@ public class ProcessJobTest {
     job.run();
 
   }
+  
+  /**
+   * this job should run fine if the props contain user.to.proxy
+   * @throws Exception
+   */
+  @Test
+  public void testOneUnixCommandWithProxyUserInsteadOfSubmitUser() throws Exception {
+    
+    // Initialize the Props
+    props.removeLocal(CommonJobProperties.SUBMIT_USER);
+    props.put("user.to.proxy", "test_user");
+    props.put(ProcessJob.COMMAND, "ls -al");
+    
+    job.run();
+
+  }
+  
+  /**
+   * this job should fail because there is no user.to.proxy and no CommonJobProperties.SUBMIT_USER
+   * @throws Exception
+   */
+  @Test (expected=RuntimeException.class)
+  public void testOneUnixCommandWithNoUser() throws Exception {
+    
+    // Initialize the Props
+    props.removeLocal(CommonJobProperties.SUBMIT_USER);    
+    props.put(ProcessJob.COMMAND, "ls -al");
+    
+    job.run();
+
+  }
 
   @Test
   public void testFailedUnixCommand() throws Exception {
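
The ProcessJob changes themselves are not shown in this section, so the helper below only restates the behaviour the two new tests encode (proxy user preferred, submit user as fallback, failure when neither is set). All names here are hypothetical, and it assumes Props exposes the usual getString(key, default) accessor.

    import azkaban.flow.CommonJobProperties;
    import azkaban.utils.Props;

    public class EffectiveUserSketch {
      // Hypothetical resolution mirroring what the new ProcessJobTest cases expect.
      public static String resolveEffectiveUser(Props props) {
        String proxyUser = props.getString("user.to.proxy", null);
        if (proxyUser != null) {
          return proxyUser;
        }
        String submitUser = props.getString(CommonJobProperties.SUBMIT_USER, null);
        if (submitUser != null) {
          return submitUser;
        }
        // Matches testOneUnixCommandWithNoUser, which expects a RuntimeException.
        throw new RuntimeException("Neither user.to.proxy nor the submit user is set; cannot run the job.");
      }
    }
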
diff --git a/azkaban-common/src/test/java/azkaban/jobExecutor/PythonJobTest.java b/azkaban-common/src/test/java/azkaban/jobExecutor/PythonJobTest.java
index ee767de..7320c15 100644
--- a/azkaban-common/src/test/java/azkaban/jobExecutor/PythonJobTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobExecutor/PythonJobTest.java
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import azkaban.utils.Props;
@@ -82,6 +83,7 @@ public class PythonJobTest {
     Utils.removeFile(scriptFile);
   }
 
+  @Ignore("Test appears to hang.")
   @Test
   public void testPythonJob() {
 
diff --git a/azkaban-common/src/test/java/azkaban/project/ProjectTest.java b/azkaban-common/src/test/java/azkaban/project/ProjectTest.java
index da36d46..63c19f0 100644
--- a/azkaban-common/src/test/java/azkaban/project/ProjectTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/ProjectTest.java
@@ -28,8 +28,8 @@ public class ProjectTest {
   @Test
   public void testToAndFromObject() throws Exception {
     Project project = new Project(1, "tesTing");
-    project.setCreateTimestamp(1l);
-    project.setLastModifiedTimestamp(2l);
+    project.setCreateTimestamp(1L);
+    project.setLastModifiedTimestamp(2L);
     project.setDescription("I am a test");
     project.setUserPermission("user1", new Permission(new Type[] { Type.ADMIN,
         Type.EXECUTE }));
diff --git a/azkaban-common/src/test/java/azkaban/utils/StringUtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/StringUtilsTest.java
new file mode 100644
index 0000000..71c3d21
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/StringUtilsTest.java
@@ -0,0 +1,80 @@
+package azkaban.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StringUtilsTest {
+
+  private static final String chromeOnMac =
+      "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.155 Safari/537.36";
+  private static final String fireFoxOnMac =
+      "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:40.0) Gecko/20100101 Firefox/40.0";
+  private static final String safariOnMac =
+      "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.78.2 (KHTML, like Gecko) Version/7.0.6 Safari/537.78.2";
+  private static final String chromeOnLinux =
+      "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36";
+  private static final String fireFoxOnLinux =
+      "Mozilla/5.0 (X11; Linux x86_64; rv:39.0) Gecko/20100101 Firefox/39.0";
+
+  private static final String[] browserVariants = { chromeOnMac, fireFoxOnMac,
+      safariOnMac, chromeOnLinux, fireFoxOnLinux };
+
+  private static final String[] BROWSER_NAMES = { "AppleWebKit", "Gecko",
+      "Chrome" };
+
+  @Test
+  public void isBrowser() throws Exception {
+
+    for (String browser : browserVariants) {
+      Assert.assertTrue(browser, StringUtils.isFromBrowser(browser));
+    }
+  }
+
+  @Test
+  public void notBrowserWithLowercase() throws Exception {
+
+    for (String browser : browserVariants) {
+      Assert.assertFalse(browser.toLowerCase(),
+          StringUtils.isFromBrowser(browser.toLowerCase()));
+    }
+  }
+
+  @Test
+  public void notBrowser() throws Exception {
+    String testStr = "curl";
+    Assert.assertFalse(testStr, StringUtils.isFromBrowser(testStr));
+  }
+
+  @Test
+  public void emptyBrowserString() throws Exception {
+
+    Assert.assertFalse("empty string", StringUtils.isFromBrowser(""));
+  }
+
+  @Test
+  public void nullBrowserString() throws Exception {
+
+    Assert.assertFalse("null string", StringUtils.isFromBrowser(null));
+  }
+
+  @Test
+  public void startsWithBrowserName() {
+    for (String name : BROWSER_NAMES) {
+      Assert.assertTrue(StringUtils.isFromBrowser(name + " is awesome"));
+    }
+  }
+
+  @Test
+  public void endsWithBrowserName() {
+    for (String name : BROWSER_NAMES) {
+      Assert.assertTrue(StringUtils.isFromBrowser("awesome is" + name));
+    }
+  }
+
+  @Test
+  public void containsBrowserName() {
+    for (String name : BROWSER_NAMES) {
+      Assert.assertTrue(StringUtils.isFromBrowser("awesome " + name + " is"));
+    }
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
index 68b10ee..3eff490 100644
--- a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
+++ b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
@@ -33,7 +33,7 @@ import azkaban.user.XmlUserManager;
 public class TestUtils {
   /* Base  resource direcotyr for unit tests */
   private static final String UNIT_RESOURCE_DIR =
-      "../azkaban-test/src/test/resources";
+      "../azkaban-test/src/test/resources/azkaban/test";
   /* Directory with serialized description of test flows */
   private static final String UNIT_EXECUTION_DIR =
       UNIT_RESOURCE_DIR + "/executions";
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 8f5df1a..47fbfad 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -132,7 +132,7 @@ public class AzkabanExecutorServer {
     root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor");
     root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
     root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
-    root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverstastics");
+    root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverStatistics");
 
     root.setAttribute(ServerConstants.AZKABAN_SERVLET_CONTEXT_KEY, this);
 
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 361db00..01ab373 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -52,6 +52,8 @@ import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypeManagerException;
 import azkaban.metric.MetricReportManager;
 import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectWhitelist;
+import azkaban.project.ProjectWhitelist.WhitelistType;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
@@ -481,7 +483,9 @@ public class FlowRunnerManager implements EventListener,
         int numJobs =
             Integer.valueOf(options.getFlowParameters().get(
                 FLOW_NUM_JOB_THREADS));
-        if (numJobs > 0 && numJobs <= numJobThreads) {
+        if (numJobs > 0 && (numJobs <= numJobThreads || ProjectWhitelist
+                .isProjectWhitelisted(flow.getProjectId(),
+                    WhitelistType.NumJobPerFlow))) {
           numJobThreads = numJobs;
         }
       } catch (Exception e) {
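
To make the new guard above easier to read in isolation: a small sketch that restates the rule, assuming the whitelist xml has already been loaded by the server. The class and method names here are illustrative only.

    import azkaban.project.ProjectWhitelist;
    import azkaban.project.ProjectWhitelist.WhitelistType;

    public class JobThreadOverrideSketch {
      // A flow parameter may raise the per-flow job thread count past the server default
      // only when the project is whitelisted under NumJobPerFlow.
      public static int effectiveJobThreads(int defaultThreads, int requestedThreads, int projectId) {
        boolean withinDefault = requestedThreads > 0 && requestedThreads <= defaultThreads;
        boolean whitelisted = requestedThreads > 0
            && ProjectWhitelist.isProjectWhitelisted(projectId, WhitelistType.NumJobPerFlow);
        return (withinDefault || whitelisted) ? requestedThreads : defaultThreads;
      }
    }
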
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
index d5cb550..17ba6e9 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
@@ -21,8 +21,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
-import java.util.List;
-
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
@@ -33,11 +31,17 @@ import org.apache.log4j.Logger;
 import azkaban.executor.ExecutorInfo;
 import azkaban.utils.JSONUtils;
 
-public class ServerStatisticsServlet extends HttpServlet  {
+
+public class ServerStatisticsServlet extends HttpServlet {
   private static final long serialVersionUID = 1L;
-  private static final int  cacheTimeInMilliseconds = 1000;
+  private static final int cacheTimeInMilliseconds = 1000;
   private static final Logger logger = Logger.getLogger(ServerStatisticsServlet.class);
   private static final String noCacheParamName = "nocache";
+  private static final boolean exists_Bash = new File("/bin/bash").exists();
+  private static final boolean exists_Cat = new File("/bin/cat").exists();
+  private static final boolean exists_Grep = new File("/bin/grep").exists();
+  private static final boolean exists_Meminfo = new File("/proc/meminfo").exists();
+  private static final boolean exists_LoadAvg = new File("/proc/loadavg").exists();
 
   protected static long lastRefreshedTime = 0;
   protected static ExecutorInfo cachedstats = null;
@@ -48,12 +52,11 @@ public class ServerStatisticsServlet extends HttpServlet  {
    * @see javax.servlet.http.HttpServlet#doGet(javax.servlet.http.HttpServletRequest,
    *      javax.servlet.http.HttpServletResponse)
    */
-  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
-      throws ServletException, IOException {
+  protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
 
-    boolean noCache = null!= req && Boolean.valueOf(req.getParameter(noCacheParamName));
+    boolean noCache = null != req && Boolean.valueOf(req.getParameter(noCacheParamName));
 
-    if (noCache || System.currentTimeMillis() - lastRefreshedTime > cacheTimeInMilliseconds){
+    if (noCache || System.currentTimeMillis() - lastRefreshedTime > cacheTimeInMilliseconds) {
       this.populateStatistics(noCache);
     }
 
@@ -69,82 +72,116 @@ public class ServerStatisticsServlet extends HttpServlet  {
    * a double value will be used to present the remaining memory,
    *         a returning value of '55.6' means 55.6%
    */
-  protected void fillRemainingMemoryPercent(ExecutorInfo stats){
-    if (new File("/bin/bash").exists()
-        && new File("/bin/cat").exists()
-        && new File("/bin/grep").exists()
-        &&  new File("/proc/meminfo").exists()) {
-    java.lang.ProcessBuilder processBuilder =
-        new java.lang.ProcessBuilder("/bin/bash", "-c", "/bin/cat /proc/meminfo | grep -E \"MemTotal|MemFree\"");
-    try {
-      ArrayList<String> output = new ArrayList<String>();
-      Process process = processBuilder.start();
-      process.waitFor();
-      InputStream inputStream = process.getInputStream();
+  protected void fillRemainingMemoryPercent(ExecutorInfo stats) {
+    if (exists_Bash && exists_Cat && exists_Grep && exists_Meminfo) {
+      java.lang.ProcessBuilder processBuilder =
+          new java.lang.ProcessBuilder("/bin/bash", "-c",
+              "/bin/cat /proc/meminfo | grep -E \"^MemTotal:|^MemFree:|^Buffers:|^Cached:|^SwapCached:\"");
       try {
-        java.io.BufferedReader reader =
-            new java.io.BufferedReader(new InputStreamReader(inputStream));
-        String line = null;
-        while ((line = reader.readLine()) != null) {
-          output.add(line);
+        ArrayList<String> output = new ArrayList<String>();
+        Process process = processBuilder.start();
+        process.waitFor();
+        InputStream inputStream = process.getInputStream();
+        try {
+          java.io.BufferedReader reader = new java.io.BufferedReader(new InputStreamReader(inputStream));
+          String line = null;
+          while ((line = reader.readLine()) != null) {
+            output.add(line);
+          }
+        } finally {
+          inputStream.close();
         }
-      }finally {
-        inputStream.close();
-      }
 
-      long totalMemory = 0;
-      long  freeMemory = 0;
-
-      // process the output from bash call.
-      // we expect the result from the bash call to be something like following -
-      // MemTotal:       65894264 kB
-      // MemFree:        61104076 kB
-      if (output.size() == 2) {
-        for (String result : output){
-          // find the total memory and value the variable.
-          if (result.contains("MemTotal") && result.split("\\s+").length > 2){
-            try {
-              totalMemory = Long.parseLong(result.split("\\s+")[1]);
-              logger.info("Total memory : " + totalMemory);
-            }catch(NumberFormatException e){
-              logger.error("yielding 0 for total memory as output is invalid -" + result);
+        long totalMemory = 0;
+        long totalFreeMemory = 0;
+        Long parsedResult = (long) 0;
+
+        // process the output from bash call.
+        // we expect the result from the bash call to be something like following -
+        // MemTotal:       65894264 kB
+        // MemFree:        57753844 kB
+        // Buffers:          305552 kB
+        // Cached:          3802432 kB
+        // SwapCached:            0 kB
+        // Note : total free memory = freeMemory + cached + buffers + swapCached
+        // TODO : think about merging the logic in systemMemoryInfo as the logic is similar
+        if (output.size() == 5) {
+          for (String result : output) {
+            // find the total memory and set the variable.
+            parsedResult = extractMemoryInfo("MemTotal", result);
+            if (null != parsedResult) {
+              totalMemory = parsedResult;
+              continue;
             }
-          }
-          // find the free memory and value the variable.
-          if (result.contains("MemFree") && result.split("\\s+").length > 2){
-            try {
-              freeMemory = Long.parseLong(result.split("\\s+")[1]);
-              logger.info("Free memory : " + freeMemory);
-            }catch(NumberFormatException e){
-              logger.error("yielding 0 for total memory as output is invalid -" + result);
+
+            // find the free memory.
+            parsedResult = extractMemoryInfo("MemFree", result);
+            if (null != parsedResult) {
+              totalFreeMemory += parsedResult;
+              continue;
+            }
+
+            // find the Buffers.
+            parsedResult = extractMemoryInfo("Buffers", result);
+            if (null != parsedResult) {
+              totalFreeMemory += parsedResult;
+              continue;
+            }
+
+            // find the SwapCached.
+            parsedResult = extractMemoryInfo("SwapCached", result);
+            if (null != parsedResult) {
+              totalFreeMemory += parsedResult;
+              continue;
+            }
+
+            // find the Cached.
+            parsedResult = extractMemoryInfo("Cached", result);
+            if (null != parsedResult) {
+              totalFreeMemory += parsedResult;
+              continue;
             }
           }
+        } else {
+          logger.error("failed to get total/free memory info as the bash call returned invalid result."
+              + String.format(" Output from the bash call - %s ", output.toString()));
         }
-      }else {
-        logger.error("failed to get total/free memory info as the bash call returned invalid result.");
-      }
 
-      // the number got from the proc file is in KBs we want to see the number in MBs so we are deviding it by 1024.
-      stats.setRemainingMemoryInMB(freeMemory/1024);
-      stats.setRemainingMemoryPercent(totalMemory == 0 ? 0 : ((double)freeMemory / (double)totalMemory)*100);
-    }
-    catch (Exception ex){
-      logger.error("failed fetch system memory info " +
-                   "as exception is captured when fetching result from bash call. Ex -" + ex.getMessage());
+        // the number got from the proc file is in KBs we want to see the number in MBs so we are dividing it by 1024.
+        stats.setRemainingMemoryInMB(totalFreeMemory / 1024);
+        stats.setRemainingMemoryPercent(totalMemory == 0 ? 0 : ((double) totalFreeMemory / (double) totalMemory) * 100);
+      } catch (Exception ex) {
+        logger.error("failed fetch system memory info "
+            + "as exception is captured when fetching result from bash call. Ex -" + ex.getMessage());
+      }
+    } else {
+      logger.error("failed fetch system memory info, one or more files from the following list are missing -  "
+          + "'/bin/bash'," + "'/bin/cat'," + "'/proc/loadavg'");
     }
-  } else {
-      logger.error("failed fetch system memory info, one or more files from the following list are missing -  " +
-                   "'/bin/bash'," + "'/bin/cat'," +"'/proc/loadavg'");
   }
+
+  private Long extractMemoryInfo(String field, String result) {
+    Long returnResult = null;
+    if (null != result && null != field && result.matches(String.format("^%s:.*", field))
+        && result.split("\\s+").length > 2) {
+      try {
+        returnResult = Long.parseLong(result.split("\\s+")[1]);
+        logger.debug(field + ":" + returnResult);
+      } catch (NumberFormatException e) {
+        returnResult = 0L;
+        logger.error(String.format("yielding 0 for %s as output is invalid - %s", field, result));
+      }
+    }
+    return returnResult;
   }
 
   /**
    * call the data providers to fill the returning data container for statistics data.
    * This function refreshes the static cached copy of data in case if necessary.
    * */
-  protected synchronized void populateStatistics(boolean noCache){
+  protected synchronized void populateStatistics(boolean noCache) {
     //check again before starting the work.
-    if (noCache || System.currentTimeMillis() - lastRefreshedTime  > cacheTimeInMilliseconds){
+    if (noCache || System.currentTimeMillis() - lastRefreshedTime > cacheTimeInMilliseconds) {
       final ExecutorInfo stats = new ExecutorInfo();
 
       fillRemainingMemoryPercent(stats);
@@ -152,7 +189,7 @@ public class ServerStatisticsServlet extends HttpServlet  {
       fillCpuUsage(stats);
 
       cachedstats = stats;
-      lastRefreshedTime =  System.currentTimeMillis();
+      lastRefreshedTime = System.currentTimeMillis();
     }
   }
 
@@ -161,33 +198,32 @@ public class ServerStatisticsServlet extends HttpServlet  {
    * @param stats reference to the result container which contains all the results, this specific method
    *              will only work on the property "remainingFlowCapacity".
    */
-  protected void fillRemainingFlowCapacityAndLastDispatchedTime(ExecutorInfo stats){
+  protected void fillRemainingFlowCapacityAndLastDispatchedTime(ExecutorInfo stats) {
 
     AzkabanExecutorServer server = AzkabanExecutorServer.getApp();
-    if (server != null){
-      FlowRunnerManager runnerMgr =  AzkabanExecutorServer.getApp().getFlowRunnerManager();
+    if (server != null) {
+      FlowRunnerManager runnerMgr = AzkabanExecutorServer.getApp().getFlowRunnerManager();
       int assignedFlows = runnerMgr.getNumRunningFlows() + runnerMgr.getNumQueuedFlows();
       stats.setRemainingFlowCapacity(runnerMgr.getMaxNumRunningFlows() - assignedFlows);
       stats.setNumberOfAssignedFlows(assignedFlows);
       stats.setLastDispatchedTime(runnerMgr.getLastFlowSubmittedTime());
-    }else {
-      logger.error("failed to get data for remaining flow capacity or LastDispatchedTime" +
-                   " as the AzkabanExecutorServer has yet been initialized.");
+    } else {
+      logger.error("failed to get data for remaining flow capacity or LastDispatchedTime"
+          + " as the AzkabanExecutorServer has yet been initialized.");
     }
   }
 
-
   /**<pre>
-   * fill the result set with the Remaining temp Storage .
-   * Note : As the Top bash call doesn't yield accurate result for the system load,
+   * fill the result set with the CPU usage .
+   * Note : As the 'Top' bash call doesn't yield accurate result for the system load,
    *        the implementation has been changed to load from the "proc/loadavg" which keeps
    *        the moving average of the system load, we are pulling the average for the recent 1 min.
    *</pre>
    * @param stats reference to the result container which contains all the results, this specific method
-   *              will only work on the property "cpuUdage".
+   *              will only work on the property "cpuUsage".
    */
-  protected void fillCpuUsage(ExecutorInfo stats){
-    if (new File("/bin/bash").exists() && new File("/bin/cat").exists() &&  new File("/proc/loadavg").exists()) {
+  protected void fillCpuUsage(ExecutorInfo stats) {
+    if (exists_Bash && exists_Cat && exists_LoadAvg) {
       java.lang.ProcessBuilder processBuilder =
           new java.lang.ProcessBuilder("/bin/bash", "-c", "/bin/cat /proc/loadavg");
       try {
@@ -196,13 +232,12 @@ public class ServerStatisticsServlet extends HttpServlet  {
         process.waitFor();
         InputStream inputStream = process.getInputStream();
         try {
-          java.io.BufferedReader reader =
-              new java.io.BufferedReader(new InputStreamReader(inputStream));
+          java.io.BufferedReader reader = new java.io.BufferedReader(new InputStreamReader(inputStream));
           String line = null;
           while ((line = reader.readLine()) != null) {
             output.add(line);
           }
-        }finally {
+        } finally {
           inputStream.close();
         }
 
@@ -213,20 +248,19 @@ public class ServerStatisticsServlet extends HttpServlet  {
 
           try {
             cpuUsage = Double.parseDouble(splitedresult[0]);
-          }catch(NumberFormatException e){
+          } catch (NumberFormatException e) {
             logger.error("yielding 0.0 for CPU usage as output is invalid -" + output.get(0));
           }
           logger.info("System load : " + cpuUsage);
           stats.setCpuUpsage(cpuUsage);
         }
-      }
-      catch (Exception ex){
-        logger.error("failed fetch system load info " +
-                     "as exception is captured when fetching result from bash call. Ex -" + ex.getMessage());
+      } catch (Exception ex) {
+        logger.error("failed fetch system load info "
+            + "as exception is captured when fetching result from bash call. Ex -" + ex.getMessage());
       }
     } else {
-        logger.error("failed fetch system load info, one or more files from the following list are missing -  " +
-                     "'/bin/bash'," + "'/bin/cat'," +"'/proc/loadavg'");
+      logger.error("failed fetch system load info, one or more files from the following list are missing -  "
+          + "'/bin/bash'," + "'/bin/cat'," + "'/proc/loadavg'");
     }
   }
 }
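
A minimal standalone sketch of the same free-memory accounting the servlet now does (MemFree + Buffers + Cached + SwapCached, reported in MB and as a percentage of MemTotal). It reads /proc/meminfo directly instead of shelling out to bash, purely to keep the example short; it is not the servlet's implementation.

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    public class MeminfoSketch {
      public static void main(String[] args) throws IOException {
        Map<String, Long> kb = new HashMap<String, Long>();
        BufferedReader reader = new BufferedReader(new FileReader("/proc/meminfo"));
        try {
          String line;
          while ((line = reader.readLine()) != null) {
            // Lines look like "MemTotal:       65894264 kB"
            String[] parts = line.split("\\s+");
            if (parts.length >= 2) {
              try {
                kb.put(parts[0].replace(":", ""), Long.parseLong(parts[1]));
              } catch (NumberFormatException ignored) {
                // skip malformed lines
              }
            }
          }
        } finally {
          reader.close();
        }

        long total = kb.containsKey("MemTotal") ? kb.get("MemTotal") : 0L;
        long free = 0L;
        // Same accounting as the servlet: free = MemFree + Buffers + Cached + SwapCached.
        for (String key : new String[] { "MemFree", "Buffers", "Cached", "SwapCached" }) {
          if (kb.containsKey(key)) {
            free += kb.get(key);
          }
        }

        System.out.println("Remaining memory (MB): " + free / 1024);
        System.out.println("Remaining memory (%): " + (total == 0 ? 0 : (double) free / total * 100));
      }
    }
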
diff --git a/azkaban-execserver/src/main/resources/log4j.properties b/azkaban-execserver/src/main/resources/log4j.properties
index e304ac7..464f36a 100644
--- a/azkaban-execserver/src/main/resources/log4j.properties
+++ b/azkaban-execserver/src/main/resources/log4j.properties
@@ -1,12 +1,14 @@
-log4j.rootLogger=INFO, Console
+log_dir=${log4j.log.dir}
+
+log4j.rootLogger=INFO, ExecServer
 log4j.logger.azkaban.execapp=INFO, ExecServer
 
-log4j.appender.ExecServer=org.apache.log4j.RollingFileAppender
+log4j.appender.ExecServer=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.ExecServer.layout=org.apache.log4j.PatternLayout
-log4j.appender.ExecServer.File=azkaban-execserver.log
+log4j.appender.ExecServer.File=${log_dir}/azkaban-execserver.log
 log4j.appender.ExecServer.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1}] [Azkaban] %m%n
-log4j.appender.ExecServer.MaxFileSize=102400MB
 log4j.appender.ExecServer.MaxBackupIndex=2
+log4j.appender.ExecServer.DatePattern='.'yyyy-MM-dd
 
 log4j.appender.Console=org.apache.log4j.ConsoleAppender
 log4j.appender.Console.layout=org.apache.log4j.PatternLayout
diff --git a/azkaban-execserver/src/package/bin/azkaban-executor-start.sh b/azkaban-execserver/src/package/bin/azkaban-executor-start.sh
index c281890..446571b 100755
--- a/azkaban-execserver/src/package/bin/azkaban-executor-start.sh
+++ b/azkaban-execserver/src/package/bin/azkaban-executor-start.sh
@@ -44,7 +44,7 @@ serverpath=`pwd`
 if [ -z $AZKABAN_OPTS ]; then
   AZKABAN_OPTS="-Xmx3G"
 fi
-AZKABAN_OPTS="$AZKABAN_OPTS -server -Dcom.sun.management.jmxremote -Djava.io.tmpdir=$tmpdir -Dexecutorport=$executorport -Dserverpath=$serverpath"
+AZKABAN_OPTS="$AZKABAN_OPTS -server -Dcom.sun.management.jmxremote -Djava.io.tmpdir=$tmpdir -Dexecutorport=$executorport -Dserverpath=$serverpath -Dlog4j.log.dir=$azkaban_dir/logs"
 
 java $AZKABAN_OPTS $JAVA_LIB_PATH -cp $CLASSPATH azkaban.execapp.AzkabanExecutorServer -conf $azkaban_dir/conf $@ &
 
diff --git a/azkaban-execserver/src/package/bin/start-exec.sh b/azkaban-execserver/src/package/bin/start-exec.sh
index fbb7124..d8f0f73 100755
--- a/azkaban-execserver/src/package/bin/start-exec.sh
+++ b/azkaban-execserver/src/package/bin/start-exec.sh
@@ -1,6 +1,5 @@
 #!/bin/bash
 
-base_dir=$(dirname $0)/..
-
-bin/azkaban-executor-start.sh $base_dir 2>&1>logs/executorServerLog__`date +%F+%T`.out &
+# pass along command line arguments to azkaban-executor-start.sh script
+bin/azkaban-executor-start.sh "$@" 2>&1>logs/executorServerLog__`date +%F+%T`.out &
 
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
index 360d4f4..e29e973 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
@@ -43,11 +43,11 @@ import azkaban.executor.Status;
 import azkaban.flow.Flow;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypePluginSet;
+import azkaban.project.DirectoryFlowLoader;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManagerException;
 import azkaban.project.MockProjectLoader;
-import azkaban.utils.DirectoryFlowLoader;
 import azkaban.utils.Props;
 
 /**
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
index d5988bb..422f622 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
@@ -39,11 +39,11 @@ import azkaban.executor.JavaJob;
 import azkaban.executor.MockExecutorLoader;
 import azkaban.flow.Flow;
 import azkaban.jobtype.JobTypeManager;
+import azkaban.project.DirectoryFlowLoader;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManagerException;
 import azkaban.project.MockProjectLoader;
-import azkaban.utils.DirectoryFlowLoader;
 import azkaban.utils.Props;
 
 /**
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerTest2.java b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerTest2.java
index c78abaa..84416de 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerTest2.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerTest2.java
@@ -42,11 +42,11 @@ import azkaban.executor.Status;
 import azkaban.flow.Flow;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypePluginSet;
+import azkaban.project.DirectoryFlowLoader;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManagerException;
 import azkaban.project.MockProjectLoader;
-import azkaban.utils.DirectoryFlowLoader;
 import azkaban.utils.Props;
 
 /**
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
index 55078b5..15078b4 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
@@ -1,37 +1,41 @@
 package azkaban.execapp;
 
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import azkaban.executor.ExecutorInfo;
 
+@Ignore
 public class StatisticsServletTest {
-  private class MockStatisticsServlet extends ServerStatisticsServlet{
+  private class MockStatisticsServlet extends ServerStatisticsServlet {
     /** */
     private static final long serialVersionUID = 1L;
 
-    public  ExecutorInfo getStastics(){
+    public ExecutorInfo getStastics() {
       return cachedstats;
     }
 
-    public  long getUpdatedTime(){
+    public long getUpdatedTime() {
       return lastRefreshedTime;
     }
 
-    public void callPopulateStatistics(){
-       this.populateStatistics(false);
+    public void callPopulateStatistics() {
+      this.populateStatistics(false);
     }
 
-    public void callFillCpuUsage(ExecutorInfo stats){
-      this.fillCpuUsage(stats);}
+    public void callFillCpuUsage(ExecutorInfo stats) {
+      this.fillCpuUsage(stats);
+    }
 
-    public void callFillRemainingMemoryPercent(ExecutorInfo stats){
-        this.fillRemainingMemoryPercent(stats);}
+    public void callFillRemainingMemoryPercent(ExecutorInfo stats) {
+      this.fillRemainingMemoryPercent(stats);
+    }
   }
   private MockStatisticsServlet statServlet = new MockStatisticsServlet();
 
   @Test
-  public void testFillMemory()  {
+  public void testFillMemory() {
     ExecutorInfo stats = new ExecutorInfo();
     this.statServlet.callFillRemainingMemoryPercent(stats);
     // assume any machine that runs this test should
@@ -41,14 +45,14 @@ public class StatisticsServletTest {
   }
 
   @Test
-  public void testFillCpu()  {
+  public void testFillCpu() {
     ExecutorInfo stats = new ExecutorInfo();
     this.statServlet.callFillCpuUsage(stats);
     Assert.assertTrue(stats.getCpuUsage() > 0);
   }
 
   @Test
-  public void testPopulateStatistics()  {
+  public void testPopulateStatistics() {
     this.statServlet.callPopulateStatistics();
     Assert.assertNotNull(this.statServlet.getStastics());
     Assert.assertTrue(this.statServlet.getStastics().getRemainingMemoryInMB() > 0);
@@ -57,17 +61,18 @@ public class StatisticsServletTest {
   }
 
   @Test
-  public void testPopulateStatisticsCache()  {
+  public void testPopulateStatisticsCache() {
     this.statServlet.callPopulateStatistics();
     final long updatedTime = this.statServlet.getUpdatedTime();
-    while (System.currentTimeMillis() - updatedTime < 1000){
+    while (System.currentTimeMillis() - updatedTime < 1000) {
       this.statServlet.callPopulateStatistics();
       Assert.assertEquals(updatedTime, this.statServlet.getUpdatedTime());
     }
 
     try {
       Thread.sleep(1000);
-    } catch (InterruptedException e) {}
+    } catch (InterruptedException e) {
+    }
 
     // make sure cache expires after timeout.
     this.statServlet.callPopulateStatistics();
diff --git a/azkaban-test/src/test/java/azkaban/test/executions/TestExecutions.java b/azkaban-test/src/test/java/azkaban/test/executions/TestExecutions.java
new file mode 100644
index 0000000..56cf0c6
--- /dev/null
+++ b/azkaban-test/src/test/java/azkaban/test/executions/TestExecutions.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2015 Azkaban Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.test.executions;
+
+import java.io.File;
+import java.net.URL;
+import java.net.URISyntaxException;
+
+import org.junit.Assert;
+
+public class TestExecutions {
+  public static File getFlowDir(final String path) throws URISyntaxException {
+    URL url = TestExecutions.class.getResource(path);
+    Assert.assertNotNull(url);
+    return new File(url.toURI());
+  }
+}
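
A short usage sketch of the new TestExecutions helper, matching how ExecutableFlowTest now loads the embedded flow. It assumes the flow directories sit on the test classpath next to TestExecutions.

    import java.io.File;

    import azkaban.test.executions.TestExecutions;

    public class FlowDirLookupExample {
      public static void main(String[] args) throws Exception {
        // Resolves the "embedded" execution directory from the classpath,
        // replacing the old hard-coded relative path into the source tree.
        File embedded = TestExecutions.getFlowDir("embedded");
        System.out.println(embedded.getAbsolutePath());
      }
    }
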
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 3171e9a..89dea03 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -39,6 +39,7 @@ import javax.management.ObjectName;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
+import org.apache.log4j.jmx.HierarchyDynamicMBean;
 import org.apache.velocity.app.VelocityEngine;
 import org.apache.velocity.runtime.log.Log4JLogChute;
 import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
@@ -53,8 +54,6 @@ import org.mortbay.jetty.servlet.DefaultServlet;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.mortbay.thread.QueuedThreadPool;
 
-import com.linkedin.restli.server.RestliServlet;
-
 import azkaban.alert.Alerter;
 import azkaban.database.AzkabanDatabaseSetup;
 import azkaban.executor.ExecutorManager;
@@ -88,19 +87,21 @@ import azkaban.utils.FileIOUtils;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
 import azkaban.utils.Utils;
+import azkaban.webapp.plugin.PluginRegistry;
+import azkaban.webapp.plugin.TriggerPlugin;
+import azkaban.webapp.plugin.ViewerPlugin;
 import azkaban.webapp.servlet.AbstractAzkabanServlet;
 import azkaban.webapp.servlet.ExecutorServlet;
+import azkaban.webapp.servlet.HistoryServlet;
 import azkaban.webapp.servlet.IndexRedirectServlet;
 import azkaban.webapp.servlet.JMXHttpServlet;
-import azkaban.webapp.servlet.ScheduleServlet;
-import azkaban.webapp.servlet.HistoryServlet;
-import azkaban.webapp.servlet.ProjectServlet;
 import azkaban.webapp.servlet.ProjectManagerServlet;
+import azkaban.webapp.servlet.ProjectServlet;
+import azkaban.webapp.servlet.ScheduleServlet;
 import azkaban.webapp.servlet.StatsServlet;
 import azkaban.webapp.servlet.TriggerManagerServlet;
-import azkaban.webapp.plugin.TriggerPlugin;
-import azkaban.webapp.plugin.ViewerPlugin;
-import azkaban.webapp.plugin.PluginRegistry;
+
+import com.linkedin.restli.server.RestliServlet;
 
 /**
  * The Azkaban Jetty server class
@@ -123,6 +124,9 @@ import azkaban.webapp.plugin.PluginRegistry;
  * Jetty truststore password
  */
 public class AzkabanWebServer extends AzkabanServer {
+  private static final String AZKABAN_ACCESS_LOGGER_NAME =
+      "azkaban.webapp.servlet.LoginAbstractAzkabanServlet";
+
   private static final Logger logger = Logger.getLogger(AzkabanWebServer.class);
 
   public static final String AZKABAN_HOME = "AZKABAN_HOME";
@@ -705,6 +709,13 @@ public class AzkabanWebServer extends AzkabanServer {
           .getString("jetty.trustpassword"));
       secureConnector.setHeaderBufferSize(MAX_HEADER_BUFFER_SIZE);
 
+      // exclude vulnerable cipher suites from the SSL connector
+      List<String> cipherSuitesToExclude = azkabanSettings.getStringList("jetty.excludeCipherSuites");
+      logger.info("Excluded Cipher Suites: " + String.valueOf(cipherSuitesToExclude));
+      if (cipherSuitesToExclude != null && !cipherSuitesToExclude.isEmpty()) {
+        secureConnector.setExcludeCipherSuites(cipherSuitesToExclude.toArray(new String[0]));
+      }
+
       server.addConnector(secureConnector);
     } else {
       ssl = false;
@@ -823,23 +834,25 @@ public class AzkabanWebServer extends AzkabanServer {
 
       public void logTopMemoryConsumers() throws Exception, IOException {
         if (new File("/bin/bash").exists() && new File("/bin/ps").exists()
-                && new File("/usr/bin/head").exists()) {
+            && new File("/usr/bin/head").exists()) {
           logger.info("logging top memeory consumer");
 
           java.lang.ProcessBuilder processBuilder =
-                  new java.lang.ProcessBuilder("/bin/bash", "-c", "/bin/ps aux --sort -rss | /usr/bin/head");
+              new java.lang.ProcessBuilder("/bin/bash", "-c",
+                  "/bin/ps aux --sort -rss | /usr/bin/head");
           Process p = processBuilder.start();
           p.waitFor();
-  
+
           InputStream is = p.getInputStream();
-          java.io.BufferedReader reader = new java.io.BufferedReader(new InputStreamReader(is));
+          java.io.BufferedReader reader =
+              new java.io.BufferedReader(new InputStreamReader(is));
           String line = null;
           while ((line = reader.readLine()) != null) {
             logger.info(line);
           }
           is.close();
         }
-      }      
+      }
     });
     logger.info("Server running on " + (ssl ? "ssl" : "") + " port " + port
         + ".");
@@ -1233,6 +1246,22 @@ public class AzkabanWebServer extends AzkabanServer {
       registerMbean("executorManager", new JmxExecutorManager(
           (ExecutorManager) executorManager));
     }
+
+    // Register Log4J loggers as JMX beans so the log level can be
+    // updated via JConsole or Java VisualVM
+    HierarchyDynamicMBean log4jMBean = new HierarchyDynamicMBean();
+    registerMbean("log4jmxbean", log4jMBean);
+    ObjectName accessLogLoggerObjName =
+        log4jMBean.addLoggerMBean(AZKABAN_ACCESS_LOGGER_NAME);
+
+    if (accessLogLoggerObjName == null) {
+      System.out
+          .println("************* loginLoggerObjName is null, make sure there is a logger with name "
+              + AZKABAN_ACCESS_LOGGER_NAME);
+    } else {
+      System.out.println("******** loginLoggerObjName: "
+          + accessLogLoggerObjName.getCanonicalName());
+    }
   }
 
   public void close() {
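
Besides the cipher-suite exclusion, the change registers the Log4J hierarchy as a JMX bean so the access logger's level can be changed from JConsole or Java VisualVM without a restart. The in-process equivalent of what that MBean exposes is just the standard Log4J 1.x API; a small sketch (the logger name is copied from the constant added above):

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class AccessLogLevelDemo {
  public static void main(String[] args) {
    Logger accessLogger =
        Logger.getLogger("azkaban.webapp.servlet.LoginAbstractAzkabanServlet");
    // Same effect a JConsole user gets by editing the exported logger MBean.
    accessLogger.setLevel(Level.DEBUG);
    accessLogger.debug("access logger now at DEBUG");
  }
}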
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 4970fce..ef588f7 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -503,7 +503,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
       } else {
         ret.put("length", data.getLength());
         ret.put("offset", data.getOffset());
-        ret.put("data", data.getData());
+        ret.put("data", StringEscapeUtils.escapeHtml(data.getData()));
       }
     } catch (ExecutorManagerException e) {
       throw new ServletException(e);
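
The one-line ExecutorServlet change routes fetched log text through commons-lang HTML escaping before it is returned to the browser, so log lines that contain markup are rendered as text rather than interpreted. A standalone illustration of what escapeHtml does:

import org.apache.commons.lang.StringEscapeUtils;

public class EscapeDemo {
  public static void main(String[] args) {
    String logLine = "<script>alert('x')</script> step finished";
    // Markup characters are converted to entities, e.g. '<' becomes "&lt;".
    System.out.println(StringEscapeUtils.escapeHtml(logLine));
  }
}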
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index e0b9e38..1d9f315 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -43,6 +43,7 @@ import azkaban.user.Role;
 import azkaban.user.User;
 import azkaban.user.UserManager;
 import azkaban.user.UserManagerException;
+import azkaban.utils.StringUtils;
 
 /**
  * Abstract Servlet that handles auto login when the session hasn't been
@@ -77,11 +78,17 @@ public abstract class LoginAbstractAzkabanServlet extends
 
   private MultipartParser multipartParser;
 
+  private boolean shouldLogRawUserAgent = false;
+
   @Override
   public void init(ServletConfig config) throws ServletException {
     super.init(config);
 
     multipartParser = new MultipartParser(DEFAULT_UPLOAD_DISK_SPOOL_SIZE);
+
+    shouldLogRawUserAgent =
+        getApplication().getServerProps().getBoolean("accesslog.raw.useragent",
+            false);
   }
 
   public void setResourceDirectory(File file) {
@@ -93,6 +100,7 @@ public abstract class LoginAbstractAzkabanServlet extends
       throws ServletException, IOException {
     // Set session id
     Session session = getSessionFromRequest(req);
+    logRequest(req, session);
     if (hasParam(req, "logout")) {
       resp.sendRedirect(req.getContextPath());
       if (session != null) {
@@ -103,7 +111,9 @@ public abstract class LoginAbstractAzkabanServlet extends
     }
 
     if (session != null) {
-      logger.info("Found session " + session.getUser());
+      if (logger.isDebugEnabled()) {
+        logger.debug("Found session " + session.getUser());
+      }
       if (handleFileGet(req, resp)) {
         return;
       }
@@ -120,6 +130,46 @@ public abstract class LoginAbstractAzkabanServlet extends
     }
   }
 
+  /**
+   * Log out request - the format should be close to Apache access log format
+   * 
+   * @param req
+   * @param session
+   */
+  private void logRequest(HttpServletRequest req, Session session) {
+    StringBuilder buf = new StringBuilder();
+    buf.append(req.getRemoteAddr()).append(" ");
+    if (session != null && session.getUser() != null) {
+      buf.append(session.getUser().getUserId()).append(" ");
+    } else {
+      buf.append(" - ").append(" ");
+    }
+
+    buf.append("\"");
+    buf.append(req.getMethod()).append(" ");
+    buf.append(req.getRequestURI()).append(" ");
+    if (req.getQueryString() != null) {
+      buf.append(req.getQueryString()).append(" ");
+    } else {
+      buf.append("-").append(" ");
+    }
+    buf.append(req.getProtocol()).append("\" ");
+
+    String userAgent = req.getHeader("User-Agent");
+    if (shouldLogRawUserAgent) {
+      buf.append(userAgent);
+    } else {
+      // simply log a short string to indicate browser or not
+      if (StringUtils.isFromBrowser(userAgent)) {
+        buf.append("browser");
+      } else {
+        buf.append("not-browser");
+      }
+    }
+
+    logger.info(buf.toString());
+  }
+
   private boolean handleFileGet(HttpServletRequest req, HttpServletResponse resp)
       throws IOException {
     if (webResourceDirectory == null) {
@@ -168,7 +218,6 @@ public abstract class LoginAbstractAzkabanServlet extends
 
     if (cookie != null) {
       sessionId = cookie.getValue();
-      logger.info("Session id " + sessionId);
     }
 
     if (sessionId == null && hasParam(req, "session.id")) {
@@ -211,6 +260,8 @@ public abstract class LoginAbstractAzkabanServlet extends
       throws ServletException, IOException {
     Session session = getSessionFromRequest(req);
 
+    logRequest(req, session);
+
     // Handle Multipart differently from other post messages
     if (ServletFileUpload.isMultipartContent(req)) {
       Map<String, Object> params = multipartParser.parseMultipart(req);
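
With these changes every GET and POST is written to the access logger in an Apache-like format. For a request with an authenticated session the line looks roughly like

  10.0.0.5 azkaban "GET /executor execid=123 HTTP/1.1" browser

where the final token is the raw User-Agent header only when accesslog.raw.useragent is set to true; otherwise it is collapsed to "browser" or "not-browser". (The address, user, path, and query string above are illustrative.)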
diff --git a/azkaban-webserver/src/main/resources/log4j.properties b/azkaban-webserver/src/main/resources/log4j.properties
index b9fe746..4001d2c 100644
--- a/azkaban-webserver/src/main/resources/log4j.properties
+++ b/azkaban-webserver/src/main/resources/log4j.properties
@@ -1,21 +1,22 @@
-log4j.rootLogger=INFO, Console
+log_dir=${log4j.log.dir}
+
+log4j.rootLogger=INFO, WebServer
 log4j.logger.azkaban.webapp=INFO, WebServer
-log4j.logger.azkaban.webapp.servlet.AbstractAzkabanServlet=INFO, R
-log4j.logger.azkaban.webapp.servlet.LoginAbstractAzkabanServlet=INFO, R
+log4j.logger.azkaban.webapp.servlet.AbstractAzkabanServlet=INFO, Access
+log4j.logger.azkaban.webapp.servlet.LoginAbstractAzkabanServlet=INFO, Access
+log4j.additivity.azkaban.webapp.servlet.LoginAbstractAzkabanServlet=false
 
-log4j.appender.R=org.apache.log4j.RollingFileAppender
-log4j.appender.R.layout=org.apache.log4j.PatternLayout
-log4j.appender.R.File=azkaban-access.log
-log4j.appender.R.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1}] [Azkaban] %m%n
-log4j.appender.R.MaxFileSize=102400MB
-log4j.appender.R.MaxBackupIndex=2
+log4j.appender.Access=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.Access.layout=org.apache.log4j.PatternLayout
+log4j.appender.Access.File=${log_dir}/azkaban-access.log
+log4j.appender.Access.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1}] [Azkaban] %m%n
+log4j.appender.Access.DatePattern='.'yyyy-MM-dd
 
-log4j.appender.WebServer=org.apache.log4j.RollingFileAppender
+log4j.appender.WebServer=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.WebServer.layout=org.apache.log4j.PatternLayout
-log4j.appender.WebServer.File=azkaban-webserver.log
+log4j.appender.WebServer.File=${log_dir}/azkaban-webserver.log
 log4j.appender.WebServer.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1}] [Azkaban] %m%n
-log4j.appender.WebServer.MaxFileSize=102400MB
-log4j.appender.WebServer.MaxBackupIndex=2
+log4j.appender.WebServer.DatePattern='.'yyyy-MM-dd
 
 log4j.appender.Console=org.apache.log4j.ConsoleAppender
 log4j.appender.Console.layout=org.apache.log4j.PatternLayout
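
The appenders now roll by date instead of by size: with DatePattern '.'yyyy-MM-dd, Log4J's DailyRollingFileAppender starts a new file at midnight and renames the previous one to, for example, azkaban-access.log.2015-10-01. The ${log_dir} prefix is substituted from the log4j.log.dir system property, which the start-script change below now sets.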
diff --git a/azkaban-webserver/src/package/bin/azkaban-web-start.sh b/azkaban-webserver/src/package/bin/azkaban-web-start.sh
index 744320a..f0cc3c8 100755
--- a/azkaban-webserver/src/package/bin/azkaban-web-start.sh
+++ b/azkaban-webserver/src/package/bin/azkaban-web-start.sh
@@ -43,7 +43,7 @@ serverpath=`pwd`
 if [ -z $AZKABAN_OPTS ]; then
   AZKABAN_OPTS="-Xmx4G"
 fi
-AZKABAN_OPTS="$AZKABAN_OPTS -server -Dcom.sun.management.jmxremote -Djava.io.tmpdir=$tmpdir -Dexecutorport=$executorport -Dserverpath=$serverpath"
+AZKABAN_OPTS="$AZKABAN_OPTS -server -Dcom.sun.management.jmxremote -Djava.io.tmpdir=$tmpdir -Dexecutorport=$executorport -Dserverpath=$serverpath -Dlog4j.log.dir=$azkaban_dir/logs"
 
 java $AZKABAN_OPTS $JAVA_LIB_PATH -cp $CLASSPATH azkaban.webapp.AzkabanWebServer -conf $azkaban_dir/conf $@ &
 
diff --git a/azkaban-webserver/src/package/conf/azkaban.properties b/azkaban-webserver/src/package/conf/azkaban.properties
index 609a917..a89a46a 100644
--- a/azkaban-webserver/src/package/conf/azkaban.properties
+++ b/azkaban-webserver/src/package/conf/azkaban.properties
@@ -34,6 +34,7 @@ jetty.password=password
 jetty.keypassword=password
 jetty.truststore=keystore
 jetty.trustpassword=password
+jetty.excludeCipherSuites=SSL_RSA_WITH_DES_CBC_SHA,SSL_DHE_RSA_WITH_DES_CBC_SHA,SSL_DHE_DSS_WITH_DES_CBC_SHA,SSL_RSA_EXPORT_WITH_RC4_40_MD5,SSL_RSA_EXPORT_WITH_DES40_CBC_SHA,SSL_DHE_RSA_EXPORT_WITH_DES40_CBC_SHA,SSL_DHE_RSA_WITH_3DES_EDE_CBC_SHA,SSL_DHE_DSS_WITH_3DES_EDE_CBC_SHA,TLS_DHE_RSA_WITH_AES_256_CBC_SHA256,TLS_DHE_DSS_WITH_AES_256_CBC_SHA256,TLS_DHE_RSA_WITH_AES_256_CBC_SHA,TLS_DHE_DSS_WITH_AES_256_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256,TLS_DHE_DSS_WITH_AES_128_CBC_SHA256,TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_DSS_WITH_AES_128_CBC_SHA
 
 # Azkaban Executor settings
 executor.port=12321
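
If you want to sanity-check the new jetty.excludeCipherSuites entries against what the local JVM actually offers, a throwaway check along these lines works (this utility is illustrative and not part of Azkaban):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import javax.net.ssl.SSLContext;

public class CipherSuiteCheck {
  public static void main(String[] args) throws Exception {
    // Cipher suites the running JVM supports.
    Set<String> supported = new HashSet<String>(Arrays.asList(
        SSLContext.getDefault().getSupportedSSLParameters().getCipherSuites()));
    // Defaults are two entries from the exclude list above; pass others as arguments.
    String[] toCheck = args.length > 0 ? args : new String[] {
        "SSL_RSA_WITH_DES_CBC_SHA", "TLS_DHE_RSA_WITH_AES_128_CBC_SHA"};
    for (String suite : toCheck) {
      System.out.println(suite + " supported: " + supported.contains(suite));
    }
  }
}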

build.gradle 53(+47 -6)

diff --git a/build.gradle b/build.gradle
index d8afcca..51e4a15 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,10 +1,14 @@
 buildscript {
   repositories {
     mavenCentral()
+    maven {
+      url 'https://plugins.gradle.org/m2/'
+    }
   }
   dependencies {
     classpath 'com.linkedin:gradle-dustjs-plugin:1.0.0'
     classpath 'de.obqo.gradle:gradle-lesscss-plugin:1.0-1.3.3'
+    classpath 'net.ltgt.gradle:gradle-errorprone-plugin:0.0.8'
   }
 }
 
@@ -32,6 +36,11 @@ def cmdCaller = { commandln ->
 subprojects {
   apply plugin: 'java'
   apply plugin: 'eclipse'
+  apply plugin: 'net.ltgt.errorprone'
+
+  configurations.errorprone {
+    resolutionStrategy.force 'com.google.errorprone:error_prone_core:2.0.5'
+  }
 
   /**
    * Gets the version name from the latest Git tag
@@ -78,6 +87,25 @@ project(':azkaban-common') {
     all {
       transitive = false
     }
+    errorprone {
+      transitive = true
+    }
+  }
+
+  apply plugin: 'c'
+  model {
+    components {
+      main(NativeExecutableSpec) {
+        sources {
+          c {
+            source {
+              srcDir "src/main"
+              include "**/*.c"
+            }
+          }
+        }
+      }
+    }
   }
 
   dependencies {
@@ -111,14 +139,15 @@ project(':azkaban-common') {
     compile('org.mortbay.jetty:jetty-util:6.1.26')
     compile('org.slf4j:slf4j-api:1.6.1')
 
+    testCompile(project(':azkaban-test').sourceSets.test.output)
     testCompile('junit:junit:4.11')
     testCompile('org.hamcrest:hamcrest-all:1.3')
   }
-  
+
   tasks.withType(JavaCompile) {
     options.encoding = "UTF-8"
   }
-  
+
 }
 
 project(':azkaban-migration') {
@@ -126,6 +155,9 @@ project(':azkaban-migration') {
     all {
       transitive = false
     }
+    errorprone {
+      transitive = true
+    }
   }
 
   dependencies {
@@ -174,6 +206,9 @@ project(':azkaban-webserver') {
     generateRestli {
       transitive = true
     }
+    errorprone {
+      transitive = true
+    }
   }
 
   dependencies {
@@ -297,6 +332,9 @@ project(':azkaban-execserver') {
     all {
       transitive = false
     }
+    errorprone {
+      transitive = true
+    }
   }
 
   dependencies {
@@ -312,7 +350,6 @@ project(':azkaban-execserver') {
     compile('org.mortbay.jetty:jetty-util:6.1.26')
     compile('org.codehaus.jackson:jackson-core-asl:1.9.5')
     compile('org.codehaus.jackson:jackson-mapper-asl:1.9.5')
-    
 
     testCompile('junit:junit:4.11')
     testCompile('org.hamcrest:hamcrest-all:1.3')
@@ -413,6 +450,10 @@ project(':azkaban-sql') {
 project(':azkaban-test') {
   apply plugin: 'distribution'
 
+  dependencies {
+    testCompile('junit:junit:4.11')
+  }
+
   distributions {
     animal {
       baseName = 'test-animal'
@@ -478,11 +519,11 @@ project(':azkaban-test') {
     }
   }
 
-  distZip.dependsOn build, animalDistZip, embeddedDistZip, embedded2DistZip,
+  distZip.dependsOn animalDistZip, embeddedDistZip, embedded2DistZip,
       embedded3DistZip, embeddedBadDistZip, execpropstestDistZip,
       exectest1DistZip, exectest2DistZip, logtestDistZip
 
-  distTar.dependsOn build, animalDistTar, embeddedDistTar, embedded2DistTar,
+  distTar.dependsOn animalDistTar, embeddedDistTar, embedded2DistTar,
       embedded3DistTar, embeddedBadDistTar, execpropstestDistTar,
       exectest1DistTar, exectest2DistTar, logtestDistTar
 }
@@ -570,5 +611,5 @@ distZip.dependsOn migrationDistZip, webserverDistZip, execserverDistZip, soloser
  * Gradle wrapper task.
  */
 task wrapper(type: Wrapper) {
-  gradleVersion = '1.12'
+  gradleVersion = '2.7'
 }
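
The build now applies the net.ltgt.errorprone plugin in every subproject and pins error_prone_core to 2.0.5, so ./gradlew compileJava runs javac through Google's error-prone checker and its findings surface as compiler diagnostics. Each subproject also sets transitive = true on its errorprone configuration, presumably so the checker's own dependencies still resolve even though the projects otherwise force transitive = false.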
diff --git a/gradle.properties b/gradle.properties
index 6f7f687..ef3a7de 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,3 +1,3 @@
 org.gradle.daemon=true
 group=com.linkedin
-version=2.6.4
+version=2.7.0
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
index 0087cd3..e8c6bf7 100644
Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 506745b..bbc82a1 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
-#Wed Jun 11 01:55:01 PDT 2014
+#Sat Sep 26 15:48:43 PDT 2015
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-1.12-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-2.7-bin.zip
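
After bumping the wrapper task to gradleVersion = '2.7', running ./gradlew wrapper regenerates gradle-wrapper.properties and gradle-wrapper.jar to match, which appears to be what the wrapper and gradlew changes in this diff reflect.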

gradlew 6(+1 -5)

diff --git a/gradlew b/gradlew
index 91a7e26..97fac78 100755
--- a/gradlew
+++ b/gradlew
@@ -42,11 +42,6 @@ case "`uname`" in
     ;;
 esac
 
-# For Cygwin, ensure paths are in UNIX format before anything is touched.
-if $cygwin ; then
-    [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
-fi
-
 # Attempt to set APP_HOME
 # Resolve links: $0 may be a link
 PRG="$0"
@@ -114,6 +109,7 @@ fi
 if $cygwin ; then
     APP_HOME=`cygpath --path --mixed "$APP_HOME"`
     CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
+    JAVACMD=`cygpath --unix "$JAVACMD"`
 
     # We build the pattern for arguments to be converted via cygpath
     ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
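
The gradlew change drops the early Cygwin-only conversion of JAVA_HOME and instead converts the fully resolved JAVACMD to a UNIX path alongside the existing APP_HOME and CLASSPATH conversions, so the java binary is located correctly under Cygwin regardless of how JAVA_HOME was set.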