azkaban-aplcache

Changes

Details

diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
index 71fb6b9..1397bb2 100644
--- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
@@ -26,7 +26,6 @@ import azkaban.db.DatabaseOperatorImpl;
 import azkaban.db.H2FileDataSource;
 import azkaban.db.MySQLDataSource;
 import azkaban.executor.ExecutorLoader;
-import azkaban.executor.ExecutorManager;
 import azkaban.executor.JdbcExecutorLoader;
 import azkaban.project.JdbcProjectImpl;
 import azkaban.project.ProjectLoader;
diff --git a/azkaban-common/src/main/java/azkaban/Constants.java b/azkaban-common/src/main/java/azkaban/Constants.java
index 54d1b70..f97c7fe 100644
--- a/azkaban-common/src/main/java/azkaban/Constants.java
+++ b/azkaban-common/src/main/java/azkaban/Constants.java
@@ -20,13 +20,10 @@ package azkaban;
 /**
  * Constants
  *
- * Global place for storing constants.
- * Conventions:
- * <p>
- * - All internal constants to be put in the root level ie. {@link Constants} class <p>
- * - All Configuration keys to be put in {@link ConfigurationKeys} class <p>
- * - Flow level Properties keys go to {@link FlowProperties} <p>
- * - Job  level Properties keys go to {@link JobProperties} <p>
+ * Global place for storing constants. Conventions: <p> - All internal constants to be put in the
+ * root level ie. {@link Constants} class <p> - All Configuration keys to be put in {@link
+ * ConfigurationKeys} class <p> - Flow level Properties keys go to {@link FlowProperties} <p> - Job
+ * level Properties keys go to {@link JobProperties} <p>
  */
 public class Constants {
 
@@ -54,6 +51,7 @@ public class Constants {
   public static final long DEFAULT_SCHEDULE_END_EPOCH_TIME = 2524608000000L;
 
   public static class ConfigurationKeys {
+
     // These properties are configurable through azkaban.properties
     public static final String AZKABAN_PID_FILENAME = "azkaban.pid.filename";
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/AlerterHolder.java b/azkaban-common/src/main/java/azkaban/executor/AlerterHolder.java
index 70b6f83..b6e63db 100644
--- a/azkaban-common/src/main/java/azkaban/executor/AlerterHolder.java
+++ b/azkaban-common/src/main/java/azkaban/executor/AlerterHolder.java
@@ -22,7 +22,6 @@ import azkaban.utils.Emailer;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
-import azkaban.metrics.CommonMetrics;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import java.io.File;
diff --git a/azkaban-common/src/main/java/azkaban/executor/AssignExecutorDao.java b/azkaban-common/src/main/java/azkaban/executor/AssignExecutorDao.java
index c526d4f..a4e01a5 100644
--- a/azkaban-common/src/main/java/azkaban/executor/AssignExecutorDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/AssignExecutorDao.java
@@ -29,7 +29,7 @@ public class AssignExecutorDao {
 
   @Inject
   public AssignExecutorDao(final DatabaseOperator dbOperator,
-                           final ExecutorDao executorDao) {
+      final ExecutorDao executorDao) {
     this.dbOperator = dbOperator;
     this.executorDao = executorDao;
   }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
index ad63a36..a82486a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
@@ -157,14 +157,14 @@ public class ExecutableNode {
     this.updateTime = updateTime;
   }
 
-  public void setKilledBySLA(boolean killedBySLA) {
-    this.killedBySLA = killedBySLA;
-  }
-
   public boolean isKilledBySLA() {
     return this.killedBySLA;
   }
 
+  public void setKilledBySLA(final boolean killedBySLA) {
+    this.killedBySLA = killedBySLA;
+  }
+
   public void addOutNode(final String exNode) {
     this.outNodes.add(exNode);
   }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
index 457c4f5..5ccac59 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
@@ -88,7 +88,7 @@ public class ExecutionFlowDao {
   }
 
   List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
-                                        final int skip, final int num)
+      final int skip, final int num)
       throws ExecutorManagerException {
     try {
       return this.dbOperator.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_HISTORY,
@@ -109,8 +109,8 @@ public class ExecutionFlowDao {
   }
 
   List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
-                                        final int skip, final int num,
-                                        final Status status)
+      final int skip, final int num,
+      final Status status)
       throws ExecutorManagerException {
     try {
       return this.dbOperator.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_STATUS,
@@ -133,9 +133,9 @@ public class ExecutionFlowDao {
   }
 
   List<ExecutableFlow> fetchFlowHistory(final String projContain, final String flowContains,
-                                        final String userNameContains, final int status,
-                                        final long startTime, final long endTime,
-                                        final int skip, final int num)
+      final String userNameContains, final int status,
+      final long startTime, final long endTime,
+      final int skip, final int num)
       throws ExecutorManagerException {
     String query = FetchExecutableFlows.FETCH_BASE_EXECUTABLE_FLOW_QUERY;
     final List<Object> params = new ArrayList<>();
@@ -318,6 +318,7 @@ public class ExecutionFlowDao {
    */
   private static class FetchQueuedExecutableFlows implements
       ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
+
     // Select queued unassigned flows
     private static final String FETCH_QUEUED_EXECUTABLE_FLOW =
         "SELECT exec_id, enc_type, flow_data FROM execution_flows"
@@ -360,6 +361,7 @@ public class ExecutionFlowDao {
 
   private static class FetchRecentlyFinishedFlows implements
       ResultSetHandler<List<ExecutableFlow>> {
+
     // Execution_flows table is already indexed by end_time
     private static final String FETCH_RECENTLY_FINISHED_FLOW =
         "SELECT exec_id, enc_type, flow_data FROM execution_flows "
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionJobDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionJobDao.java
index 88fe492..da13928 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionJobDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionJobDao.java
@@ -174,9 +174,9 @@ public class ExecutionJobDao {
   }
 
   public List<ExecutableJobInfo> fetchJobHistory(final int projectId,
-                                                 final String jobId,
-                                                 final int skip,
-                                                 final int size) throws ExecutorManagerException {
+      final String jobId,
+      final int skip,
+      final int size) throws ExecutorManagerException {
     try {
       final List<ExecutableJobInfo> info =
           this.dbOperator.query(FetchExecutableJobHandler.FETCH_PROJECT_EXECUTABLE_NODE,
@@ -229,6 +229,7 @@ public class ExecutionJobDao {
 
   private static class FetchExecutableJobHandler implements
       ResultSetHandler<List<ExecutableJobInfo>> {
+
     private static final String FETCH_EXECUTABLE_NODE =
         "SELECT exec_id, project_id, version, flow_id, job_id, "
             + "start_time, end_time, status, attempt "
@@ -247,7 +248,7 @@ public class ExecutionJobDao {
     @Override
     public List<ExecutableJobInfo> handle(final ResultSet rs) throws SQLException {
       if (!rs.next()) {
-        return Collections.<ExecutableJobInfo> emptyList();
+        return Collections.<ExecutableJobInfo>emptyList();
       }
 
       final List<ExecutableJobInfo> execNodes = new ArrayList<>();
@@ -274,6 +275,7 @@ public class ExecutionJobDao {
 
   private static class FetchExecutableJobPropsHandler implements
       ResultSetHandler<Pair<Props, Props>> {
+
     private static final String FETCH_OUTPUT_PARAM_EXECUTABLE_NODE =
         "SELECT output_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
     private static final String FETCH_INPUT_PARAM_EXECUTABLE_NODE =
@@ -335,6 +337,7 @@ public class ExecutionJobDao {
 
   private static class FetchExecutableJobAttachmentsHandler implements
       ResultSetHandler<String> {
+
     private static final String FETCH_ATTACHMENTS_EXECUTABLE_NODE =
         "SELECT attachments FROM execution_jobs WHERE exec_id=? AND job_id=?";
 
@@ -354,4 +357,4 @@ public class ExecutionJobDao {
       return attachmentsJson;
     }
   }
-}
\ No newline at end of file
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionLogsDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionLogsDao.java
index 0821814..707c1c8 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionLogsDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionLogsDao.java
@@ -55,8 +55,8 @@ public class ExecutionLogsDao {
 
   // TODO kunkun-tang: the interface's parameter is called endByte, but actually is length.
   LogData fetchLogs(final int execId, final String name, final int attempt,
-                    final int startByte,
-                    final int length) throws ExecutorManagerException {
+      final int startByte,
+      final int length) throws ExecutorManagerException {
     final FetchLogsHandler handler = new FetchLogsHandler(startByte, length + startByte);
     try {
       return this.dbOperator.query(FetchLogsHandler.FETCH_LOGS, handler,
@@ -68,7 +68,7 @@ public class ExecutionLogsDao {
   }
 
   public void uploadLogFile(final int execId, final String name, final int attempt,
-                            final File... files) throws ExecutorManagerException {
+      final File... files) throws ExecutorManagerException {
     final SQLTransaction<Integer> transaction = transOperator -> {
       uploadLogFile(transOperator, execId, name, attempt, files, this.defaultEncodingType);
       transOperator.getConnection().commit();
@@ -82,8 +82,9 @@ public class ExecutionLogsDao {
     }
   }
 
-  private void uploadLogFile(final DatabaseTransOperator transOperator, final int execId, final String name,
-                             final int attempt, final File[] files, final EncodingType encType)
+  private void uploadLogFile(final DatabaseTransOperator transOperator, final int execId,
+      final String name,
+      final int attempt, final File[] files, final EncodingType encType)
       throws SQLException {
     // 50K buffer... if logs are greater than this, we chunk.
     // However, we better prevent large log files from being uploaded somehow
@@ -148,10 +149,10 @@ public class ExecutionLogsDao {
   }
 
   private void uploadLogPart(final DatabaseTransOperator transOperator, final int execId,
-                             final String name,
-                             final int attempt, final int startByte, final int endByte,
-                             final EncodingType encType,
-                             final byte[] buffer, final int length)
+      final String name,
+      final int attempt, final int startByte, final int endByte,
+      final EncodingType encType,
+      final byte[] buffer, final int length)
       throws SQLException, IOException {
     final String INSERT_EXECUTION_LOGS = "INSERT INTO execution_logs "
         + "(exec_id, name, attempt, enc_type, start_byte, end_byte, "
@@ -228,4 +229,4 @@ public class ExecutionLogsDao {
           new String(buffer, result.getFirst(), result.getSecond(), StandardCharsets.UTF_8));
     }
   }
-}
\ No newline at end of file
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorDao.java
index 35a1882..12653b4 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorDao.java
@@ -34,7 +34,7 @@ public class ExecutorDao {
   private final DatabaseOperator dbOperator;
 
   @Inject
-  public ExecutorDao (final DatabaseOperator dbOperator) {
+  public ExecutorDao(final DatabaseOperator dbOperator) {
     this.dbOperator = dbOperator;
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorEventsDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorEventsDao.java
index d4e581c..90e17f0 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorEventsDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorEventsDao.java
@@ -39,7 +39,7 @@ public class ExecutorEventsDao {
   }
 
   public void postExecutorEvent(final Executor executor, final EventType type, final String user,
-                                final String message) throws ExecutorManagerException {
+      final String message) throws ExecutorManagerException {
     final String INSERT_PROJECT_EVENTS =
         "INSERT INTO executor_events (executor_id, event_type, event_time, username, message) values (?,?,?,?,?)";
     try {
@@ -51,7 +51,7 @@ public class ExecutorEventsDao {
   }
 
   public List<ExecutorLogEvent> getExecutorEvents(final Executor executor, final int num,
-                                                  final int offset)
+      final int offset)
       throws ExecutorManagerException {
     try {
       return this.dbOperator.query(ExecutorLogsResultHandler.SELECT_EXECUTOR_EVENTS_ORDER,
@@ -95,4 +95,4 @@ public class ExecutorEventsDao {
       return events;
     }
   }
-}
\ No newline at end of file
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
index d269f9a..c743499 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
@@ -20,14 +20,9 @@ import java.io.IOException;
 import org.codehaus.jackson.map.ObjectMapper;
 
 /**
- * Class that exposes the statistics from the executor server.
- * List of the statistics -
- * remainingMemoryPercent;
- * remainingMemory;
- * remainingFlowCapacity;
- * numberOfAssignedFlows;
- * lastDispatchedTime;
- * cpuUsage;
+ * Class that exposes the statistics from the executor server. List of the statistics -
+ * remainingMemoryPercent; remainingMemory; remainingFlowCapacity; numberOfAssignedFlows;
+ * lastDispatchedTime; cpuUsage;
  */
 public class ExecutorInfo implements java.io.Serializable {
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index a8a719d..7702a73 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 
 public interface ExecutorLoader {
+
   void uploadExecutableFlow(ExecutableFlow flow)
       throws ExecutorManagerException;
 
@@ -63,7 +64,6 @@ public interface ExecutorLoader {
    * </pre>
    *
    * @return List<Executor>
-   * @throws ExecutorManagerException
    */
   List<Executor> fetchAllExecutors() throws ExecutorManagerException;
 
@@ -76,7 +76,6 @@ public interface ExecutorLoader {
    * </pre>
    *
    * @return List<Executor>
-   * @throws ExecutorManagerException
    */
   List<Executor> fetchActiveExecutors() throws ExecutorManagerException;
 
@@ -90,10 +89,9 @@ public interface ExecutorLoader {
    * </pre>
    *
    * @return Executor
-   * @throws ExecutorManagerException
    */
   Executor fetchExecutor(String host, int port)
-    throws ExecutorManagerException;
+      throws ExecutorManagerException;
 
   /**
    * <pre>
@@ -104,7 +102,6 @@ public interface ExecutorLoader {
    * </pre>
    *
    * @return Executor
-   * @throws ExecutorManagerException
    */
   Executor fetchExecutor(int executorId) throws ExecutorManagerException;
 
@@ -118,10 +115,9 @@ public interface ExecutorLoader {
    * </pre>
    *
    * @return Executor
-   * @throws ExecutorManagerException
    */
   Executor addExecutor(String host, int port)
-    throws ExecutorManagerException;
+      throws ExecutorManagerException;
 
   /**
    * <pre>
@@ -131,9 +127,6 @@ public interface ExecutorLoader {
    * 2. throws an Exception if there is no executor with the given id
    * 3. return null when no executor is found with the given executorId
    * </pre>
-   *
-   * @param executorId
-   * @throws ExecutorManagerException
    */
   void updateExecutor(Executor executor) throws ExecutorManagerException;
 
@@ -144,9 +137,6 @@ public interface ExecutorLoader {
    * 1. throws an Exception in case of a SQL issue
    * 2. throws an Exception if there is no executor in the table* </pre>
    * </pre>
-   * @param host
-   * @param port
-   * @throws ExecutorManagerException
    */
   void removeExecutor(String host, int port) throws ExecutorManagerException;
 
@@ -157,14 +147,10 @@ public interface ExecutorLoader {
    * Note: throws an Exception in case of a SQL issue
    * </pre>
    *
-   * @param executor
-   * @param type
-   * @param user
-   * @param message
    * @return isSuccess
    */
   void postExecutorEvent(Executor executor, EventType type, String user,
-    String message) throws ExecutorManagerException;
+      String message) throws ExecutorManagerException;
 
   /**
    * <pre>
@@ -175,14 +161,10 @@ public interface ExecutorLoader {
    * 2. Returns an empty list in case of no events
    * </pre>
    *
-   * @param executor
-   * @param num
-   * @param skip
    * @return List<ExecutorLogEvent>
-   * @throws ExecutorManagerException
    */
   List<ExecutorLogEvent> getExecutorEvents(Executor executor, int num,
-    int offset) throws ExecutorManagerException;
+      int offset) throws ExecutorManagerException;
 
   void addActiveExecutableReference(ExecutionReference ref)
       throws ExecutorManagerException;
@@ -197,10 +179,6 @@ public interface ExecutorLoader {
    * Note:-
    * throws an Exception in case of a SQL issue
    * </pre>
-   *
-   * @param executorId
-   * @param execId
-   * @throws ExecutorManagerException
    */
   void unassignExecutor(int executionId) throws ExecutorManagerException;
 
@@ -211,13 +189,9 @@ public interface ExecutorLoader {
    * 1. throws an Exception in case of a SQL issue
    * 2. throws an Exception in case executionId or executorId do not exist
    * </pre>
-   *
-   * @param executorId
-   * @param execId
-   * @throws ExecutorManagerException
    */
   void assignExecutor(int executorId, int execId)
-    throws ExecutorManagerException;
+      throws ExecutorManagerException;
 
   /**
    * <pre>
@@ -227,12 +201,10 @@ public interface ExecutorLoader {
    * 2. return null when no executor is found with the given executionId
    * </pre>
    *
-   * @param executionId
    * @return fetched Executor
-   * @throws ExecutorManagerException
    */
   Executor fetchExecutorByExecutionId(int executionId)
-    throws ExecutorManagerException;
+      throws ExecutorManagerException;
 
   /**
    * <pre>
@@ -243,10 +215,9 @@ public interface ExecutorLoader {
    * </pre>
    *
    * @return List of queued flows and corresponding execution reference
-   * @throws ExecutorManagerException
    */
   List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
-    throws ExecutorManagerException;
+      throws ExecutorManagerException;
 
   boolean updateExecutableReference(int execId, long updateTime)
       throws ExecutorManagerException;
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java
index cd0d38c..d763ff9 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java
@@ -61,8 +61,8 @@ public class ExecutorLogEvent {
   }
 
   /**
-   * Log event type messages. Do not change the numeric representation of each
-   * enum. Only represent from 0 to 255 different codes.
+   * Log event type messages. Do not change the numeric representation of each enum. Only represent
+   * from 0 to 255 different codes.
    */
   public enum EventType {
     ERROR(128), HOST_UPDATE(1), PORT_UPDATE(2), ACTIVATION(3), INACTIVATION(4),
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index a0990fc..b4632ec 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -17,8 +17,20 @@
 package azkaban.executor;
 
 import azkaban.Constants;
+import azkaban.alert.Alerter;
+import azkaban.event.EventHandler;
+import azkaban.executor.selector.ExecutorComparator;
+import azkaban.executor.selector.ExecutorFilter;
+import azkaban.executor.selector.ExecutorSelector;
 import azkaban.metrics.CommonMetrics;
+import azkaban.project.Project;
+import azkaban.project.ProjectWhitelist;
+import azkaban.utils.FileIOUtils.JobMetaData;
+import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.FlowUtils;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -44,92 +56,65 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
-import azkaban.alert.Alerter;
-import azkaban.event.Event;
-import azkaban.event.Event.Type;
-import azkaban.event.EventData;
-import azkaban.event.EventHandler;
-import azkaban.executor.selector.ExecutorComparator;
-import azkaban.executor.selector.ExecutorFilter;
-import azkaban.executor.selector.ExecutorSelector;
-import azkaban.project.Project;
-import azkaban.project.ProjectWhitelist;
-import azkaban.utils.FileIOUtils.JobMetaData;
-import azkaban.utils.FileIOUtils.LogData;
-import azkaban.utils.JSONUtils;
-import azkaban.utils.Pair;
-import azkaban.utils.Props;
-
 /**
  * Executor manager used to manage the client side job.
- *
  */
 @Singleton
 public class ExecutorManager extends EventHandler implements
     ExecutorManagerAdapter {
+
+  public static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
+      "azkaban.use.multiple.executors";
   static final String AZKABAN_EXECUTOR_SELECTOR_FILTERS =
       "azkaban.executorselector.filters";
   static final String AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX =
       "azkaban.executorselector.comparator.";
   static final String AZKABAN_QUEUEPROCESSING_ENABLED =
-    "azkaban.queueprocessing.enabled";
-  public static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
-    "azkaban.use.multiple.executors";
+      "azkaban.queueprocessing.enabled";
   private static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
-    "azkaban.webserver.queue.size";
+      "azkaban.webserver.queue.size";
   private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS =
-    "azkaban.activeexecutor.refresh.milisecinterval";
+      "azkaban.activeexecutor.refresh.milisecinterval";
   private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW =
-    "azkaban.activeexecutor.refresh.flowinterval";
+      "azkaban.activeexecutor.refresh.flowinterval";
   private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS =
       "azkaban.executorinfo.refresh.maxThreads";
   private static final String AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED =
-    "azkaban.maxDispatchingErrors";
-
-  private static Logger logger = Logger.getLogger(ExecutorManager.class);
-  private ExecutorLoader executorLoader;
-
-  private CleanerThread cleanerThread;
-
-  private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
-      new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
-
-
-  QueuedExecutions queuedFlows;
-
-  final private Set<Executor> activeExecutors = new HashSet<Executor>();
-  private QueueProcessorThread queueProcessor;
-  private volatile Pair<ExecutionReference, ExecutableFlow> runningCandidate = null;
-
-  private ExecutingManagerUpdaterThread executingManager;
+      "azkaban.maxDispatchingErrors";
   // 12 weeks
   private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
       * 24 * 60 * 60 * 1000L;
   private static final Duration RECENTLY_FINISHED_LIFETIME = Duration.ofMinutes(10);
-  private long lastCleanerThreadCheckTime = -1;
-
-  private long lastThreadCheckTime = -1;
-  private String updaterStage = "not started";
-
+  private static final Logger logger = Logger.getLogger(ExecutorManager.class);
+  final private Set<Executor> activeExecutors = new HashSet<>();
   private final AlerterHolder alerterHolder;
-
-  File cacheDir;
-
   private final Props azkProps;
   private final CommonMetrics commonMetrics;
+  private final ExecutorLoader executorLoader;
+  private final CleanerThread cleanerThread;
+  private final ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
+      new ConcurrentHashMap<>();
+  private final ExecutingManagerUpdaterThread executingManager;
+  QueuedExecutions queuedFlows;
+  File cacheDir;
+  private QueueProcessorThread queueProcessor;
+  private volatile Pair<ExecutionReference, ExecutableFlow> runningCandidate = null;
+  private long lastCleanerThreadCheckTime = -1;
+  private long lastThreadCheckTime = -1;
+  private String updaterStage = "not started";
   private List<String> filterList;
   private Map<String, Integer> comparatorWeightsMap;
   private long lastSuccessfulExecutorInfoRefresh;
   private ExecutorService executorInforRefresherService;
 
   @Inject
-  public ExecutorManager(Props azkProps, ExecutorLoader loader, AlerterHolder alerterHolder,
-      CommonMetrics commonMetrics) throws ExecutorManagerException {
+  public ExecutorManager(final Props azkProps, final ExecutorLoader loader,
+      final AlerterHolder alerterHolder,
+      final CommonMetrics commonMetrics) throws ExecutorManagerException {
     this.alerterHolder = alerterHolder;
     this.azkProps = azkProps;
     this.commonMetrics = commonMetrics;
@@ -137,213 +122,212 @@ public class ExecutorManager extends EventHandler implements
     this.setupExecutors();
     this.loadRunningFlows();
 
-    queuedFlows = new QueuedExecutions(azkProps.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
+    this.queuedFlows = new QueuedExecutions(azkProps.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
     this.loadQueuedFlows();
 
-    cacheDir = new File(azkProps.getString("cache.directory", "cache"));
+    this.cacheDir = new File(azkProps.getString("cache.directory", "cache"));
 
-    executingManager = new ExecutingManagerUpdaterThread();
-    executingManager.start();
+    this.executingManager = new ExecutingManagerUpdaterThread();
+    this.executingManager.start();
 
-    if(isMultiExecutorMode()) {
+    if (isMultiExecutorMode()) {
       setupMultiExecutorMode();
     }
 
-    long executionLogsRetentionMs =
+    final long executionLogsRetentionMs =
         azkProps.getLong("execution.logs.retention.ms",
-        DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+            DEFAULT_EXECUTION_LOGS_RETENTION_MS);
 
-    cleanerThread = new CleanerThread(executionLogsRetentionMs);
-    cleanerThread.start();
+    this.cleanerThread = new CleanerThread(executionLogsRetentionMs);
+    this.cleanerThread.start();
 
   }
 
   private void setupMultiExecutorMode() {
     // initliatize hard filters for executor selector from azkaban.properties
-    String filters = azkProps.getString(AZKABAN_EXECUTOR_SELECTOR_FILTERS, "");
+    final String filters = this.azkProps.getString(AZKABAN_EXECUTOR_SELECTOR_FILTERS, "");
     if (filters != null) {
-      filterList = Arrays.asList(StringUtils.split(filters, ","));
+      this.filterList = Arrays.asList(StringUtils.split(filters, ","));
     }
 
     // initliatize comparator feature weights for executor selector from
     // azkaban.properties
-    Map<String, String> compListStrings =
-      azkProps.getMapByPrefix(AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
+    final Map<String, String> compListStrings =
+        this.azkProps.getMapByPrefix(AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
     if (compListStrings != null) {
-      comparatorWeightsMap = new TreeMap<String, Integer>();
-      for (Map.Entry<String, String> entry : compListStrings.entrySet()) {
-        comparatorWeightsMap.put(entry.getKey(), Integer.valueOf(entry.getValue()));
+      this.comparatorWeightsMap = new TreeMap<>();
+      for (final Map.Entry<String, String> entry : compListStrings.entrySet()) {
+        this.comparatorWeightsMap.put(entry.getKey(), Integer.valueOf(entry.getValue()));
       }
     }
 
-    executorInforRefresherService =
-        Executors.newFixedThreadPool(azkProps.getInt(
-          AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS, 5));
+    this.executorInforRefresherService =
+        Executors.newFixedThreadPool(this.azkProps.getInt(
+            AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS, 5));
 
     // configure queue processor
-    queueProcessor =
-      new QueueProcessorThread(azkProps.getBoolean(
-        AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
-        AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000), azkProps.getInt(
-        AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5), azkProps.getInt(
-        AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED, activeExecutors.size()));
+    this.queueProcessor =
+        new QueueProcessorThread(this.azkProps.getBoolean(
+            AZKABAN_QUEUEPROCESSING_ENABLED, true), this.azkProps.getLong(
+            AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000), this.azkProps.getInt(
+            AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5), this.azkProps.getInt(
+            AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED, this.activeExecutors.size()));
 
-    queueProcessor.start();
+    this.queueProcessor.start();
   }
 
   /**
-   *
    * {@inheritDoc}
+   *
    * @see azkaban.executor.ExecutorManagerAdapter#setupExecutors()
    */
   @Override
   public void setupExecutors() throws ExecutorManagerException {
-    Set<Executor> newExecutors = new HashSet<Executor>();
+    final Set<Executor> newExecutors = new HashSet<>();
 
     if (isMultiExecutorMode()) {
       logger.info("Initializing multi executors from database");
-      newExecutors.addAll(executorLoader.fetchActiveExecutors());
-    } else if (azkProps.containsKey("executor.port")) {
+      newExecutors.addAll(this.executorLoader.fetchActiveExecutors());
+    } else if (this.azkProps.containsKey("executor.port")) {
       // Add local executor, if specified as per properties
-      String executorHost = azkProps.getString(Constants.ConfigurationKeys.EXECUTOR_HOST, "localhost");
-      int executorPort = azkProps.getInt("executor.port");
+      final String executorHost = this.azkProps
+          .getString(Constants.ConfigurationKeys.EXECUTOR_HOST, "localhost");
+      final int executorPort = this.azkProps.getInt("executor.port");
       logger.info(String.format("Initializing local executor %s:%d",
-        executorHost, executorPort));
+          executorHost, executorPort));
       Executor executor =
-        executorLoader.fetchExecutor(executorHost, executorPort);
+          this.executorLoader.fetchExecutor(executorHost, executorPort);
       if (executor == null) {
-        executor = executorLoader.addExecutor(executorHost, executorPort);
+        executor = this.executorLoader.addExecutor(executorHost, executorPort);
       } else if (!executor.isActive()) {
         executor.setActive(true);
-        executorLoader.updateExecutor(executor);
+        this.executorLoader.updateExecutor(executor);
       }
       newExecutors.add(new Executor(executor.getId(), executorHost,
-        executorPort, true));
+          executorPort, true));
     }
 
     if (newExecutors.isEmpty()) {
       logger.error("No active executor found");
       throw new ExecutorManagerException("No active executor found");
-    } else if(newExecutors.size() > 1 && !isMultiExecutorMode()) {
+    } else if (newExecutors.size() > 1 && !isMultiExecutorMode()) {
       logger.error("Multiple local executors specified");
       throw new ExecutorManagerException("Multiple local executors specified");
     } else {
       // clear all active executors, only if we have at least one new active
       // executors
-      activeExecutors.clear();
-      activeExecutors.addAll(newExecutors);
+      this.activeExecutors.clear();
+      this.activeExecutors.addAll(newExecutors);
     }
   }
 
   private boolean isMultiExecutorMode() {
-    return azkProps.getBoolean(AZKABAN_USE_MULTIPLE_EXECUTORS, false);
+    return this.azkProps.getBoolean(AZKABAN_USE_MULTIPLE_EXECUTORS, false);
   }
 
   /**
    * Refresh Executor stats for all the actie executors in this executorManager
    */
   private void refreshExecutors() {
-    synchronized (activeExecutors) {
+    synchronized (this.activeExecutors) {
 
-      List<Pair<Executor, Future<String>>> futures =
-        new ArrayList<Pair<Executor, Future<String>>>();
-      for (final Executor executor : activeExecutors) {
+      final List<Pair<Executor, Future<String>>> futures =
+          new ArrayList<>();
+      for (final Executor executor : this.activeExecutors) {
         // execute each executorInfo refresh task to fetch
-        Future<String> fetchExecutionInfo =
-          executorInforRefresherService.submit(new Callable<String>() {
-            @Override
-            public String call() throws Exception {
-              return callExecutorForJsonString(executor.getHost(),
-                executor.getPort(), "/serverStatistics", null);
-            }
-          });
-        futures.add(new Pair<Executor, Future<String>>(executor,
-          fetchExecutionInfo));
+        final Future<String> fetchExecutionInfo =
+            this.executorInforRefresherService.submit(new Callable<String>() {
+              @Override
+              public String call() throws Exception {
+                return callExecutorForJsonString(executor.getHost(),
+                    executor.getPort(), "/serverStatistics", null);
+              }
+            });
+        futures.add(new Pair<>(executor,
+            fetchExecutionInfo));
       }
 
       boolean wasSuccess = true;
-      for (Pair<Executor, Future<String>> refreshPair : futures) {
-        Executor executor = refreshPair.getFirst();
+      for (final Pair<Executor, Future<String>> refreshPair : futures) {
+        final Executor executor = refreshPair.getFirst();
         executor.setExecutorInfo(null); // invalidate cached EXecutorInfo
         try {
           // max 5 secs
-          String jsonString = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
+          final String jsonString = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
           executor.setExecutorInfo(ExecutorInfo.fromJSONString(jsonString));
           logger.info(String.format(
-            "Successfully refreshed executor: %s with executor info : %s",
-            executor, jsonString));
-        } catch (TimeoutException e) {
+              "Successfully refreshed executor: %s with executor info : %s",
+              executor, jsonString));
+        } catch (final TimeoutException e) {
           wasSuccess = false;
           logger.error("Timed out while waiting for ExecutorInfo refresh"
-            + executor, e);
-        } catch (Exception e) {
+              + executor, e);
+        } catch (final Exception e) {
           wasSuccess = false;
           logger.error("Failed to update ExecutorInfo for executor : "
-            + executor, e);
+              + executor, e);
         }
       }
 
       // update is successful for all executors
       if (wasSuccess) {
-        lastSuccessfulExecutorInfoRefresh = System.currentTimeMillis();
+        this.lastSuccessfulExecutorInfoRefresh = System.currentTimeMillis();
       }
     }
   }
 
   /**
-   * Throws exception if running in local mode
-   * {@inheritDoc}
+   * Throws exception if running in local mode {@inheritDoc}
+   *
    * @see azkaban.executor.ExecutorManagerAdapter#disableQueueProcessorThread()
    */
   @Override
   public void disableQueueProcessorThread() throws ExecutorManagerException {
     if (isMultiExecutorMode()) {
-      queueProcessor.setActive(false);
+      this.queueProcessor.setActive(false);
     } else {
       throw new ExecutorManagerException(
-        "Cannot disable QueueProcessor in local mode");
+          "Cannot disable QueueProcessor in local mode");
     }
   }
 
   /**
-   * Throws exception if running in local mode
-   * {@inheritDoc}
+   * Throws exception if running in local mode {@inheritDoc}
+   *
    * @see azkaban.executor.ExecutorManagerAdapter#enableQueueProcessorThread()
    */
   @Override
   public void enableQueueProcessorThread() throws ExecutorManagerException {
     if (isMultiExecutorMode()) {
-      queueProcessor.setActive(true);
+      this.queueProcessor.setActive(true);
     } else {
       throw new ExecutorManagerException(
-        "Cannot enable QueueProcessor in local mode");
+          "Cannot enable QueueProcessor in local mode");
     }
   }
 
   public State getQueueProcessorThreadState() {
-    if (isMultiExecutorMode())
-      return queueProcessor.getState();
-    else
+    if (isMultiExecutorMode()) {
+      return this.queueProcessor.getState();
+    } else {
       return State.NEW; // not started in local mode
+    }
   }
 
   /**
-   * Returns state of QueueProcessor False, no flow is being dispatched True ,
-   * flows are being dispatched as expected
-   *
-   * @return
+   * Returns state of QueueProcessor False, no flow is being dispatched True , flows are being
+   * dispatched as expected
    */
   public boolean isQueueProcessorThreadActive() {
-    if (isMultiExecutorMode())
-      return queueProcessor.isActive();
-    else
+    if (isMultiExecutorMode()) {
+      return this.queueProcessor.isActive();
+    } else {
       return false;
+    }
   }
 
   /**
    * Return last Successful ExecutorInfo Refresh for all active executors
-   *
-   * @return
    */
   public long getLastSuccessfulExecutorInfoRefresh() {
     return this.lastSuccessfulExecutorInfoRefresh;
@@ -351,8 +335,6 @@ public class ExecutorManager extends EventHandler implements
 
   /**
    * Get currently supported Comparators available to use via azkaban.properties
-   *
-   * @return
    */
   public Set<String> getAvailableExecutorComparatorNames() {
     return ExecutorComparator.getAvailableComparatorNames();
@@ -361,8 +343,6 @@ public class ExecutorManager extends EventHandler implements
 
   /**
    * Get currently supported filters available to use via azkaban.properties
-   *
-   * @return
    */
   public Set<String> getAvailableExecutorFilterNames() {
     return ExecutorFilter.getAvailableFilterNames();
@@ -370,21 +350,21 @@ public class ExecutorManager extends EventHandler implements
 
   @Override
   public State getExecutorManagerThreadState() {
-    return executingManager.getState();
+    return this.executingManager.getState();
   }
 
   public String getExecutorThreadStage() {
-    return updaterStage;
+    return this.updaterStage;
   }
 
   @Override
   public boolean isExecutorManagerThreadActive() {
-    return executingManager.isAlive();
+    return this.executingManager.isAlive();
   }
 
   @Override
   public long getLastExecutorManagerThreadCheckTime() {
-    return lastThreadCheckTime;
+    return this.lastThreadCheckTime;
   }
 
   public long getLastCleanerThreadCheckTime() {
@@ -393,30 +373,29 @@ public class ExecutorManager extends EventHandler implements
 
   @Override
   public Collection<Executor> getAllActiveExecutors() {
-    return Collections.unmodifiableCollection(activeExecutors);
+    return Collections.unmodifiableCollection(this.activeExecutors);
   }
 
   /**
-   *
    * {@inheritDoc}
    *
    * @see azkaban.executor.ExecutorManagerAdapter#fetchExecutor(int)
    */
   @Override
-  public Executor fetchExecutor(int executorId) throws ExecutorManagerException {
-    for (Executor executor : activeExecutors) {
+  public Executor fetchExecutor(final int executorId) throws ExecutorManagerException {
+    for (final Executor executor : this.activeExecutors) {
       if (executor.getId() == executorId) {
         return executor;
       }
     }
-    return executorLoader.fetchExecutor(executorId);
+    return this.executorLoader.fetchExecutor(executorId);
   }
 
   @Override
   public Set<String> getPrimaryServerHosts() {
     // Only one for now. More probably later.
-    HashSet<String> ports = new HashSet<String>();
-    for (Executor executor : activeExecutors) {
+    final HashSet<String> ports = new HashSet<>();
+    for (final Executor executor : this.activeExecutors) {
       ports.add(executor.getHost() + ":" + executor.getPort());
     }
     return ports;
@@ -425,21 +404,21 @@ public class ExecutorManager extends EventHandler implements
   @Override
   public Set<String> getAllActiveExecutorServerHosts() {
     // Includes non primary server/hosts
-    HashSet<String> ports = new HashSet<String>();
-    for (Executor executor : activeExecutors) {
+    final HashSet<String> ports = new HashSet<>();
+    for (final Executor executor : this.activeExecutors) {
       ports.add(executor.getHost() + ":" + executor.getPort());
     }
     // include executor which were initially active and still has flows running
-    for (Pair<ExecutionReference, ExecutableFlow> running : runningFlows
-      .values()) {
-      ExecutionReference ref = running.getFirst();
+    for (final Pair<ExecutionReference, ExecutableFlow> running : this.runningFlows
+        .values()) {
+      final ExecutionReference ref = running.getFirst();
       ports.add(ref.getHost() + ":" + ref.getPort());
     }
     return ports;
   }
 
   private void loadRunningFlows() throws ExecutorManagerException {
-    runningFlows.putAll(executorLoader.fetchActiveFlows());
+    this.runningFlows.putAll(this.executorLoader.fetchActiveFlows());
   }
 
   /*
@@ -447,46 +426,47 @@ public class ExecutorManager extends EventHandler implements
    * any executor
    */
   private void loadQueuedFlows() throws ExecutorManagerException {
-    List<Pair<ExecutionReference, ExecutableFlow>> retrievedExecutions =
-      executorLoader.fetchQueuedFlows();
+    final List<Pair<ExecutionReference, ExecutableFlow>> retrievedExecutions =
+        this.executorLoader.fetchQueuedFlows();
     if (retrievedExecutions != null) {
-      for (Pair<ExecutionReference, ExecutableFlow> pair : retrievedExecutions) {
-        queuedFlows.enqueue(pair.getSecond(), pair.getFirst());
+      for (final Pair<ExecutionReference, ExecutableFlow> pair : retrievedExecutions) {
+        this.queuedFlows.enqueue(pair.getSecond(), pair.getFirst());
       }
     }
   }
 
   /**
-   * Gets a list of all the active (running flows and non-dispatched flows)
-   * executions for a given project and flow {@inheritDoc}. Results should
-   * be sorted as we assume this while setting up pipelined execution Id.
+   * Gets a list of all the active (running flows and non-dispatched flows) executions for a given
+   * project and flow {@inheritDoc}. Results should be sorted as we assume this while setting up
+   * pipelined execution Id.
    *
-   * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows(int,
-   *      java.lang.String)
+   * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows(int, java.lang.String)
    */
   @Override
-  public List<Integer> getRunningFlows(int projectId, String flowId) {
-    List<Integer> executionIds = new ArrayList<Integer>();
+  public List<Integer> getRunningFlows(final int projectId, final String flowId) {
+    final List<Integer> executionIds = new ArrayList<>();
     executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
-      queuedFlows.getAllEntries()));
+        this.queuedFlows.getAllEntries()));
     // it's possible an execution is runningCandidate, meaning it's in dispatching state neither in queuedFlows nor runningFlows,
     // so checks the runningCandidate as well.
-    if (runningCandidate != null) {
-      executionIds.addAll(getRunningFlowsHelper(projectId, flowId, Lists.newArrayList(runningCandidate)));
+    if (this.runningCandidate != null) {
+      executionIds
+          .addAll(
+              getRunningFlowsHelper(projectId, flowId, Lists.newArrayList(this.runningCandidate)));
     }
     executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
-      runningFlows.values()));
+        this.runningFlows.values()));
     Collections.sort(executionIds);
     return executionIds;
   }
 
   /* Helper method for getRunningFlows */
-  private List<Integer> getRunningFlowsHelper(int projectId, String flowId,
-    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
-    List<Integer> executionIds = new ArrayList<Integer>();
-    for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+  private List<Integer> getRunningFlowsHelper(final int projectId, final String flowId,
+      final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    final List<Integer> executionIds = new ArrayList<>();
+    for (final Pair<ExecutionReference, ExecutableFlow> ref : collection) {
       if (ref.getSecond().getFlowId().equals(flowId)
-        && ref.getSecond().getProjectId() == projectId) {
+          && ref.getSecond().getProjectId() == projectId) {
         executionIds.add(ref.getFirst().getExecId());
       }
     }
@@ -494,56 +474,53 @@ public class ExecutorManager extends EventHandler implements
   }
 
   /**
-   *
    * {@inheritDoc}
    *
    * @see azkaban.executor.ExecutorManagerAdapter#getActiveFlowsWithExecutor()
    */
   @Override
   public List<Pair<ExecutableFlow, Executor>> getActiveFlowsWithExecutor()
-    throws IOException {
-    List<Pair<ExecutableFlow, Executor>> flows =
-      new ArrayList<Pair<ExecutableFlow, Executor>>();
-    getActiveFlowsWithExecutorHelper(flows, queuedFlows.getAllEntries());
-    getActiveFlowsWithExecutorHelper(flows, runningFlows.values());
+      throws IOException {
+    final List<Pair<ExecutableFlow, Executor>> flows =
+        new ArrayList<>();
+    getActiveFlowsWithExecutorHelper(flows, this.queuedFlows.getAllEntries());
+    getActiveFlowsWithExecutorHelper(flows, this.runningFlows.values());
     return flows;
   }
 
   /* Helper method for getActiveFlowsWithExecutor */
   private void getActiveFlowsWithExecutorHelper(
-    List<Pair<ExecutableFlow, Executor>> flows,
-    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
-    for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
-      flows.add(new Pair<ExecutableFlow, Executor>(ref.getSecond(), ref
-        .getFirst().getExecutor()));
+      final List<Pair<ExecutableFlow, Executor>> flows,
+      final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    for (final Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+      flows.add(new Pair<>(ref.getSecond(), ref
+          .getFirst().getExecutor()));
     }
   }
 
   /**
-   * Checks whether the given flow has an active (running, non-dispatched)
-   * executions {@inheritDoc}
+   * Checks whether the given flow has an active (running, non-dispatched) executions {@inheritDoc}
    *
-   * @see azkaban.executor.ExecutorManagerAdapter#isFlowRunning(int,
-   *      java.lang.String)
+   * @see azkaban.executor.ExecutorManagerAdapter#isFlowRunning(int, java.lang.String)
    */
   @Override
-  public boolean isFlowRunning(int projectId, String flowId) {
+  public boolean isFlowRunning(final int projectId, final String flowId) {
     boolean isRunning = false;
     isRunning =
-      isRunning
-        || isFlowRunningHelper(projectId, flowId, queuedFlows.getAllEntries());
+        isRunning
+            || isFlowRunningHelper(projectId, flowId, this.queuedFlows.getAllEntries());
     isRunning =
-      isRunning
-        || isFlowRunningHelper(projectId, flowId, runningFlows.values());
+        isRunning
+            || isFlowRunningHelper(projectId, flowId, this.runningFlows.values());
     return isRunning;
   }
 
   /* Search a running flow in a collection */
-  private boolean isFlowRunningHelper(int projectId, String flowId,
-    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
-    for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+  private boolean isFlowRunningHelper(final int projectId, final String flowId,
+      final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    for (final Pair<ExecutionReference, ExecutableFlow> ref : collection) {
       if (ref.getSecond().getProjectId() == projectId
-        && ref.getSecond().getFlowId().equals(flowId)) {
+          && ref.getSecond().getFlowId().equals(flowId)) {
         return true;
       }
     }
@@ -556,9 +533,9 @@ public class ExecutorManager extends EventHandler implements
    * @see azkaban.executor.ExecutorManagerAdapter#getExecutableFlow(int)
    */
   @Override
-  public ExecutableFlow getExecutableFlow(int execId)
-    throws ExecutorManagerException {
-      return executorLoader.fetchExecutableFlow(execId);
+  public ExecutableFlow getExecutableFlow(final int execId)
+      throws ExecutorManagerException {
+    return this.executorLoader.fetchExecutableFlow(execId);
   }
 
   /**
@@ -570,9 +547,9 @@ public class ExecutorManager extends EventHandler implements
    */
   @Override
   public List<ExecutableFlow> getRunningFlows() {
-    ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>();
-    getActiveFlowHelper(flows, queuedFlows.getAllEntries());
-    getActiveFlowHelper(flows, runningFlows.values());
+    final ArrayList<ExecutableFlow> flows = new ArrayList<>();
+    getActiveFlowHelper(flows, this.queuedFlows.getAllEntries());
+    getActiveFlowHelper(flows, this.runningFlows.values());
     return flows;
   }
 
@@ -580,9 +557,9 @@ public class ExecutorManager extends EventHandler implements
    * Helper method to get all running flows from a Pair<ExecutionReference,
    * ExecutableFlow collection
    */
-  private void getActiveFlowHelper(ArrayList<ExecutableFlow> flows,
-    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
-    for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+  private void getActiveFlowHelper(final ArrayList<ExecutableFlow> flows,
+      final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    for (final Pair<ExecutionReference, ExecutableFlow> ref : collection) {
       flows.add(ref.getSecond());
     }
   }
@@ -595,9 +572,9 @@ public class ExecutorManager extends EventHandler implements
    * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
    */
   public String getRunningFlowIds() {
-    List<Integer> allIds = new ArrayList<Integer>();
-    getRunningFlowsIdsHelper(allIds, queuedFlows.getAllEntries());
-    getRunningFlowsIdsHelper(allIds, runningFlows.values());
+    final List<Integer> allIds = new ArrayList<>();
+    getRunningFlowsIdsHelper(allIds, this.queuedFlows.getAllEntries());
+    getRunningFlowsIdsHelper(allIds, this.runningFlows.values());
     Collections.sort(allIds);
     return allIds.toString();
   }
@@ -610,21 +587,21 @@ public class ExecutorManager extends EventHandler implements
    * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
    */
   public String getQueuedFlowIds() {
-    List<Integer> allIds = new ArrayList<Integer>();
-    getRunningFlowsIdsHelper(allIds, queuedFlows.getAllEntries());
+    final List<Integer> allIds = new ArrayList<>();
+    getRunningFlowsIdsHelper(allIds, this.queuedFlows.getAllEntries());
     Collections.sort(allIds);
     return allIds.toString();
   }
 
 
   public long getQueuedFlowSize() {
-    return queuedFlows.size();
+    return this.queuedFlows.size();
   }
 
   /* Helper method to flow ids of all running flows */
-  private void getRunningFlowsIdsHelper(List<Integer> allIds,
-    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
-    for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+  private void getRunningFlowsIdsHelper(final List<Integer> allIds,
+      final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    for (final Pair<ExecutionReference, ExecutableFlow> ref : collection) {
       allIds.add(ref.getSecond().getExecutionId());
     }
   }
@@ -633,9 +610,9 @@ public class ExecutorManager extends EventHandler implements
   public List<ExecutableFlow> getRecentlyFinishedFlows() {
     List<ExecutableFlow> flows = new ArrayList<>();
     try {
-      flows = executorLoader.fetchRecentlyFinishedFlows(
+      flows = this.executorLoader.fetchRecentlyFinishedFlows(
           RECENTLY_FINISHED_LIFETIME);
-    } catch(ExecutorManagerException e) {
+    } catch (final ExecutorManagerException e) {
       //Todo jamiesjc: fix error handling.
       logger.error("Failed to fetch recently finished flows.", e);
     }
@@ -643,158 +620,155 @@ public class ExecutorManager extends EventHandler implements
   }
 
   @Override
-  public List<ExecutableFlow> getExecutableFlows(Project project,
-      String flowId, int skip, int size) throws ExecutorManagerException {
-    List<ExecutableFlow> flows =
-        executorLoader.fetchFlowHistory(project.getId(), flowId, skip, size);
+  public List<ExecutableFlow> getExecutableFlows(final Project project,
+      final String flowId, final int skip, final int size) throws ExecutorManagerException {
+    final List<ExecutableFlow> flows =
+        this.executorLoader.fetchFlowHistory(project.getId(), flowId, skip, size);
     return flows;
   }
 
   @Override
-  public List<ExecutableFlow> getExecutableFlows(int skip, int size)
+  public List<ExecutableFlow> getExecutableFlows(final int skip, final int size)
       throws ExecutorManagerException {
-    List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(skip, size);
+    final List<ExecutableFlow> flows = this.executorLoader.fetchFlowHistory(skip, size);
     return flows;
   }
 
   @Override
-  public List<ExecutableFlow> getExecutableFlows(String flowIdContains,
-      int skip, int size) throws ExecutorManagerException {
-    List<ExecutableFlow> flows =
-        executorLoader.fetchFlowHistory(null, '%' + flowIdContains + '%', null,
+  public List<ExecutableFlow> getExecutableFlows(final String flowIdContains,
+      final int skip, final int size) throws ExecutorManagerException {
+    final List<ExecutableFlow> flows =
+        this.executorLoader.fetchFlowHistory(null, '%' + flowIdContains + '%', null,
             0, -1, -1, skip, size);
     return flows;
   }
 
   @Override
-  public List<ExecutableFlow> getExecutableFlows(String projContain,
-      String flowContain, String userContain, int status, long begin, long end,
-      int skip, int size) throws ExecutorManagerException {
-    List<ExecutableFlow> flows =
-        executorLoader.fetchFlowHistory(projContain, flowContain, userContain,
+  public List<ExecutableFlow> getExecutableFlows(final String projContain,
+      final String flowContain, final String userContain, final int status, final long begin,
+      final long end,
+      final int skip, final int size) throws ExecutorManagerException {
+    final List<ExecutableFlow> flows =
+        this.executorLoader.fetchFlowHistory(projContain, flowContain, userContain,
             status, begin, end, skip, size);
     return flows;
   }
 
   @Override
-  public List<ExecutableJobInfo> getExecutableJobs(Project project,
-      String jobId, int skip, int size) throws ExecutorManagerException {
-    List<ExecutableJobInfo> nodes =
-        executorLoader.fetchJobHistory(project.getId(), jobId, skip, size);
+  public List<ExecutableJobInfo> getExecutableJobs(final Project project,
+      final String jobId, final int skip, final int size) throws ExecutorManagerException {
+    final List<ExecutableJobInfo> nodes =
+        this.executorLoader.fetchJobHistory(project.getId(), jobId, skip, size);
     return nodes;
   }
 
   @Override
-  public int getNumberOfJobExecutions(Project project, String jobId)
+  public int getNumberOfJobExecutions(final Project project, final String jobId)
       throws ExecutorManagerException {
-    return executorLoader.fetchNumExecutableNodes(project.getId(), jobId);
+    return this.executorLoader.fetchNumExecutableNodes(project.getId(), jobId);
   }
 
   @Override
-  public int getNumberOfExecutions(Project project, String flowId)
+  public int getNumberOfExecutions(final Project project, final String flowId)
       throws ExecutorManagerException {
-    return executorLoader.fetchNumExecutableFlows(project.getId(), flowId);
+    return this.executorLoader.fetchNumExecutableFlows(project.getId(), flowId);
   }
 
   @Override
-  public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset,
-      int length) throws ExecutorManagerException {
-    Pair<ExecutionReference, ExecutableFlow> pair =
-        runningFlows.get(exFlow.getExecutionId());
+  public LogData getExecutableFlowLog(final ExecutableFlow exFlow, final int offset,
+      final int length) throws ExecutorManagerException {
+    final Pair<ExecutionReference, ExecutableFlow> pair =
+        this.runningFlows.get(exFlow.getExecutionId());
     if (pair != null) {
-      Pair<String, String> typeParam = new Pair<String, String>("type", "flow");
-      Pair<String, String> offsetParam =
-          new Pair<String, String>("offset", String.valueOf(offset));
-      Pair<String, String> lengthParam =
-          new Pair<String, String>("length", String.valueOf(length));
-
-      @SuppressWarnings("unchecked")
-      Map<String, Object> result =
+      final Pair<String, String> typeParam = new Pair<>("type", "flow");
+      final Pair<String, String> offsetParam =
+          new Pair<>("offset", String.valueOf(offset));
+      final Pair<String, String> lengthParam =
+          new Pair<>("length", String.valueOf(length));
+
+      @SuppressWarnings("unchecked") final Map<String, Object> result =
           callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
               typeParam, offsetParam, lengthParam);
       return LogData.createLogDataFromObject(result);
     } else {
-      LogData value =
-          executorLoader.fetchLogs(exFlow.getExecutionId(), "", 0, offset,
+      final LogData value =
+          this.executorLoader.fetchLogs(exFlow.getExecutionId(), "", 0, offset,
               length);
       return value;
     }
   }
 
   @Override
-  public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId,
-      int offset, int length, int attempt) throws ExecutorManagerException {
-    Pair<ExecutionReference, ExecutableFlow> pair =
-        runningFlows.get(exFlow.getExecutionId());
+  public LogData getExecutionJobLog(final ExecutableFlow exFlow, final String jobId,
+      final int offset, final int length, final int attempt) throws ExecutorManagerException {
+    final Pair<ExecutionReference, ExecutableFlow> pair =
+        this.runningFlows.get(exFlow.getExecutionId());
     if (pair != null) {
-      Pair<String, String> typeParam = new Pair<String, String>("type", "job");
-      Pair<String, String> jobIdParam =
-          new Pair<String, String>("jobId", jobId);
-      Pair<String, String> offsetParam =
-          new Pair<String, String>("offset", String.valueOf(offset));
-      Pair<String, String> lengthParam =
-          new Pair<String, String>("length", String.valueOf(length));
-      Pair<String, String> attemptParam =
-          new Pair<String, String>("attempt", String.valueOf(attempt));
-
-      @SuppressWarnings("unchecked")
-      Map<String, Object> result =
+      final Pair<String, String> typeParam = new Pair<>("type", "job");
+      final Pair<String, String> jobIdParam =
+          new Pair<>("jobId", jobId);
+      final Pair<String, String> offsetParam =
+          new Pair<>("offset", String.valueOf(offset));
+      final Pair<String, String> lengthParam =
+          new Pair<>("length", String.valueOf(length));
+      final Pair<String, String> attemptParam =
+          new Pair<>("attempt", String.valueOf(attempt));
+
+      @SuppressWarnings("unchecked") final Map<String, Object> result =
           callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
               typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
       return LogData.createLogDataFromObject(result);
     } else {
-      LogData value =
-          executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt,
+      final LogData value =
+          this.executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt,
               offset, length);
       return value;
     }
   }
 
   @Override
-  public List<Object> getExecutionJobStats(ExecutableFlow exFlow, String jobId,
-      int attempt) throws ExecutorManagerException {
-    Pair<ExecutionReference, ExecutableFlow> pair =
-        runningFlows.get(exFlow.getExecutionId());
+  public List<Object> getExecutionJobStats(final ExecutableFlow exFlow, final String jobId,
+      final int attempt) throws ExecutorManagerException {
+    final Pair<ExecutionReference, ExecutableFlow> pair =
+        this.runningFlows.get(exFlow.getExecutionId());
     if (pair == null) {
-      return executorLoader.fetchAttachments(exFlow.getExecutionId(), jobId,
+      return this.executorLoader.fetchAttachments(exFlow.getExecutionId(), jobId,
           attempt);
     }
 
-    Pair<String, String> jobIdParam = new Pair<String, String>("jobId", jobId);
-    Pair<String, String> attemptParam =
-        new Pair<String, String>("attempt", String.valueOf(attempt));
+    final Pair<String, String> jobIdParam = new Pair<>("jobId", jobId);
+    final Pair<String, String> attemptParam =
+        new Pair<>("attempt", String.valueOf(attempt));
 
-    @SuppressWarnings("unchecked")
-    Map<String, Object> result =
+    @SuppressWarnings("unchecked") final Map<String, Object> result =
         callExecutorServer(pair.getFirst(), ConnectorParams.ATTACHMENTS_ACTION,
             jobIdParam, attemptParam);
 
-    @SuppressWarnings("unchecked")
-    List<Object> jobStats = (List<Object>) result.get("attachments");
+    @SuppressWarnings("unchecked") final List<Object> jobStats = (List<Object>) result
+        .get("attachments");
 
     return jobStats;
   }
 
   @Override
-  public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow,
-      String jobId, int offset, int length, int attempt)
+  public JobMetaData getExecutionJobMetaData(final ExecutableFlow exFlow,
+      final String jobId, final int offset, final int length, final int attempt)
       throws ExecutorManagerException {
-    Pair<ExecutionReference, ExecutableFlow> pair =
-        runningFlows.get(exFlow.getExecutionId());
+    final Pair<ExecutionReference, ExecutableFlow> pair =
+        this.runningFlows.get(exFlow.getExecutionId());
     if (pair != null) {
 
-      Pair<String, String> typeParam = new Pair<String, String>("type", "job");
-      Pair<String, String> jobIdParam =
-          new Pair<String, String>("jobId", jobId);
-      Pair<String, String> offsetParam =
-          new Pair<String, String>("offset", String.valueOf(offset));
-      Pair<String, String> lengthParam =
-          new Pair<String, String>("length", String.valueOf(length));
-      Pair<String, String> attemptParam =
-          new Pair<String, String>("attempt", String.valueOf(attempt));
-
-      @SuppressWarnings("unchecked")
-      Map<String, Object> result =
+      final Pair<String, String> typeParam = new Pair<>("type", "job");
+      final Pair<String, String> jobIdParam =
+          new Pair<>("jobId", jobId);
+      final Pair<String, String> offsetParam =
+          new Pair<>("offset", String.valueOf(offset));
+      final Pair<String, String> lengthParam =
+          new Pair<>("length", String.valueOf(length));
+      final Pair<String, String> attemptParam =
+          new Pair<>("attempt", String.valueOf(attempt));
+
+      @SuppressWarnings("unchecked") final Map<String, Object> result =
           callExecutorServer(pair.getFirst(), ConnectorParams.METADATA_ACTION,
               typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
       return JobMetaData.createJobMetaDataFromObject(result);
@@ -804,38 +778,38 @@ public class ExecutorManager extends EventHandler implements
   }
 
   /**
-   * if flows was dispatched to an executor, cancel by calling Executor else if
-   * flow is still in queue, remove from queue and finalize {@inheritDoc}
+   * if flows was dispatched to an executor, cancel by calling Executor else if flow is still in
+   * queue, remove from queue and finalize {@inheritDoc}
    *
    * @see azkaban.executor.ExecutorManagerAdapter#cancelFlow(azkaban.executor.ExecutableFlow,
-   *      java.lang.String)
+   * java.lang.String)
    */
   @Override
-  public void cancelFlow(ExecutableFlow exFlow, String userId)
-    throws ExecutorManagerException {
+  public void cancelFlow(final ExecutableFlow exFlow, final String userId)
+      throws ExecutorManagerException {
     synchronized (exFlow) {
-      if (runningFlows.containsKey(exFlow.getExecutionId())) {
-        Pair<ExecutionReference, ExecutableFlow> pair =
-          runningFlows.get(exFlow.getExecutionId());
+      if (this.runningFlows.containsKey(exFlow.getExecutionId())) {
+        final Pair<ExecutionReference, ExecutableFlow> pair =
+            this.runningFlows.get(exFlow.getExecutionId());
         callExecutorServer(pair.getFirst(), ConnectorParams.CANCEL_ACTION,
-          userId);
-      } else if (queuedFlows.hasExecution(exFlow.getExecutionId())) {
-        queuedFlows.dequeue(exFlow.getExecutionId());
+            userId);
+      } else if (this.queuedFlows.hasExecution(exFlow.getExecutionId())) {
+        this.queuedFlows.dequeue(exFlow.getExecutionId());
         finalizeFlows(exFlow);
       } else {
         throw new ExecutorManagerException("Execution "
-          + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
-          + " isn't running.");
+            + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+            + " isn't running.");
       }
     }
   }
 
   @Override
-  public void resumeFlow(ExecutableFlow exFlow, String userId)
+  public void resumeFlow(final ExecutableFlow exFlow, final String userId)
       throws ExecutorManagerException {
     synchronized (exFlow) {
-      Pair<ExecutionReference, ExecutableFlow> pair =
-          runningFlows.get(exFlow.getExecutionId());
+      final Pair<ExecutionReference, ExecutableFlow> pair =
+          this.runningFlows.get(exFlow.getExecutionId());
       if (pair == null) {
         throw new ExecutorManagerException("Execution "
             + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
@@ -846,11 +820,11 @@ public class ExecutorManager extends EventHandler implements
   }
 
   @Override
-  public void pauseFlow(ExecutableFlow exFlow, String userId)
+  public void pauseFlow(final ExecutableFlow exFlow, final String userId)
       throws ExecutorManagerException {
     synchronized (exFlow) {
-      Pair<ExecutionReference, ExecutableFlow> pair =
-          runningFlows.get(exFlow.getExecutionId());
+      final Pair<ExecutionReference, ExecutableFlow> pair =
+          this.runningFlows.get(exFlow.getExecutionId());
       if (pair == null) {
         throw new ExecutorManagerException("Execution "
             + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
@@ -861,60 +835,60 @@ public class ExecutorManager extends EventHandler implements
   }
 
   @Override
-  public void pauseExecutingJobs(ExecutableFlow exFlow, String userId,
-      String... jobIds) throws ExecutorManagerException {
+  public void pauseExecutingJobs(final ExecutableFlow exFlow, final String userId,
+      final String... jobIds) throws ExecutorManagerException {
     modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_PAUSE_JOBS, userId,
         jobIds);
   }
 
   @Override
-  public void resumeExecutingJobs(ExecutableFlow exFlow, String userId,
-      String... jobIds) throws ExecutorManagerException {
+  public void resumeExecutingJobs(final ExecutableFlow exFlow, final String userId,
+      final String... jobIds) throws ExecutorManagerException {
     modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RESUME_JOBS, userId,
         jobIds);
   }
 
   @Override
-  public void retryFailures(ExecutableFlow exFlow, String userId)
+  public void retryFailures(final ExecutableFlow exFlow, final String userId)
       throws ExecutorManagerException {
     modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_FAILURES, userId);
   }
 
   @Override
-  public void retryExecutingJobs(ExecutableFlow exFlow, String userId,
-      String... jobIds) throws ExecutorManagerException {
+  public void retryExecutingJobs(final ExecutableFlow exFlow, final String userId,
+      final String... jobIds) throws ExecutorManagerException {
     modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_JOBS, userId,
         jobIds);
   }
 
   @Override
-  public void disableExecutingJobs(ExecutableFlow exFlow, String userId,
-      String... jobIds) throws ExecutorManagerException {
+  public void disableExecutingJobs(final ExecutableFlow exFlow, final String userId,
+      final String... jobIds) throws ExecutorManagerException {
     modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_DISABLE_JOBS, userId,
         jobIds);
   }
 
   @Override
-  public void enableExecutingJobs(ExecutableFlow exFlow, String userId,
-      String... jobIds) throws ExecutorManagerException {
+  public void enableExecutingJobs(final ExecutableFlow exFlow, final String userId,
+      final String... jobIds) throws ExecutorManagerException {
     modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_ENABLE_JOBS, userId,
         jobIds);
   }
 
   @Override
-  public void cancelExecutingJobs(ExecutableFlow exFlow, String userId,
-      String... jobIds) throws ExecutorManagerException {
+  public void cancelExecutingJobs(final ExecutableFlow exFlow, final String userId,
+      final String... jobIds) throws ExecutorManagerException {
     modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_CANCEL_JOBS, userId,
         jobIds);
   }
 
   @SuppressWarnings("unchecked")
-  private Map<String, Object> modifyExecutingJobs(ExecutableFlow exFlow,
-      String command, String userId, String... jobIds)
+  private Map<String, Object> modifyExecutingJobs(final ExecutableFlow exFlow,
+      final String command, final String userId, final String... jobIds)
       throws ExecutorManagerException {
     synchronized (exFlow) {
-      Pair<ExecutionReference, ExecutableFlow> pair =
-          runningFlows.get(exFlow.getExecutionId());
+      final Pair<ExecutionReference, ExecutableFlow> pair =
+          this.runningFlows.get(exFlow.getExecutionId());
       if (pair == null) {
         throw new ExecutorManagerException("Execution "
             + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
@@ -923,9 +897,9 @@ public class ExecutorManager extends EventHandler implements
 
       Map<String, Object> response = null;
       if (jobIds != null && jobIds.length > 0) {
-        for (String jobId : jobIds) {
+        for (final String jobId : jobIds) {
           if (!jobId.isEmpty()) {
-            ExecutableNode node = exFlow.getExecutableNode(jobId);
+            final ExecutableNode node = exFlow.getExecutableNode(jobId);
             if (node == null) {
               throw new ExecutorManagerException("Job " + jobId
                   + " doesn't exist in execution " + exFlow.getExecutionId()
@@ -933,18 +907,18 @@ public class ExecutorManager extends EventHandler implements
             }
           }
         }
-        String ids = StringUtils.join(jobIds, ',');
+        final String ids = StringUtils.join(jobIds, ',');
         response =
             callExecutorServer(pair.getFirst(),
                 ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
-                new Pair<String, String>(
+                new Pair<>(
                     ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command),
-                new Pair<String, String>(ConnectorParams.MODIFY_JOBS_LIST, ids));
+                new Pair<>(ConnectorParams.MODIFY_JOBS_LIST, ids));
       } else {
         response =
             callExecutorServer(pair.getFirst(),
                 ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
-                new Pair<String, String>(
+                new Pair<>(
                     ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command));
       }
 
@@ -953,31 +927,31 @@ public class ExecutorManager extends EventHandler implements
   }
 
   @Override
-  public String submitExecutableFlow(ExecutableFlow exflow, String userId)
-    throws ExecutorManagerException {
+  public String submitExecutableFlow(final ExecutableFlow exflow, final String userId)
+      throws ExecutorManagerException {
 
-    String exFlowKey = exflow.getProjectName() + "." + exflow.getId() + ".submitFlow";
+    final String exFlowKey = exflow.getProjectName() + "." + exflow.getId() + ".submitFlow";
     // using project and flow name to prevent race condition when same flow is submitted by API and schedule at the same time
     // causing two same flow submission entering this piece.
     synchronized (exFlowKey.intern()) {
-      String flowId = exflow.getFlowId();
+      final String flowId = exflow.getFlowId();
 
       logger.info("Submitting execution flow " + flowId + " by " + userId);
 
       String message = "";
-      if (queuedFlows.isFull()) {
+      if (this.queuedFlows.isFull()) {
         message =
-          String
-            .format(
-              "Failed to submit %s for project %s. Azkaban has overrun its webserver queue capacity",
-              flowId, exflow.getProjectName());
+            String
+                .format(
+                    "Failed to submit %s for project %s. Azkaban has overrun its webserver queue capacity",
+                    flowId, exflow.getProjectName());
         logger.error(message);
       } else {
-        int projectId = exflow.getProjectId();
+        final int projectId = exflow.getProjectId();
         exflow.setSubmitUser(userId);
         exflow.setSubmitTime(System.currentTimeMillis());
 
-        List<Integer> running = getRunningFlows(projectId, flowId);
+        final List<Integer> running = getRunningFlows(projectId, flowId);
 
         ExecutionOptions options = exflow.getExecutionOptions();
         if (options == null) {
@@ -990,55 +964,55 @@ public class ExecutorManager extends EventHandler implements
 
         if (!running.isEmpty()) {
           if (options.getConcurrentOption().equals(
-            ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
+              ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
             Collections.sort(running);
-            Integer runningExecId = running.get(running.size() - 1);
+            final Integer runningExecId = running.get(running.size() - 1);
 
             options.setPipelineExecutionId(runningExecId);
             message =
-              "Flow " + flowId + " is already running with exec id "
-                + runningExecId + ". Pipelining level "
-                + options.getPipelineLevel() + ". \n";
+                "Flow " + flowId + " is already running with exec id "
+                    + runningExecId + ". Pipelining level "
+                    + options.getPipelineLevel() + ". \n";
           } else if (options.getConcurrentOption().equals(
-            ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
+              ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
             throw new ExecutorManagerException("Flow " + flowId
-              + " is already running. Skipping execution.",
-              ExecutorManagerException.Reason.SkippedExecution);
+                + " is already running. Skipping execution.",
+                ExecutorManagerException.Reason.SkippedExecution);
           } else {
             // The settings is to run anyways.
             message =
-              "Flow " + flowId + " is already running with exec id "
-                + StringUtils.join(running, ",")
-                + ". Will execute concurrently. \n";
+                "Flow " + flowId + " is already running with exec id "
+                    + StringUtils.join(running, ",")
+                    + ". Will execute concurrently. \n";
           }
         }
 
-        boolean memoryCheck =
-          !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
-            ProjectWhitelist.WhitelistType.MemoryCheck);
+        final boolean memoryCheck =
+            !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
+                ProjectWhitelist.WhitelistType.MemoryCheck);
         options.setMemoryCheck(memoryCheck);
 
         // The exflow id is set by the loader. So it's unavailable until after
         // this call.
-        executorLoader.uploadExecutableFlow(exflow);
+        this.executorLoader.uploadExecutableFlow(exflow);
 
         // We create an active flow reference in the datastore. If the upload
         // fails, we remove the reference.
-        ExecutionReference reference =
-          new ExecutionReference(exflow.getExecutionId());
+        final ExecutionReference reference =
+            new ExecutionReference(exflow.getExecutionId());
 
         if (isMultiExecutorMode()) {
           //Take MultiExecutor route
-          executorLoader.addActiveExecutableReference(reference);
-          queuedFlows.enqueue(exflow, reference);
+          this.executorLoader.addActiveExecutableReference(reference);
+          this.queuedFlows.enqueue(exflow, reference);
         } else {
           // assign only local executor we have
-          Executor choosenExecutor = activeExecutors.iterator().next();
-          executorLoader.addActiveExecutableReference(reference);
+          final Executor choosenExecutor = this.activeExecutors.iterator().next();
+          this.executorLoader.addActiveExecutableReference(reference);
           try {
             dispatch(reference, exflow, choosenExecutor);
             this.commonMetrics.markDispatchSuccess();
-          } catch (ExecutorManagerException e) {
+          } catch (final ExecutorManagerException e) {
             // When flow dispatch fails, should update the flow status
             // to FAILED in execution_flows DB table as well. Currently
             // this logic is only implemented in multiExecutorMode but
@@ -1049,84 +1023,86 @@ public class ExecutorManager extends EventHandler implements
           }
         }
         message +=
-          "Execution submitted successfully with exec id "
-            + exflow.getExecutionId();
+            "Execution submitted successfully with exec id "
+                + exflow.getExecutionId();
       }
       return message;
     }
   }
 
-  private void cleanOldExecutionLogs(long millis) {
-    long beforeDeleteLogsTimestamp = System.currentTimeMillis();
+  private void cleanOldExecutionLogs(final long millis) {
+    final long beforeDeleteLogsTimestamp = System.currentTimeMillis();
     try {
-      int count = executorLoader.removeExecutionLogsByTime(millis);
+      final int count = this.executorLoader.removeExecutionLogsByTime(millis);
       logger.info("Cleaned up " + count + " log entries.");
-    } catch (ExecutorManagerException e) {
+    } catch (final ExecutorManagerException e) {
       logger.error("log clean up failed. ", e);
     }
-    logger.info("log clean up time: "  + (System.currentTimeMillis() - beforeDeleteLogsTimestamp)/1000 + " seconds.");
+    logger.info(
+        "log clean up time: " + (System.currentTimeMillis() - beforeDeleteLogsTimestamp) / 1000
+            + " seconds.");
   }
 
-  private Map<String, Object> callExecutorServer(ExecutableFlow exflow,
-    Executor executor, String action) throws ExecutorManagerException {
+  private Map<String, Object> callExecutorServer(final ExecutableFlow exflow,
+      final Executor executor, final String action) throws ExecutorManagerException {
     try {
       return callExecutorServer(executor.getHost(), executor.getPort(), action,
-        exflow.getExecutionId(), null, (Pair<String, String>[]) null);
-    } catch (IOException e) {
+          exflow.getExecutionId(), null, (Pair<String, String>[]) null);
+    } catch (final IOException e) {
       throw new ExecutorManagerException(e);
     }
   }
 
-  private Map<String, Object> callExecutorServer(ExecutionReference ref,
-      String action, String user) throws ExecutorManagerException {
+  private Map<String, Object> callExecutorServer(final ExecutionReference ref,
+      final String action, final String user) throws ExecutorManagerException {
     try {
       return callExecutorServer(ref.getHost(), ref.getPort(), action,
           ref.getExecId(), user, (Pair<String, String>[]) null);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new ExecutorManagerException(e);
     }
   }
 
-  private Map<String, Object> callExecutorServer(ExecutionReference ref,
-      String action, Pair<String, String>... params)
+  private Map<String, Object> callExecutorServer(final ExecutionReference ref,
+      final String action, final Pair<String, String>... params)
       throws ExecutorManagerException {
     try {
       return callExecutorServer(ref.getHost(), ref.getPort(), action,
           ref.getExecId(), null, params);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new ExecutorManagerException(e);
     }
   }
 
-  private Map<String, Object> callExecutorServer(ExecutionReference ref,
-      String action, String user, Pair<String, String>... params)
+  private Map<String, Object> callExecutorServer(final ExecutionReference ref,
+      final String action, final String user, final Pair<String, String>... params)
       throws ExecutorManagerException {
     try {
       return callExecutorServer(ref.getHost(), ref.getPort(), action,
           ref.getExecId(), user, params);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new ExecutorManagerException(e);
     }
   }
 
-  private Map<String, Object> callExecutorServer(String host, int port,
-      String action, Integer executionId, String user,
-      Pair<String, String>... params) throws IOException {
-    List<Pair<String, String>> paramList = new ArrayList<Pair<String,String>>();
+  private Map<String, Object> callExecutorServer(final String host, final int port,
+      final String action, final Integer executionId, final String user,
+      final Pair<String, String>... params) throws IOException {
+    final List<Pair<String, String>> paramList = new ArrayList<>();
 
     // if params = null
-    if(params != null) {
+    if (params != null) {
       paramList.addAll(Arrays.asList(params));
     }
 
     paramList
-      .add(new Pair<String, String>(ConnectorParams.ACTION_PARAM, action));
-    paramList.add(new Pair<String, String>(ConnectorParams.EXECID_PARAM, String
-      .valueOf(executionId)));
-    paramList.add(new Pair<String, String>(ConnectorParams.USER_PARAM, user));
+        .add(new Pair<>(ConnectorParams.ACTION_PARAM, action));
+    paramList.add(new Pair<>(ConnectorParams.EXECID_PARAM, String
+        .valueOf(executionId)));
+    paramList.add(new Pair<>(ConnectorParams.USER_PARAM, user));
 
-    Map<String, Object> jsonResponse =
-      callExecutorForJsonObject(host, port, "/executor", paramList);
+    final Map<String, Object> jsonResponse =
+        callExecutorForJsonObject(host, port, "/executor", paramList);
 
     return jsonResponse;
   }
@@ -1135,15 +1111,14 @@ public class ExecutorManager extends EventHandler implements
    * Helper method used by ExecutorManager to call executor and return json
    * object map
    */
-  private Map<String, Object> callExecutorForJsonObject(String host, int port,
-    String path, List<Pair<String, String>> paramList) throws IOException {
-    String responseString =
-      callExecutorForJsonString(host, port, path, paramList);
-
-    @SuppressWarnings("unchecked")
-    Map<String, Object> jsonResponse =
-      (Map<String, Object>) JSONUtils.parseJSONFromString(responseString);
-    String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
+  private Map<String, Object> callExecutorForJsonObject(final String host, final int port,
+      final String path, final List<Pair<String, String>> paramList) throws IOException {
+    final String responseString =
+        callExecutorForJsonString(host, port, path, paramList);
+
+    @SuppressWarnings("unchecked") final Map<String, Object> jsonResponse =
+        (Map<String, Object>) JSONUtils.parseJSONFromString(responseString);
+    final String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
     if (error != null) {
       throw new IOException(error);
     }
@@ -1154,37 +1129,33 @@ public class ExecutorManager extends EventHandler implements
    * Helper method used by ExecutorManager to call executor and return raw json
    * string
    */
-  private String callExecutorForJsonString(String host, int port, String path,
-    List<Pair<String, String>> paramList) throws IOException {
+  private String callExecutorForJsonString(final String host, final int port, final String path,
+      List<Pair<String, String>> paramList) throws IOException {
     if (paramList == null) {
-      paramList = new ArrayList<Pair<String, String>>();
+      paramList = new ArrayList<>();
     }
 
-    ExecutorApiClient apiclient = ExecutorApiClient.getInstance();
-    @SuppressWarnings("unchecked")
-    URI uri =
-      ExecutorApiClient.buildUri(host, port, path, true,
-        paramList.toArray(new Pair[0]));
+    final ExecutorApiClient apiclient = ExecutorApiClient.getInstance();
+    @SuppressWarnings("unchecked") final URI uri =
+        ExecutorApiClient.buildUri(host, port, path, true,
+            paramList.toArray(new Pair[0]));
 
     return apiclient.httpGet(uri, null);
   }
 
   /**
-   * Manage servlet call for stats servlet in Azkaban execution server
-   * {@inheritDoc}
-   *
-   * @throws ExecutorManagerException
+   * Manage servlet call for stats servlet in Azkaban execution server {@inheritDoc}
    *
    * @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(java.lang.String,
-   *      azkaban.utils.Pair[])
+   * azkaban.utils.Pair[])
    */
   @Override
-  public Map<String, Object> callExecutorStats(int executorId, String action,
-    Pair<String, String>... params) throws IOException, ExecutorManagerException {
-    Executor executor = fetchExecutor(executorId);
+  public Map<String, Object> callExecutorStats(final int executorId, final String action,
+      final Pair<String, String>... params) throws IOException, ExecutorManagerException {
+    final Executor executor = fetchExecutor(executorId);
 
-    List<Pair<String, String>> paramList =
-      new ArrayList<Pair<String, String>>();
+    final List<Pair<String, String>> paramList =
+        new ArrayList<>();
 
     // if params = null
     if (params != null) {
@@ -1192,220 +1163,72 @@ public class ExecutorManager extends EventHandler implements
     }
 
     paramList
-      .add(new Pair<String, String>(ConnectorParams.ACTION_PARAM, action));
+        .add(new Pair<>(ConnectorParams.ACTION_PARAM, action));
 
     return callExecutorForJsonObject(executor.getHost(), executor.getPort(),
-      "/stats", paramList);
+        "/stats", paramList);
   }
 
 
   @Override
-  public Map<String, Object> callExecutorJMX(String hostPort, String action,
-      String mBean) throws IOException {
-    List<Pair<String, String>> paramList =
-      new ArrayList<Pair<String, String>>();
+  public Map<String, Object> callExecutorJMX(final String hostPort, final String action,
+      final String mBean) throws IOException {
+    final List<Pair<String, String>> paramList =
+        new ArrayList<>();
 
-    paramList.add(new Pair<String, String>(action, ""));
-    if(mBean != null) {
-      paramList.add(new Pair<String, String>(ConnectorParams.JMX_MBEAN, mBean));
+    paramList.add(new Pair<>(action, ""));
+    if (mBean != null) {
+      paramList.add(new Pair<>(ConnectorParams.JMX_MBEAN, mBean));
     }
 
-    String[] hostPortSplit = hostPort.split(":");
+    final String[] hostPortSplit = hostPort.split(":");
     return callExecutorForJsonObject(hostPortSplit[0],
-      Integer.valueOf(hostPortSplit[1]), "/jmx", paramList);
+        Integer.valueOf(hostPortSplit[1]), "/jmx", paramList);
   }
 
   @Override
   public void shutdown() {
     if (isMultiExecutorMode()) {
-      queueProcessor.shutdown();
+      this.queueProcessor.shutdown();
     }
-    executingManager.shutdown();
+    this.executingManager.shutdown();
   }
 
-  private class ExecutingManagerUpdaterThread extends Thread {
-    private boolean shutdown = false;
-
-    public ExecutingManagerUpdaterThread() {
-      this.setName("ExecutorManagerUpdaterThread");
-    }
-
-    private int waitTimeIdleMs = 2000;
-    private int waitTimeMs = 500;
-
-    // When we have an http error, for that flow, we'll check every 10 secs, 6
-    // times (1 mins) before we evict.
-    private int numErrors = 6;
-    private long errorThreshold = 10000;
-
-    private void shutdown() {
-      shutdown = true;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void run() {
-      while (!shutdown) {
-        try {
-          lastThreadCheckTime = System.currentTimeMillis();
-          updaterStage = "Starting update all flows.";
-
-          Map<Executor, List<ExecutableFlow>> exFlowMap =
-              getFlowToExecutorMap();
-          ArrayList<ExecutableFlow> finishedFlows =
-              new ArrayList<ExecutableFlow>();
-          ArrayList<ExecutableFlow> finalizeFlows =
-              new ArrayList<ExecutableFlow>();
-
-          if (exFlowMap.size() > 0) {
-            for (Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
-                .entrySet()) {
-              List<Long> updateTimesList = new ArrayList<Long>();
-              List<Integer> executionIdsList = new ArrayList<Integer>();
-
-              Executor executor = entry.getKey();
-
-              updaterStage =
-                  "Starting update flows on " + executor.getHost() + ":"
-                      + executor.getPort();
-
-              // We pack the parameters of the same host together before we
-              // query.
-              fillUpdateTimeAndExecId(entry.getValue(), executionIdsList,
-                  updateTimesList);
-
-              Pair<String, String> updateTimes =
-                  new Pair<String, String>(
-                      ConnectorParams.UPDATE_TIME_LIST_PARAM,
-                      JSONUtils.toJSON(updateTimesList));
-              Pair<String, String> executionIds =
-                  new Pair<String, String>(ConnectorParams.EXEC_ID_LIST_PARAM,
-                      JSONUtils.toJSON(executionIdsList));
-
-              Map<String, Object> results = null;
-              try {
-                results =
-                    callExecutorServer(executor.getHost(),
-                      executor.getPort(), ConnectorParams.UPDATE_ACTION,
-                        null, null, executionIds, updateTimes);
-              } catch (IOException e) {
-                logger.error(e);
-                for (ExecutableFlow flow : entry.getValue()) {
-                  Pair<ExecutionReference, ExecutableFlow> pair =
-                      runningFlows.get(flow.getExecutionId());
-
-                  updaterStage =
-                      "Failed to get update. Doing some clean up for flow "
-                          + pair.getSecond().getExecutionId();
-
-                  if (pair != null) {
-                    ExecutionReference ref = pair.getFirst();
-                    int numErrors = ref.getNumErrors();
-                    if (ref.getNumErrors() < this.numErrors) {
-                      ref.setNextCheckTime(System.currentTimeMillis()
-                          + errorThreshold);
-                      ref.setNumErrors(++numErrors);
-                    } else {
-                      logger.error("Evicting flow " + flow.getExecutionId()
-                          + ". The executor is unresponsive.");
-                      // TODO should send out an unresponsive email here.
-                      finalizeFlows.add(pair.getSecond());
-                    }
-                  }
-                }
-              }
-
-              // We gets results
-              if (results != null) {
-                List<Map<String, Object>> executionUpdates =
-                    (List<Map<String, Object>>) results
-                        .get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
-                for (Map<String, Object> updateMap : executionUpdates) {
-                  try {
-                    ExecutableFlow flow = updateExecution(updateMap);
-
-                    updaterStage = "Updated flow " + flow.getExecutionId();
-
-                    if (isFinished(flow)) {
-                      finishedFlows.add(flow);
-                      finalizeFlows.add(flow);
-                    }
-                  } catch (ExecutorManagerException e) {
-                    ExecutableFlow flow = e.getExecutableFlow();
-                    logger.error(e);
-
-                    if (flow != null) {
-                      logger.error("Finalizing flow " + flow.getExecutionId());
-                      finalizeFlows.add(flow);
-                    }
-                  }
-                }
-              }
-            }
+  private void finalizeFlows(final ExecutableFlow flow) {
 
-            updaterStage =
-                "Finalizing " + finalizeFlows.size() + " error flows.";
-
-            // Kill error flows
-            for (ExecutableFlow flow : finalizeFlows) {
-              finalizeFlows(flow);
-            }
-          }
-
-          updaterStage = "Updated all active flows. Waiting for next round.";
-
-          synchronized (this) {
-            try {
-              if (runningFlows.size() > 0) {
-                this.wait(waitTimeMs);
-              } else {
-                this.wait(waitTimeIdleMs);
-              }
-            } catch (InterruptedException e) {
-            }
-          }
-        } catch (Exception e) {
-          logger.error(e);
-        }
-      }
-    }
-  }
-
-  private void finalizeFlows(ExecutableFlow flow) {
-
-    int execId = flow.getExecutionId();
+    final int execId = flow.getExecutionId();
     boolean alertUser = true;
-    updaterStage = "finalizing flow " + execId;
+    this.updaterStage = "finalizing flow " + execId;
     // First we check if the execution in the datastore is complete
     try {
-      ExecutableFlow dsFlow;
+      final ExecutableFlow dsFlow;
       if (isFinished(flow)) {
         dsFlow = flow;
       } else {
-        updaterStage = "finalizing flow " + execId + " loading from db";
-        dsFlow = executorLoader.fetchExecutableFlow(execId);
+        this.updaterStage = "finalizing flow " + execId + " loading from db";
+        dsFlow = this.executorLoader.fetchExecutableFlow(execId);
 
         // If it's marked finished, we're good. If not, we fail everything and
         // then mark it finished.
         if (!isFinished(dsFlow)) {
-          updaterStage = "finalizing flow " + execId + " failing the flow";
+          this.updaterStage = "finalizing flow " + execId + " failing the flow";
           failEverything(dsFlow);
-          executorLoader.updateExecutableFlow(dsFlow);
+          this.executorLoader.updateExecutableFlow(dsFlow);
         }
       }
 
-      updaterStage = "finalizing flow " + execId + " deleting active reference";
+      this.updaterStage = "finalizing flow " + execId + " deleting active reference";
 
       // Delete the executing reference.
       if (flow.getEndTime() == -1) {
         flow.setEndTime(System.currentTimeMillis());
-        executorLoader.updateExecutableFlow(dsFlow);
+        this.executorLoader.updateExecutableFlow(dsFlow);
       }
-      executorLoader.removeActiveExecutableReference(execId);
+      this.executorLoader.removeActiveExecutableReference(execId);
 
-      updaterStage = "finalizing flow " + execId + " cleaning from memory";
-      runningFlows.remove(execId);
-    } catch (ExecutorManagerException e) {
+      this.updaterStage = "finalizing flow " + execId + " cleaning from memory";
+      this.runningFlows.remove(execId);
+    } catch (final ExecutorManagerException e) {
       alertUser = false; // failed due to azkaban internal error, not to alert user
       logger.error(e);
     }
@@ -1414,26 +1237,26 @@ public class ExecutorManager extends EventHandler implements
     // target no longer had
     // the reference.
 
-    updaterStage = "finalizing flow " + execId + " alerting and emailing";
-    if(alertUser) {
-      ExecutionOptions options = flow.getExecutionOptions();
+    this.updaterStage = "finalizing flow " + execId + " alerting and emailing";
+    if (alertUser) {
+      final ExecutionOptions options = flow.getExecutionOptions();
       // But we can definitely email them.
-      Alerter mailAlerter = alerterHolder.get("email");
+      final Alerter mailAlerter = this.alerterHolder.get("email");
       if (flow.getStatus() == Status.FAILED || flow.getStatus() == Status.KILLED) {
         if (options.getFailureEmails() != null && !options.getFailureEmails().isEmpty()) {
           try {
             mailAlerter.alertOnError(flow);
-          } catch (Exception e) {
+          } catch (final Exception e) {
             logger.error(e);
           }
         }
         if (options.getFlowParameters().containsKey("alert.type")) {
-          String alertType = options.getFlowParameters().get("alert.type");
-          Alerter alerter = alerterHolder.get(alertType);
+          final String alertType = options.getFlowParameters().get("alert.type");
+          final Alerter alerter = this.alerterHolder.get(alertType);
           if (alerter != null) {
             try {
               alerter.alertOnError(flow);
-            } catch (Exception e) {
+            } catch (final Exception e) {
               // TODO Auto-generated catch block
               e.printStackTrace();
               logger.error("Failed to alert by " + alertType);
@@ -1447,17 +1270,17 @@ public class ExecutorManager extends EventHandler implements
           try {
 
             mailAlerter.alertOnSuccess(flow);
-          } catch (Exception e) {
+          } catch (final Exception e) {
             logger.error(e);
           }
         }
         if (options.getFlowParameters().containsKey("alert.type")) {
-          String alertType = options.getFlowParameters().get("alert.type");
-          Alerter alerter = alerterHolder.get(alertType);
+          final String alertType = options.getFlowParameters().get("alert.type");
+          final Alerter alerter = this.alerterHolder.get(alertType);
           if (alerter != null) {
             try {
               alerter.alertOnSuccess(flow);
-            } catch (Exception e) {
+            } catch (final Exception e) {
               // TODO Auto-generated catch block
               e.printStackTrace();
               logger.error("Failed to alert by " + alertType);
@@ -1471,23 +1294,23 @@ public class ExecutorManager extends EventHandler implements
 
   }
 
-  private void failEverything(ExecutableFlow exFlow) {
-    long time = System.currentTimeMillis();
-    for (ExecutableNode node : exFlow.getExecutableNodes()) {
+  private void failEverything(final ExecutableFlow exFlow) {
+    final long time = System.currentTimeMillis();
+    for (final ExecutableNode node : exFlow.getExecutableNodes()) {
       switch (node.getStatus()) {
-      case SUCCEEDED:
-      case FAILED:
-      case KILLED:
-      case SKIPPED:
-      case DISABLED:
-        continue;
-        // case UNKNOWN:
-      case READY:
-        node.setStatus(Status.KILLED);
-        break;
-      default:
-        node.setStatus(Status.FAILED);
-        break;
+        case SUCCEEDED:
+        case FAILED:
+        case KILLED:
+        case SKIPPED:
+        case DISABLED:
+          continue;
+          // case UNKNOWN:
+        case READY:
+          node.setStatus(Status.KILLED);
+          break;
+        default:
+          node.setStatus(Status.FAILED);
+          break;
       }
 
       if (node.getStartTime() == -1) {
@@ -1505,25 +1328,25 @@ public class ExecutorManager extends EventHandler implements
     exFlow.setStatus(Status.FAILED);
   }
 
-  private ExecutableFlow updateExecution(Map<String, Object> updateData)
+  private ExecutableFlow updateExecution(final Map<String, Object> updateData)
       throws ExecutorManagerException {
 
-    Integer execId =
+    final Integer execId =
         (Integer) updateData.get(ConnectorParams.UPDATE_MAP_EXEC_ID);
     if (execId == null) {
       throw new ExecutorManagerException(
           "Response is malformed. Need exec id to update.");
     }
 
-    Pair<ExecutionReference, ExecutableFlow> refPair =
+    final Pair<ExecutionReference, ExecutableFlow> refPair =
         this.runningFlows.get(execId);
     if (refPair == null) {
       throw new ExecutorManagerException(
           "No running flow found with the execution id. Removing " + execId);
     }
 
-    ExecutionReference ref = refPair.getFirst();
-    ExecutableFlow flow = refPair.getSecond();
+    final ExecutionReference ref = refPair.getFirst();
+    final ExecutableFlow flow = refPair.getSecond();
     if (updateData.containsKey("error")) {
       // The flow should be finished here.
       throw new ExecutorManagerException((String) updateData.get("error"), flow);
@@ -1532,33 +1355,33 @@ public class ExecutorManager extends EventHandler implements
     // Reset errors.
     ref.setNextCheckTime(0);
     ref.setNumErrors(0);
-    Status oldStatus = flow.getStatus();
+    final Status oldStatus = flow.getStatus();
     flow.applyUpdateObject(updateData);
-    Status newStatus = flow.getStatus();
+    final Status newStatus = flow.getStatus();
 
-    if(oldStatus != newStatus && newStatus == Status.FAILED) {
+    if (oldStatus != newStatus && newStatus == Status.FAILED) {
       this.commonMetrics.markFlowFail();
     }
 
-    ExecutionOptions options = flow.getExecutionOptions();
+    final ExecutionOptions options = flow.getExecutionOptions();
     if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING)) {
       // We want to see if we should give an email status on first failure.
       if (options.getNotifyOnFirstFailure()) {
-        Alerter mailAlerter = alerterHolder.get("email");
+        final Alerter mailAlerter = this.alerterHolder.get("email");
         try {
           mailAlerter.alertOnFirstError(flow);
-        } catch (Exception e) {
+        } catch (final Exception e) {
           e.printStackTrace();
           logger.error("Failed to send first error email." + e.getMessage());
         }
       }
       if (options.getFlowParameters().containsKey("alert.type")) {
-        String alertType = options.getFlowParameters().get("alert.type");
-        Alerter alerter = alerterHolder.get(alertType);
+        final String alertType = options.getFlowParameters().get("alert.type");
+        final Alerter alerter = this.alerterHolder.get(alertType);
         if (alerter != null) {
           try {
             alerter.alertOnFirstError(flow);
-          } catch (Exception e) {
+          } catch (final Exception e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
             logger.error("Failed to alert by " + alertType);
@@ -1573,7 +1396,7 @@ public class ExecutorManager extends EventHandler implements
     return flow;
   }
 
-  public boolean isFinished(ExecutableFlow flow) {
+  public boolean isFinished(final ExecutableFlow flow) {
     switch (flow.getStatus()) {
       case SUCCEEDED:
       case FAILED:
@@ -1584,9 +1407,9 @@ public class ExecutorManager extends EventHandler implements
     }
   }
 
-  private void fillUpdateTimeAndExecId(List<ExecutableFlow> flows,
-      List<Integer> executionIds, List<Long> updateTimes) {
-    for (ExecutableFlow flow : flows) {
+  private void fillUpdateTimeAndExecId(final List<ExecutableFlow> flows,
+      final List<Integer> executionIds, final List<Long> updateTimes) {
+    for (final ExecutableFlow flow : flows) {
       executionIds.add(flow.getExecutionId());
       updateTimes.add(flow.getUpdateTime());
     }
@@ -1594,14 +1417,14 @@ public class ExecutorManager extends EventHandler implements
 
   /* Group Executable flow by Executors to reduce number of REST calls */
   private Map<Executor, List<ExecutableFlow>> getFlowToExecutorMap() {
-    HashMap<Executor, List<ExecutableFlow>> exFlowMap =
-      new HashMap<Executor, List<ExecutableFlow>>();
+    final HashMap<Executor, List<ExecutableFlow>> exFlowMap =
+        new HashMap<>();
 
-    for (Pair<ExecutionReference, ExecutableFlow> runningFlow : runningFlows
-      .values()) {
-      ExecutionReference ref = runningFlow.getFirst();
-      ExecutableFlow flow = runningFlow.getSecond();
-      Executor executor = ref.getExecutor();
+    for (final Pair<ExecutionReference, ExecutableFlow> runningFlow : this.runningFlows
+        .values()) {
+      final ExecutionReference ref = runningFlow.getFirst();
+      final ExecutableFlow flow = runningFlow.getSecond();
+      final Executor executor = ref.getExecutor();
 
       // We can set the next check time to prevent the checking of certain
       // flows.
@@ -1611,7 +1434,7 @@ public class ExecutorManager extends EventHandler implements
 
       List<ExecutableFlow> flows = exFlowMap.get(executor);
       if (flows == null) {
-        flows = new ArrayList<ExecutableFlow>();
+        flows = new ArrayList<>();
         exFlowMap.put(executor, flows);
       }
 
@@ -1622,22 +1445,199 @@ public class ExecutorManager extends EventHandler implements
   }
 
   @Override
-  public int getExecutableFlows(int projectId, String flowId, int from,
-      int length, List<ExecutableFlow> outputList)
+  public int getExecutableFlows(final int projectId, final String flowId, final int from,
+      final int length, final List<ExecutableFlow> outputList)
       throws ExecutorManagerException {
-    List<ExecutableFlow> flows =
-        executorLoader.fetchFlowHistory(projectId, flowId, from, length);
+    final List<ExecutableFlow> flows =
+        this.executorLoader.fetchFlowHistory(projectId, flowId, from, length);
     outputList.addAll(flows);
-    return executorLoader.fetchNumExecutableFlows(projectId, flowId);
+    return this.executorLoader.fetchNumExecutableFlows(projectId, flowId);
   }
 
   @Override
-  public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId,
-      int from, int length, Status status) throws ExecutorManagerException {
-    return executorLoader.fetchFlowHistory(projectId, flowId, from, length,
+  public List<ExecutableFlow> getExecutableFlows(final int projectId, final String flowId,
+      final int from, final int length, final Status status) throws ExecutorManagerException {
+    return this.executorLoader.fetchFlowHistory(projectId, flowId, from, length,
         status);
   }
 
+  /**
+   * Calls executor to dispatch the flow, update db to assign the executor and in-memory state of
+   * executableFlow
+   */
+  private void dispatch(final ExecutionReference reference, final ExecutableFlow exflow,
+      final Executor choosenExecutor) throws ExecutorManagerException {
+    exflow.setUpdateTime(System.currentTimeMillis());
+
+    this.executorLoader.assignExecutor(choosenExecutor.getId(),
+        exflow.getExecutionId());
+    try {
+      callExecutorServer(exflow, choosenExecutor,
+          ConnectorParams.EXECUTE_ACTION);
+    } catch (final ExecutorManagerException ex) {
+      logger.error("Rolling back executor assignment for execution id:"
+          + exflow.getExecutionId(), ex);
+      this.executorLoader.unassignExecutor(exflow.getExecutionId());
+      throw new ExecutorManagerException(ex);
+    }
+    reference.setExecutor(choosenExecutor);
+
+    // move from flow to running flows
+    this.runningFlows.put(exflow.getExecutionId(),
+        new Pair<>(reference, exflow));
+
+    logger.info(String.format(
+        "Successfully dispatched exec %d with error count %d",
+        exflow.getExecutionId(), reference.getNumErrors()));
+  }
+
+  private class ExecutingManagerUpdaterThread extends Thread {
+
+    private final int waitTimeIdleMs = 2000;
+    private final int waitTimeMs = 500;
+    // When we have an http error, for that flow, we'll check every 10 secs, 6
+    // times (1 mins) before we evict.
+    private final int numErrors = 6;
+    private final long errorThreshold = 10000;
+    private boolean shutdown = false;
+
+    public ExecutingManagerUpdaterThread() {
+      this.setName("ExecutorManagerUpdaterThread");
+    }
+
+    private void shutdown() {
+      this.shutdown = true;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void run() {
+      while (!this.shutdown) {
+        try {
+          ExecutorManager.this.lastThreadCheckTime = System.currentTimeMillis();
+          ExecutorManager.this.updaterStage = "Starting update all flows.";
+
+          final Map<Executor, List<ExecutableFlow>> exFlowMap =
+              getFlowToExecutorMap();
+          final ArrayList<ExecutableFlow> finishedFlows =
+              new ArrayList<>();
+          final ArrayList<ExecutableFlow> finalizeFlows =
+              new ArrayList<>();
+
+          if (exFlowMap.size() > 0) {
+            for (final Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
+                .entrySet()) {
+              final List<Long> updateTimesList = new ArrayList<>();
+              final List<Integer> executionIdsList = new ArrayList<>();
+
+              final Executor executor = entry.getKey();
+
+              ExecutorManager.this.updaterStage =
+                  "Starting update flows on " + executor.getHost() + ":"
+                      + executor.getPort();
+
+              // We pack the parameters of the same host together before we
+              // query.
+              fillUpdateTimeAndExecId(entry.getValue(), executionIdsList,
+                  updateTimesList);
+
+              final Pair<String, String> updateTimes =
+                  new Pair<>(
+                      ConnectorParams.UPDATE_TIME_LIST_PARAM,
+                      JSONUtils.toJSON(updateTimesList));
+              final Pair<String, String> executionIds =
+                  new Pair<>(ConnectorParams.EXEC_ID_LIST_PARAM,
+                      JSONUtils.toJSON(executionIdsList));
+
+              Map<String, Object> results = null;
+              try {
+                results =
+                    callExecutorServer(executor.getHost(),
+                        executor.getPort(), ConnectorParams.UPDATE_ACTION,
+                        null, null, executionIds, updateTimes);
+              } catch (final IOException e) {
+                logger.error(e);
+                for (final ExecutableFlow flow : entry.getValue()) {
+                  final Pair<ExecutionReference, ExecutableFlow> pair =
+                      ExecutorManager.this.runningFlows.get(flow.getExecutionId());
+
+                  ExecutorManager.this.updaterStage =
+                      "Failed to get update. Doing some clean up for flow "
+                          + pair.getSecond().getExecutionId();
+
+                  if (pair != null) {
+                    final ExecutionReference ref = pair.getFirst();
+                    int numErrors = ref.getNumErrors();
+                    if (ref.getNumErrors() < this.numErrors) {
+                      ref.setNextCheckTime(System.currentTimeMillis()
+                          + this.errorThreshold);
+                      ref.setNumErrors(++numErrors);
+                    } else {
+                      logger.error("Evicting flow " + flow.getExecutionId()
+                          + ". The executor is unresponsive.");
+                      // TODO should send out an unresponsive email here.
+                      finalizeFlows.add(pair.getSecond());
+                    }
+                  }
+                }
+              }
+
+              // We gets results
+              if (results != null) {
+                final List<Map<String, Object>> executionUpdates =
+                    (List<Map<String, Object>>) results
+                        .get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
+                for (final Map<String, Object> updateMap : executionUpdates) {
+                  try {
+                    final ExecutableFlow flow = updateExecution(updateMap);
+
+                    ExecutorManager.this.updaterStage = "Updated flow " + flow.getExecutionId();
+
+                    if (isFinished(flow)) {
+                      finishedFlows.add(flow);
+                      finalizeFlows.add(flow);
+                    }
+                  } catch (final ExecutorManagerException e) {
+                    final ExecutableFlow flow = e.getExecutableFlow();
+                    logger.error(e);
+
+                    if (flow != null) {
+                      logger.error("Finalizing flow " + flow.getExecutionId());
+                      finalizeFlows.add(flow);
+                    }
+                  }
+                }
+              }
+            }
+
+            ExecutorManager.this.updaterStage =
+                "Finalizing " + finalizeFlows.size() + " error flows.";
+
+            // Kill error flows
+            for (final ExecutableFlow flow : finalizeFlows) {
+              finalizeFlows(flow);
+            }
+          }
+
+          ExecutorManager.this.updaterStage = "Updated all active flows. Waiting for next round.";
+
+          synchronized (this) {
+            try {
+              if (ExecutorManager.this.runningFlows.size() > 0) {
+                this.wait(this.waitTimeMs);
+              } else {
+                this.wait(this.waitTimeIdleMs);
+              }
+            } catch (final InterruptedException e) {
+            }
+          }
+        } catch (final Exception e) {
+          logger.error(e);
+        }
+      }
+    }
+  }
+
   /*
    * cleaner thread to clean up execution_logs, etc in DB. Runs every hour.
    */
@@ -1652,33 +1652,33 @@ public class ExecutorManager extends EventHandler implements
     private boolean shutdown = false;
     private long lastLogCleanTime = -1;
 
-    public CleanerThread(long executionLogsRetentionMs) {
+    public CleanerThread(final long executionLogsRetentionMs) {
       this.executionLogsRetentionMs = executionLogsRetentionMs;
       this.setName("AzkabanWebServer-Cleaner-Thread");
     }
 
     @SuppressWarnings("unused")
     public void shutdown() {
-      shutdown = true;
+      this.shutdown = true;
       this.interrupt();
     }
 
     @Override
     public void run() {
-      while (!shutdown) {
+      while (!this.shutdown) {
         synchronized (this) {
           try {
-            lastCleanerThreadCheckTime = System.currentTimeMillis();
+            ExecutorManager.this.lastCleanerThreadCheckTime = System.currentTimeMillis();
 
             // Cleanup old stuff.
-            long currentTime = System.currentTimeMillis();
-            if (currentTime - CLEANER_THREAD_WAIT_INTERVAL_MS > lastLogCleanTime) {
+            final long currentTime = System.currentTimeMillis();
+            if (currentTime - CLEANER_THREAD_WAIT_INTERVAL_MS > this.lastLogCleanTime) {
               cleanExecutionLogs();
-              lastLogCleanTime = currentTime;
+              this.lastLogCleanTime = currentTime;
             }
 
             wait(CLEANER_THREAD_WAIT_INTERVAL_MS);
-          } catch (InterruptedException e) {
+          } catch (final InterruptedException e) {
             logger.info("Interrupted. Probably to shut down.");
           }
         }
@@ -1687,49 +1687,20 @@ public class ExecutorManager extends EventHandler implements
 
     private void cleanExecutionLogs() {
       logger.info("Cleaning old logs from execution_logs");
-      long cutoff = System.currentTimeMillis() - executionLogsRetentionMs;
+      final long cutoff = System.currentTimeMillis() - this.executionLogsRetentionMs;
       logger.info("Cleaning old log files before "
           + new DateTime(cutoff).toString());
       cleanOldExecutionLogs(System.currentTimeMillis()
-          - executionLogsRetentionMs);
+          - this.executionLogsRetentionMs);
     }
   }
 
-  /**
-   * Calls executor to dispatch the flow, update db to assign the executor and
-   * in-memory state of executableFlow
-   */
-  private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
-    Executor choosenExecutor) throws ExecutorManagerException {
-    exflow.setUpdateTime(System.currentTimeMillis());
-
-    executorLoader.assignExecutor(choosenExecutor.getId(),
-      exflow.getExecutionId());
-    try {
-      callExecutorServer(exflow, choosenExecutor,
-        ConnectorParams.EXECUTE_ACTION);
-    } catch (ExecutorManagerException ex) {
-      logger.error("Rolling back executor assignment for execution id:"
-        + exflow.getExecutionId(), ex);
-      executorLoader.unassignExecutor(exflow.getExecutionId());
-      throw new ExecutorManagerException(ex);
-    }
-    reference.setExecutor(choosenExecutor);
-
-    // move from flow to running flows
-    runningFlows.put(exflow.getExecutionId(),
-      new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
-
-    logger.info(String.format(
-      "Successfully dispatched exec %d with error count %d",
-      exflow.getExecutionId(), reference.getNumErrors()));
-  }
-
   /*
    * This thread is responsible for processing queued flows using dispatcher and
    * making rest api calls to executor server
    */
   private class QueueProcessorThread extends Thread {
+
     private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
     private final int maxDispatchingErrors;
     private final long activeExecutorRefreshWindowInMilisec;
@@ -1738,78 +1709,78 @@ public class ExecutorManager extends EventHandler implements
     private volatile boolean shutdown = false;
     private volatile boolean isActive = true;
 
-    public QueueProcessorThread(boolean isActive,
-      long activeExecutorRefreshWindowInTime,
-      int activeExecutorRefreshWindowInFlows,
-      int maxDispatchingErrors) {
+    public QueueProcessorThread(final boolean isActive,
+        final long activeExecutorRefreshWindowInTime,
+        final int activeExecutorRefreshWindowInFlows,
+        final int maxDispatchingErrors) {
       setActive(isActive);
       this.maxDispatchingErrors = maxDispatchingErrors;
       this.activeExecutorRefreshWindowInFlows =
-        activeExecutorRefreshWindowInFlows;
+          activeExecutorRefreshWindowInFlows;
       this.activeExecutorRefreshWindowInMilisec =
-        activeExecutorRefreshWindowInTime;
+          activeExecutorRefreshWindowInTime;
       this.setName("AzkabanWebServer-QueueProcessor-Thread");
     }
 
-    public void setActive(boolean isActive) {
-      this.isActive = isActive;
-      logger.info("QueueProcessorThread active turned " + this.isActive);
+    public boolean isActive() {
+      return this.isActive;
     }
 
-    public boolean isActive() {
-      return isActive;
+    public void setActive(final boolean isActive) {
+      this.isActive = isActive;
+      logger.info("QueueProcessorThread active turned " + this.isActive);
     }
 
     public void shutdown() {
-      shutdown = true;
+      this.shutdown = true;
       this.interrupt();
     }
 
     @Override
     public void run() {
       // Loops till QueueProcessorThread is shutdown
-      while (!shutdown) {
+      while (!this.shutdown) {
         synchronized (this) {
           try {
             // start processing queue if active, other wait for sometime
-            if (isActive) {
-              processQueuedFlows(activeExecutorRefreshWindowInMilisec,
-                activeExecutorRefreshWindowInFlows);
+            if (this.isActive) {
+              processQueuedFlows(this.activeExecutorRefreshWindowInMilisec,
+                  this.activeExecutorRefreshWindowInFlows);
             }
             wait(QUEUE_PROCESSOR_WAIT_IN_MS);
-          } catch (Exception e) {
+          } catch (final Exception e) {
             logger.error(
-              "QueueProcessorThread Interrupted. Probably to shut down.", e);
+                "QueueProcessorThread Interrupted. Probably to shut down.", e);
           }
         }
       }
     }
 
     /* Method responsible for processing the non-dispatched flows */
-    private void processQueuedFlows(long activeExecutorsRefreshWindow,
-      int maxContinuousFlowProcessed) throws InterruptedException,
-      ExecutorManagerException {
+    private void processQueuedFlows(final long activeExecutorsRefreshWindow,
+        final int maxContinuousFlowProcessed) throws InterruptedException,
+        ExecutorManagerException {
       long lastExecutorRefreshTime = 0;
       int currentContinuousFlowProcessed = 0;
 
-      while (isActive() && (runningCandidate = queuedFlows.fetchHead()) != null) {
-        ExecutionReference reference = runningCandidate.getFirst();
-        ExecutableFlow exflow = runningCandidate.getSecond();
-        long currentTime = System.currentTimeMillis();
+      while (isActive() && (ExecutorManager.this.runningCandidate = ExecutorManager.this.queuedFlows
+          .fetchHead()) != null) {
+        final ExecutionReference reference = ExecutorManager.this.runningCandidate.getFirst();
+        final ExecutableFlow exflow = ExecutorManager.this.runningCandidate.getSecond();
+        final long currentTime = System.currentTimeMillis();
 
         // if we have dispatched more than maxContinuousFlowProcessed or
         // It has been more then activeExecutorsRefreshWindow millisec since we
         // refreshed
 
         if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
-          || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
+            || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
           // Refresh executorInfo for all activeExecutors
           refreshExecutors();
           lastExecutorRefreshTime = currentTime;
           currentContinuousFlowProcessed = 0;
         }
 
-
         /**
          * <pre>
          *  TODO: Work around till we improve Filters to have a notion of GlobalSystemState.
@@ -1823,119 +1794,121 @@ public class ExecutorManager extends EventHandler implements
          *   2. re-attempting a flow (which has been tried before) is considered as all executors are busy
          * </pre>
          */
-        if(exflow.getUpdateTime() > lastExecutorRefreshTime) {
+        if (exflow.getUpdateTime() > lastExecutorRefreshTime) {
           // put back in the queue
-          queuedFlows.enqueue(exflow, reference);
-          runningCandidate = null;
-          long sleepInterval =
-            activeExecutorsRefreshWindow
-              - (currentTime - lastExecutorRefreshTime);
+          ExecutorManager.this.queuedFlows.enqueue(exflow, reference);
+          ExecutorManager.this.runningCandidate = null;
+          final long sleepInterval =
+              activeExecutorsRefreshWindow
+                  - (currentTime - lastExecutorRefreshTime);
           // wait till next executor refresh
           sleep(sleepInterval);
         } else {
           exflow.setUpdateTime(currentTime);
           // process flow with current snapshot of activeExecutors
-          selectExecutorAndDispatchFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
-          runningCandidate = null;
+          selectExecutorAndDispatchFlow(reference, exflow, new HashSet<>(
+              ExecutorManager.this.activeExecutors));
+          ExecutorManager.this.runningCandidate = null;
         }
 
         // do not count failed flow processsing (flows still in queue)
-        if(queuedFlows.getFlow(exflow.getExecutionId()) == null) {
+        if (ExecutorManager.this.queuedFlows.getFlow(exflow.getExecutionId()) == null) {
           currentContinuousFlowProcessed++;
         }
       }
     }
 
     /* process flow with a snapshot of available Executors */
-    private void selectExecutorAndDispatchFlow(ExecutionReference reference,
-      ExecutableFlow exflow, Set<Executor> availableExecutors)
-      throws ExecutorManagerException {
+    private void selectExecutorAndDispatchFlow(final ExecutionReference reference,
+        final ExecutableFlow exflow, final Set<Executor> availableExecutors)
+        throws ExecutorManagerException {
       synchronized (exflow) {
-        Executor selectedExecutor = selectExecutor(exflow, availableExecutors);
+        final Executor selectedExecutor = selectExecutor(exflow, availableExecutors);
         if (selectedExecutor != null) {
           try {
             dispatch(reference, exflow, selectedExecutor);
-            commonMetrics.markDispatchSuccess();
-          } catch (ExecutorManagerException e) {
-            commonMetrics.markDispatchFail();
+            ExecutorManager.this.commonMetrics.markDispatchSuccess();
+          } catch (final ExecutorManagerException e) {
+            ExecutorManager.this.commonMetrics.markDispatchFail();
             logger.warn(String.format(
-              "Executor %s responded with exception for exec: %d",
-              selectedExecutor, exflow.getExecutionId()), e);
+                "Executor %s responded with exception for exec: %d",
+                selectedExecutor, exflow.getExecutionId()), e);
             handleDispatchExceptionCase(reference, exflow, selectedExecutor,
-              availableExecutors);
+                availableExecutors);
           }
         } else {
-          commonMetrics.markDispatchFail();
+          ExecutorManager.this.commonMetrics.markDispatchFail();
           handleNoExecutorSelectedCase(reference, exflow);
         }
       }
     }
 
     /* Helper method to fetch  overriding Executor, if a valid user has specifed otherwise return null */
-    private Executor getUserSpecifiedExecutor(ExecutionOptions options,
-      int executionId) {
+    private Executor getUserSpecifiedExecutor(final ExecutionOptions options,
+        final int executionId) {
       Executor executor = null;
       if (options != null
-        && options.getFlowParameters() != null
-        && options.getFlowParameters().containsKey(
+          && options.getFlowParameters() != null
+          && options.getFlowParameters().containsKey(
           ExecutionOptions.USE_EXECUTOR)) {
         try {
-          int executorId =
-            Integer.valueOf(options.getFlowParameters().get(
-              ExecutionOptions.USE_EXECUTOR));
+          final int executorId =
+              Integer.valueOf(options.getFlowParameters().get(
+                  ExecutionOptions.USE_EXECUTOR));
           executor = fetchExecutor(executorId);
 
           if (executor == null) {
             logger
-              .warn(String
-                .format(
-                  "User specified executor id: %d for execution id: %d is not active, Looking up db.",
-                  executorId, executionId));
-            executor = executorLoader.fetchExecutor(executorId);
+                .warn(String
+                    .format(
+                        "User specified executor id: %d for execution id: %d is not active, Looking up db.",
+                        executorId, executionId));
+            executor = ExecutorManager.this.executorLoader.fetchExecutor(executorId);
             if (executor == null) {
               logger
-                .warn(String
-                  .format(
-                    "User specified executor id: %d for execution id: %d is missing from db. Defaulting to availableExecutors",
-                    executorId, executionId));
+                  .warn(String
+                      .format(
+                          "User specified executor id: %d for execution id: %d is missing from db. Defaulting to availableExecutors",
+                          executorId, executionId));
             }
           }
-        } catch (ExecutorManagerException ex) {
+        } catch (final ExecutorManagerException ex) {
           logger.error("Failed to fetch user specified executor for exec_id = "
-            + executionId, ex);
+              + executionId, ex);
         }
       }
       return executor;
     }
 
     /* Choose Executor for exflow among the available executors */
-    private Executor selectExecutor(ExecutableFlow exflow,
-      Set<Executor> availableExecutors) {
+    private Executor selectExecutor(final ExecutableFlow exflow,
+        final Set<Executor> availableExecutors) {
       Executor choosenExecutor =
-        getUserSpecifiedExecutor(exflow.getExecutionOptions(),
-          exflow.getExecutionId());
+          getUserSpecifiedExecutor(exflow.getExecutionOptions(),
+              exflow.getExecutionId());
 
       // If no executor was specified by admin
       if (choosenExecutor == null) {
         logger.info("Using dispatcher for execution id :"
-          + exflow.getExecutionId());
-        ExecutorSelector selector = new ExecutorSelector(filterList, comparatorWeightsMap);
+            + exflow.getExecutionId());
+        final ExecutorSelector selector = new ExecutorSelector(ExecutorManager.this.filterList,
+            ExecutorManager.this.comparatorWeightsMap);
         choosenExecutor = selector.getBest(availableExecutors, exflow);
       }
       return choosenExecutor;
     }
 
-    private void handleDispatchExceptionCase(ExecutionReference reference,
-      ExecutableFlow exflow, Executor lastSelectedExecutor,
-      Set<Executor> remainingExecutors) throws ExecutorManagerException {
+    private void handleDispatchExceptionCase(final ExecutionReference reference,
+        final ExecutableFlow exflow, final Executor lastSelectedExecutor,
+        final Set<Executor> remainingExecutors) throws ExecutorManagerException {
       logger
-        .info(String
-          .format(
-            "Reached handleDispatchExceptionCase stage for exec %d with error count %d",
-            exflow.getExecutionId(), reference.getNumErrors()));
+          .info(String
+              .format(
+                  "Reached handleDispatchExceptionCase stage for exec %d with error count %d",
+                  exflow.getExecutionId(), reference.getNumErrors()));
       reference.setNumErrors(reference.getNumErrors() + 1);
       if (reference.getNumErrors() > this.maxDispatchingErrors
-        || remainingExecutors.size() <= 1) {
+          || remainingExecutors.size() <= 1) {
         logger.error("Failed to process queued flow");
         finalizeFlows(exflow);
       } else {
@@ -1945,16 +1918,16 @@ public class ExecutorManager extends EventHandler implements
       }
     }
 
-    private void handleNoExecutorSelectedCase(ExecutionReference reference,
-      ExecutableFlow exflow) throws ExecutorManagerException {
+    private void handleNoExecutorSelectedCase(final ExecutionReference reference,
+        final ExecutableFlow exflow) throws ExecutorManagerException {
       logger
-      .info(String
-        .format(
-          "Reached handleNoExecutorSelectedCase stage for exec %d with error count %d",
-            exflow.getExecutionId(), reference.getNumErrors()));
+          .info(String
+              .format(
+                  "Reached handleNoExecutorSelectedCase stage for exec %d with error count %d",
+                  exflow.getExecutionId(), reference.getNumErrors()));
       // TODO: handle scenario where a high priority flow failing to get
       // schedule can starve all others
-      queuedFlows.enqueue(exflow, reference);
+      ExecutorManager.this.queuedFlows.enqueue(exflow, reference);
     }
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
index fec3bd3..429d58b 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -175,17 +175,14 @@ public interface ExecutorManagerAdapter {
       throws ExecutorManagerException;
 
   /**
-   * Manage servlet call for stats servlet in Azkaban execution server
-   * Action can take any of the following values
-   * <ul>
-   * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_REPORTINGINTERVAL}<li>
-   * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_CLEANINGINTERVAL}<li>
-   * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_MAXREPORTERPOINTS}<li>
-   * <li>{@link azkaban.executor.ConnectorParams#STATS_GET_ALLMETRICSNAME}<li>
-   * <li>{@link azkaban.executor.ConnectorParams#STATS_GET_METRICHISTORY}<li>
-   * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_ENABLEMETRICS}<li>
-   * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_DISABLEMETRICS}<li>
-   * </ul>
+   * Manage servlet call for stats servlet in Azkaban execution server Action can take any of the
+   * following values <ul> <li>{@link azkaban.executor.ConnectorParams#STATS_SET_REPORTINGINTERVAL}<li>
+   * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_CLEANINGINTERVAL}<li> <li>{@link
+   * azkaban.executor.ConnectorParams#STATS_SET_MAXREPORTERPOINTS}<li> <li>{@link
+   * azkaban.executor.ConnectorParams#STATS_GET_ALLMETRICSNAME}<li> <li>{@link
+   * azkaban.executor.ConnectorParams#STATS_GET_METRICHISTORY}<li> <li>{@link
+   * azkaban.executor.ConnectorParams#STATS_SET_ENABLEMETRICS}<li> <li>{@link
+   * azkaban.executor.ConnectorParams#STATS_SET_DISABLEMETRICS}<li> </ul>
    */
   public Map<String, Object> callExecutorStats(int executorId, String action,
       Pair<String, String>... param) throws IOException, ExecutorManagerException;
@@ -206,8 +203,7 @@ public interface ExecutorManagerAdapter {
   public Set<? extends String> getPrimaryServerHosts();
 
   /**
-   * Returns a collection of all the active executors maintained by active
-   * executors
+   * Returns a collection of all the active executors maintained by active executors
    */
   public Collection<Executor> getAllActiveExecutors();
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 3d1da87..da29ddf 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -30,11 +30,11 @@ import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.dbutils.DbUtils;
-import org.apache.log4j.Logger;
 
 @Singleton
 public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     ExecutorLoader {
+
   private final ExecutionFlowDao executionFlowDao;
   private final ExecutorDao executorDao;
   private final ExecutionJobDao executionJobDao;
@@ -48,20 +48,20 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
 
   @Inject
   public JdbcExecutorLoader(final Props props, final CommonMetrics commonMetrics,
-                            final ExecutionFlowDao executionFlowDao,
-                            final ExecutorDao executorDao,
-                            final ExecutionJobDao executionJobDao,
-                            final ExecutionLogsDao executionLogsDao,
-                            final ExecutorEventsDao executorEventsDao,
-                            final ActiveExecutingFlowsDao activeExecutingFlowsDao,
-                            final FetchActiveFlowDao fetchActiveFlowDao,
-                            final AssignExecutorDao assignExecutorDao,
-                            final NumExecutionsDao numExecutionsDao) {
+      final ExecutionFlowDao executionFlowDao,
+      final ExecutorDao executorDao,
+      final ExecutionJobDao executionJobDao,
+      final ExecutionLogsDao executionLogsDao,
+      final ExecutorEventsDao executorEventsDao,
+      final ActiveExecutingFlowsDao activeExecutingFlowsDao,
+      final FetchActiveFlowDao fetchActiveFlowDao,
+      final AssignExecutorDao assignExecutorDao,
+      final NumExecutionsDao numExecutionsDao) {
     super(props, commonMetrics);
     this.executionFlowDao = executionFlowDao;
     this.executorDao = executorDao;
     this.executionJobDao = executionJobDao;
-    this.executionLogsDao= executionLogsDao;
+    this.executionLogsDao = executionLogsDao;
     this.executorEventsDao = executorEventsDao;
     this.activeExecutingFlowsDao = activeExecutingFlowsDao;
     this.fetchActiveFlowDao = fetchActiveFlowDao;
@@ -95,9 +95,9 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     return this.executionFlowDao.fetchExecutableFlow(id);
   }
 
- @Override
+  @Override
   public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
-    throws ExecutorManagerException {
+      throws ExecutorManagerException {
     return this.executionFlowDao.fetchQueuedFlows();
   }
 
@@ -143,26 +143,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
 
   @Override
   public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
-                                               final int skip, final int num) throws ExecutorManagerException {
+      final int skip, final int num) throws ExecutorManagerException {
     return this.executionFlowDao.fetchFlowHistory(projectId, flowId, skip, num);
   }
 
   @Override
   public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
-                                               final int skip, final int num, final Status status) throws ExecutorManagerException {
+      final int skip, final int num, final Status status) throws ExecutorManagerException {
     return this.executionFlowDao.fetchFlowHistory(projectId, flowId, skip, num, status);
   }
 
   @Override
   public List<ExecutableFlow> fetchFlowHistory(final int skip, final int num)
       throws ExecutorManagerException {
-    return this.executionFlowDao.fetchFlowHistory(skip,num);
+    return this.executionFlowDao.fetchFlowHistory(skip, num);
   }
 
   @Override
   public List<ExecutableFlow> fetchFlowHistory(final String projContain,
-                                               final String flowContains, final String userNameContains, final int status, final long startTime,
-                                               final long endTime, final int skip, final int num) throws ExecutorManagerException {
+      final String flowContains, final String userNameContains, final int status,
+      final long startTime,
+      final long endTime, final int skip, final int num) throws ExecutorManagerException {
     return this.executionFlowDao.fetchFlowHistory(projContain, flowContains,
         userNameContains, status, startTime, endTime, skip, num);
   }
@@ -237,14 +238,15 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
 
   @Override
   public List<ExecutableJobInfo> fetchJobHistory(final int projectId, final String jobId,
-                                                 final int skip, final int size) throws ExecutorManagerException {
+      final int skip, final int size) throws ExecutorManagerException {
 
     return this.executionJobDao.fetchJobHistory(projectId, jobId, skip, size);
   }
 
   @Override
-  public LogData fetchLogs(final int execId, final String name, final int attempt, final int startByte,
-                           final int length) throws ExecutorManagerException {
+  public LogData fetchLogs(final int execId, final String name, final int attempt,
+      final int startByte,
+      final int length) throws ExecutorManagerException {
 
     return this.executionLogsDao.fetchLogs(execId, name, attempt, startByte, length);
   }
@@ -257,7 +259,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   }
 
   @Override
-  public void uploadLogFile(final int execId, final String name, final int attempt, final File... files)
+  public void uploadLogFile(final int execId, final String name, final int attempt,
+      final File... files)
       throws ExecutorManagerException {
     this.executionLogsDao.uploadLogFile(execId, name, attempt, files);
   }
@@ -279,65 +282,65 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     return connection;
   }
 
- @Override
+  @Override
   public List<Executor> fetchAllExecutors() throws ExecutorManagerException {
     return this.executorDao.fetchAllExecutors();
   }
 
- @Override
+  @Override
   public List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
     return this.executorDao.fetchActiveExecutors();
   }
 
- @Override
+  @Override
   public Executor fetchExecutor(final String host, final int port)
-    throws ExecutorManagerException {
+      throws ExecutorManagerException {
     return this.executorDao.fetchExecutor(host, port);
   }
 
- @Override
+  @Override
   public Executor fetchExecutor(final int executorId) throws ExecutorManagerException {
     return this.executorDao.fetchExecutor(executorId);
   }
 
- @Override
+  @Override
   public void updateExecutor(final Executor executor) throws ExecutorManagerException {
     this.executorDao.updateExecutor(executor);
   }
 
   @Override
   public Executor addExecutor(final String host, final int port)
-    throws ExecutorManagerException {
+      throws ExecutorManagerException {
     return this.executorDao.addExecutor(host, port);
   }
 
- @Override
+  @Override
   public void removeExecutor(final String host, final int port) throws ExecutorManagerException {
     this.executorDao.removeExecutor(host, port);
   }
 
   @Override
   public void postExecutorEvent(final Executor executor, final EventType type, final String user,
-                                final String message) throws ExecutorManagerException{
+      final String message) throws ExecutorManagerException {
 
     this.executorEventsDao.postExecutorEvent(executor, type, user, message);
   }
 
   @Override
   public List<ExecutorLogEvent> getExecutorEvents(final Executor executor, final int num,
-                                                  final int offset) throws ExecutorManagerException {
+      final int offset) throws ExecutorManagerException {
     return this.executorEventsDao.getExecutorEvents(executor, num, offset);
   }
 
   @Override
   public void assignExecutor(final int executorId, final int executionId)
-    throws ExecutorManagerException {
+      throws ExecutorManagerException {
     this.assignExecutorDao.assignExecutor(executorId, executionId);
   }
 
   @Override
   public Executor fetchExecutorByExecutionId(final int executionId)
-    throws ExecutorManagerException {
+      throws ExecutorManagerException {
     return this.executorDao.fetchExecutorByExecutionId(executionId);
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java b/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java
index 7eb592d..84ae246 100644
--- a/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java
+++ b/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java
@@ -34,8 +34,7 @@ public class QueuedExecutions {
   }
 
   /**
-   * Wraps BoundedQueue Take method to have a corresponding update in
-   * queuedFlowMap lookup table
+   * Wraps BoundedQueue Take method to have a corresponding update in queuedFlowMap lookup table
    */
   public Pair<ExecutionReference, ExecutableFlow> fetchHead()
       throws InterruptedException {
@@ -136,8 +135,7 @@ public class QueuedExecutions {
   }
 
   /**
-   * Fetch Activereference for an execution. Returns null, if execution not in
-   * queue
+   * Fetch Activereference for an execution. Returns null, if execution not in queue
    */
   public ExecutionReference getReference(final int executionId) {
     if (hasExecution(executionId)) {
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
index 18260d9..2624e03 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
@@ -91,7 +91,8 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
    * NOTE : this is a static filter which means the filter will be filtering based on the system
    * standard which is not
    *        Coming for the passed flow.
-   *        Ideally this filter will make sure only the executor hasn't reached the Max allowed # of
+   *        Ideally this filter will make sure only the executor hasn't reached the Max allowed #
+   * of
    * executing flows.
    * </pre>
    */
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
index e1cfab2..64f6da4 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
@@ -48,8 +48,8 @@ public final class FactorComparator<T> {
   }
 
   /**
-   * static function to generate an instance of the class.
-   * refer to the constructor for the param definitions.
+   * static function to generate an instance of the class. refer to the constructor for the param
+   * definitions.
    */
   public static <T> FactorComparator<T> create(final String factorName, final int weight,
       final Comparator<T> comparator) {
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
index 97fb89f..d5d5c81 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
@@ -44,8 +44,8 @@ public final class FactorFilter<T, V> {
   }
 
   /**
-   * static function to generate an instance of the class.
-   * refer to the constructor for the param definitions.
+   * static function to generate an instance of the class. refer to the constructor for the param
+   * definitions.
    */
   public static <T, V> FactorFilter<T, V> create(final String factorName,
       final Filter<T, V> filter) {
diff --git a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
index f99fd4f..24ed874 100644
--- a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
+++ b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
@@ -27,8 +27,7 @@ public class CommonJobProperties {
   public static final String JOB_TYPE = "type";
 
   /**
-   * Force a node to be a root node in a flow, even if there are other jobs
-   * dependent on it.
+   * Force a node to be a root node in a flow, even if there are other jobs dependent on it.
    */
   public static final String ROOT_NODE = "root.node";
 
@@ -48,8 +47,7 @@ public class CommonJobProperties {
   public static final String RETRY_BACKOFF = "retry.backoff";
 
   /**
-   * Comma delimited list of email addresses for both failure and success
-   * messages
+   * Comma delimited list of email addresses for both failure and success messages
    */
   public static final String NOTIFY_EMAILS = "notify.emails";
 
@@ -103,8 +101,7 @@ public class CommonJobProperties {
   public static final String JOB_ID = "azkaban.job.id";
 
   /**
-   * The execution id. This should be unique per flow, but may not be due to
-   * restarts.
+   * The execution id. This should be unique per flow, but may not be due to restarts.
    */
   public static final String EXEC_ID = "azkaban.flow.execid";
 
@@ -129,8 +126,7 @@ public class CommonJobProperties {
   public static final String PROJECT_LAST_CHANGED_DATE = "azkaban.flow.projectlastchangeddate";
 
   /**
-   * The version of the project the flow is running. This may change if a forced
-   * hotspot occurs.
+   * The version of the project the flow is running. This may change if a forced hotspot occurs.
    */
   public static final String PROJECT_VERSION = "azkaban.flow.projectversion";
 
@@ -168,4 +164,4 @@ public class CommonJobProperties {
   public static final String FLOW_START_TIMEZONE =
       "azkaban.flow.start.timezone";
 
-}
\ No newline at end of file
+}
diff --git a/azkaban-common/src/main/java/azkaban/jmx/DisplayName.java b/azkaban-common/src/main/java/azkaban/jmx/DisplayName.java
index 7f7d1b2..683956e 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/DisplayName.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/DisplayName.java
@@ -24,8 +24,8 @@ import java.lang.annotation.Target;
 import javax.management.DescriptorKey;
 
 /**
- * DisplayName - This annotation allows to supply a display name for a method in
- * the MBean interface.
+ * DisplayName - This annotation allows to supply a display name for a method in the MBean
+ * interface.
  */
 @Documented
 @Target(ElementType.METHOD)
diff --git a/azkaban-common/src/main/java/azkaban/jmx/ParameterName.java b/azkaban-common/src/main/java/azkaban/jmx/ParameterName.java
index 36f75f1..a8deb04 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/ParameterName.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/ParameterName.java
@@ -16,8 +16,8 @@
 package azkaban.jmx;
 
 /**
- * ParameterName - This annotation allows to supply
- * a parameter name for a method in the MBean interface.
+ * ParameterName - This annotation allows to supply a parameter name for a method in the MBean
+ * interface.
  */
 
 import java.lang.annotation.Documented;
diff --git a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
index 680db51..93fc766 100644
--- a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
+++ b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
@@ -15,8 +15,7 @@ import java.util.Collection;
 import org.apache.log4j.Logger;
 
 /**
- * Responsible for validating the job callback related properties at project
- * upload time
+ * Responsible for validating the job callback related properties at project upload time
  *
  * @author hluu
  */
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/Job.java b/azkaban-common/src/main/java/azkaban/jobExecutor/Job.java
index 02c72d9..9e2a10d 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/Job.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/Job.java
@@ -19,12 +19,9 @@ package azkaban.jobExecutor;
 import azkaban.utils.Props;
 
 /**
- * This interface defines a Raw Job interface. Each job defines
- * <ul>
- * <li>Job Type : {HADOOP, UNIX, JAVA, SUCCESS_TEST, CONTROLLER}</li>
- * <li>Job ID/Name : {String}</li>
- * <li>Arguments: Key/Value Map for Strings</li>
- * </ul>
+ * This interface defines a Raw Job interface. Each job defines <ul> <li>Job Type : {HADOOP, UNIX,
+ * JAVA, SUCCESS_TEST, CONTROLLER}</li> <li>Job ID/Name : {String}</li> <li>Arguments: Key/Value Map
+ * for Strings</li> </ul>
  *
  * A job is required to have a constructor Job(String jobId, Props props)
  */
@@ -37,8 +34,8 @@ public interface Job {
   public String getId();
 
   /**
-   * Run the job. In general this method can only be run once. Must either
-   * succeed or throw an exception.
+   * Run the job. In general this method can only be run once. Must either succeed or throw an
+   * exception.
    */
   public void run() throws Exception;
 
@@ -50,8 +47,7 @@ public interface Job {
   public void cancel() throws Exception;
 
   /**
-   * Returns a progress report between [0 - 1.0] to indicate the percentage
-   * complete
+   * Returns a progress report between [0 - 1.0] to indicate the percentage complete
    *
    * @throws Exception If getting progress fails
    */
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/LongArgJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/LongArgJob.java
index 53488b0..405da38 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/LongArgJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/LongArgJob.java
@@ -26,8 +26,8 @@ import java.util.concurrent.TimeUnit;
 import org.apache.log4j.Logger;
 
 /**
- * A job that passes all the job properties as command line arguments in "long"
- * format, e.g. --key1 value1 --key2 value2 ...
+ * A job that passes all the job properties as command line arguments in "long" format, e.g. --key1
+ * value1 --key2 value2 ...
  */
 public abstract class LongArgJob extends AbstractProcessJob {
 
@@ -101,8 +101,8 @@ public abstract class LongArgJob extends AbstractProcessJob {
   }
 
   /**
-   * This gives access to the process builder used to construct the process. An
-   * overriding class can use this to add to the command being executed.
+   * This gives access to the process builder used to construct the process. An overriding class can
+   * use this to add to the command being executed.
    */
   protected AzkabanProcessBuilder getBuilder() {
     return this.builder;
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 0a4a1fc..d059eeb 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -77,8 +77,8 @@ public class ProcessJob extends AbstractProcessJob {
   }
 
   /**
-   * Splits the command into a unix like command line structure. Quotes and
-   * single quotes are treated as nested strings.
+   * Splits the command into a unix like command line structure. Quotes and single quotes are
+   * treated as nested strings.
    */
   public static String[] partitionCommandLine(final String command) {
     final ArrayList<String> commands = new ArrayList<>();
@@ -360,8 +360,8 @@ public class ProcessJob extends AbstractProcessJob {
   }
 
   /**
-   * Checks to see if user has write access to current working directory which many users
-   * need for their jobs to store temporary data/jars on the executor.
+   * Checks to see if user has write access to current working directory which many users need for
+   * their jobs to store temporary data/jars on the executor.
    *
    * Accomplishes this by using execute-as-user to try to create an empty file in the cwd.
    *
@@ -379,8 +379,8 @@ public class ProcessJob extends AbstractProcessJob {
   }
 
   /**
-   * Changes permission on current working directory so that the directory is owned by the user
-   * and the group remains azkaban.
+   * Changes permission on current working directory so that the directory is owned by the user and
+   * the group remains azkaban.
    *
    * Leverages execute-as-user with "root" as the user to run the command.
    *
@@ -401,9 +401,9 @@ public class ProcessJob extends AbstractProcessJob {
   }
 
   /**
-   * This is used to get the min/max memory size requirement by processes.
-   * SystemMemoryInfo can use the info to determine if the memory request can be
-   * fulfilled. For Java process, this should be Xms/Xmx setting.
+   * This is used to get the min/max memory size requirement by processes. SystemMemoryInfo can use
+   * the info to determine if the memory request can be fulfilled. For Java process, this should be
+   * Xms/Xmx setting.
    *
    * @return pair of min/max memory size
    */
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ScriptJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ScriptJob.java
index 845d13b..f41d0bf 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ScriptJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ScriptJob.java
@@ -21,9 +21,8 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.log4j.Logger;
 
 /**
- * A script job issues a command of the form [EXECUTABLE] [SCRIPT] --key1 val1
- * ... --key2 val2 executable -- the interpretor command to execute script --
- * the script to pass in (requried)
+ * A script job issues a command of the form [EXECUTABLE] [SCRIPT] --key1 val1 ... --key2 val2
+ * executable -- the interpretor command to execute script -- the script to pass in (requried)
  */
 public class ScriptJob extends LongArgJob {
 
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
index 4175d58..480f43a 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
@@ -34,8 +34,7 @@ import org.apache.log4j.Logger;
 /**
  * An improved version of java.lang.Process.
  *
- * Output is read by separate threads to avoid deadlock and logged to log4j
- * loggers.
+ * Output is read by separate threads to avoid deadlock and logged to log4j loggers.
  */
 public class AzkabanProcess {
 
@@ -150,8 +149,7 @@ public class AzkabanProcess {
   /**
    * Await the start of this process
    *
-   * When this method returns, the job process has been created and a this.processId has been
-   * set.
+   * When this method returns, the job process has been created and a this.processId has been set.
    *
    * @throws InterruptedException if the thread is interrupted while waiting.
    */
diff --git a/azkaban-common/src/main/java/azkaban/jobtype/JobTypePluginSet.java b/azkaban-common/src/main/java/azkaban/jobtype/JobTypePluginSet.java
index ab5cf93..aebf22e 100644
--- a/azkaban-common/src/main/java/azkaban/jobtype/JobTypePluginSet.java
+++ b/azkaban-common/src/main/java/azkaban/jobtype/JobTypePluginSet.java
@@ -23,11 +23,11 @@ import java.util.Map;
 /**
  * Container for job type plugins
  *
- * This contains the jobClass objects, the properties for loading plugins, and
- * the properties given by default to the plugin.
+ * This contains the jobClass objects, the properties for loading plugins, and the properties given
+ * by default to the plugin.
  *
- * This class is not thread safe, so adding to this class should only be
- * populated and controlled by the JobTypeManager
+ * This class is not thread safe, so adding to this class should only be populated and controlled by
+ * the JobTypeManager
  */
 public class JobTypePluginSet {
 
@@ -94,8 +94,7 @@ public class JobTypePluginSet {
   }
 
   /**
-   * Get the properties that will be given to the plugin as default job
-   * properties.
+   * Get the properties that will be given to the plugin as default job properties.
    */
   public Props getPluginJobProps(final String jobTypeName) {
     return this.pluginJobPropsMap.get(jobTypeName);
diff --git a/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
index 2427a4c..463ed34 100644
--- a/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
@@ -96,10 +96,8 @@ public abstract class AbstractMetric<T> implements IMetric<T>, Cloneable {
   }
 
   /**
-   * Method used to notify manager for a tracking event.
-   * Metric is free to call this method as per implementation.
-   * Timer based or Azkaban events are the most common implementation
-   * {@inheritDoc}
+   * Method used to notify manager for a tracking event. Metric is free to call this method as per
+   * implementation. Timer based or Azkaban events are the most common implementation {@inheritDoc}
    *
    * @see azkaban.metric.IMetric#notifyManager()
    */
diff --git a/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
index 34fce3a..cb2e487 100644
--- a/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
@@ -50,8 +50,7 @@ public class GangliaMetricEmitter implements IMetricEmitter {
   }
 
   /**
-   * Report metric by executing command line interface of gmetrics
-   * {@inheritDoc}
+   * Report metric by executing command line interface of gmetrics {@inheritDoc}
    *
    * @see azkaban.metric.IMetricEmitter#reportMetric(azkaban.metric.IMetric)
    */
diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
index 66a66bf..e222c32 100644
--- a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
@@ -32,8 +32,8 @@ import org.apache.log4j.Logger;
 
 
 /**
- * Metric Emitter which maintains in memory snapshots of the metrics
- * This is also the default metric emitter and used by /stats servlet
+ * Metric Emitter which maintains in memory snapshots of the metrics This is also the default metric
+ * emitter and used by /stats servlet
  */
 public class InMemoryMetricEmitter implements IMetricEmitter {
 
@@ -83,8 +83,7 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
   }
 
   /**
-   * Ingest metric in snapshot data structure while maintaining interval
-   * {@inheritDoc}
+   * Ingest metric in snapshot data structure while maintaining interval {@inheritDoc}
    *
    * @see azkaban.metric.IMetricEmitter#reportMetric(azkaban.metric.IMetric)
    */
@@ -231,8 +230,7 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
   }
 
   /**
-   * Clear snapshot data structure
-   * {@inheritDoc}
+   * Clear snapshot data structure {@inheritDoc}
    *
    * @see azkaban.metric.IMetricEmitter#purgeAllData()
    */
diff --git a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
index ab71461..d29eba4 100644
--- a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
+++ b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
@@ -25,13 +25,10 @@ import org.apache.log4j.Logger;
 
 
 /**
- * Manager for access or updating metric related functionality of Azkaban
- * MetricManager is responsible all handling all action requests from statsServlet in Exec server
- * <p> Metric Manager 'has a' relationship with :-
- * <ul>
- * <li>all the metric Azkaban is tracking</li>
- * <li>all the emitters Azkaban is supposed to report metrics</li>
- * </ul></p>
+ * Manager for access or updating metric related functionality of Azkaban MetricManager is
+ * responsible all handling all action requests from statsServlet in Exec server <p> Metric Manager
+ * 'has a' relationship with :- <ul> <li>all the metric Azkaban is tracking</li> <li>all the
+ * emitters Azkaban is supposed to report metrics</li> </ul></p>
  */
 public class MetricReportManager {
 
@@ -44,14 +41,14 @@ public class MetricReportManager {
   private static volatile MetricReportManager instance = null;
   private static volatile boolean isManagerEnabled;
   /**
-   * List of all the metrics that Azkaban is tracking
-   * Manager is not concerned with type of metric as long as it honors IMetric contracts
+   * List of all the metrics that Azkaban is tracking Manager is not concerned with type of metric
+   * as long as it honors IMetric contracts
    */
   private final List<IMetric<?>> metrics;
   /**
-   * List of all the emitter listening all the metrics
-   * Manager is not concerned with how emitter is reporting value.
-   * Manager is only responsible to notify all emitters whenever an IMetric wants to be notified
+   * List of all the emitter listening all the metrics Manager is not concerned with how emitter is
+   * reporting value. Manager is only responsible to notify all emitters whenever an IMetric wants
+   * to be notified
    */
   private final List<IMetricEmitter> metricEmitters;
   private final ExecutorService executorService;
@@ -216,8 +213,7 @@ public class MetricReportManager {
   }
 
   /**
-   * Shutdown execution service
-   * {@inheritDoc}
+   * Shutdown execution service {@inheritDoc}
    *
    * @see java.lang.Object#finalize()
    */
diff --git a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
index 7a62928..7e91f7a 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
@@ -22,9 +22,9 @@ import com.google.inject.Singleton;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * This singleton class CommonMetrics is in charge of collecting varieties of metrics
- * which are accessed in both web and exec modules. That said, these metrics will be
- * exposed in both Web server and executor.
+ * This singleton class CommonMetrics is in charge of collecting varieties of metrics which are
+ * accessed in both web and exec modules. That said, these metrics will be exposed in both Web
+ * server and executor.
  */
 @Singleton
 public class CommonMetrics {
@@ -71,8 +71,8 @@ public class CommonMetrics {
   }
 
   /**
-   * Mark flowFailMeter when a flow is considered as FAILED.
-   * This method could be called by Web Server or Executor, as they both detect flow failure.
+   * Mark flowFailMeter when a flow is considered as FAILED. This method could be called by Web
+   * Server or Executor, as they both detect flow failure.
    */
   public void markFlowFail() {
     this.flowFailMeter.mark();
diff --git a/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java b/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java
index de6981e..b5760fc 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java
@@ -57,8 +57,8 @@ public class MetricsManager {
   }
 
   /**
-   * A {@link Meter} measures the rate of events over time (e.g., “requests per second”).
-   * Here we track 1-minute moving averages.
+   * A {@link Meter} measures the rate of events over time (e.g., “requests per second”). Here we
+   * track 1-minute moving averages.
    */
   public Meter addMeter(final String name) {
     final Meter curr = this.registry.meter(name);
@@ -79,9 +79,8 @@ public class MetricsManager {
   }
 
   /**
-   * reporting metrics to remote metrics collector.
-   * Note: this method must be synchronized, since both web server and executor
-   * will call it during initialization.
+   * reporting metrics to remote metrics collector. Note: this method must be synchronized, since
+   * both web server and executor will call it during initialization.
    */
   public synchronized void startReporting(final String reporterName, final Props props) {
     final String metricsReporterClassName = props.get(CUSTOM_METRICS_REPORTER_CLASS_NAME);
@@ -98,7 +97,7 @@ public class MetricsManager {
         log.error("Encountered error while loading and instantiating "
             + metricsReporterClassName, e);
         throw new IllegalStateException("Encountered error while loading and instantiating "
-                + metricsReporterClassName, e);
+            + metricsReporterClassName, e);
       }
     } else {
       log.error(String.format("No value for property: %s or %s was found",
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
index 16b9bf0..f6fa3e1 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
@@ -39,6 +39,7 @@ import org.apache.commons.dbutils.ResultSetHandler;
 class JdbcProjectHandlerSet {
 
   public static class ProjectResultHandler implements ResultSetHandler<List<Project>> {
+
     public static String SELECT_PROJECT_BY_NAME =
         "SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE name=?";
 
@@ -109,12 +110,15 @@ class JdbcProjectHandlerSet {
     }
   }
 
-  public static class ProjectPermissionsResultHandler implements ResultSetHandler<List<Triple<String, Boolean, Permission>>> {
+  public static class ProjectPermissionsResultHandler implements
+      ResultSetHandler<List<Triple<String, Boolean, Permission>>> {
+
     public static String SELECT_PROJECT_PERMISSION =
         "SELECT project_id, modified_time, name, permissions, isGroup FROM project_permissions WHERE project_id=?";
 
     @Override
-    public List<Triple<String, Boolean, Permission>> handle(final ResultSet rs) throws SQLException {
+    public List<Triple<String, Boolean, Permission>> handle(final ResultSet rs)
+        throws SQLException {
       if (!rs.next()) {
         return Collections.emptyList();
       }
@@ -134,6 +138,7 @@ class JdbcProjectHandlerSet {
   }
 
   public static class ProjectFlowsResultHandler implements ResultSetHandler<List<Flow>> {
+
     public static String SELECT_PROJECT_FLOW =
         "SELECT project_id, version, flow_id, modified_time, encoding_type, json FROM project_flows WHERE project_id=? AND version=? AND flow_id=?";
 
@@ -182,7 +187,9 @@ class JdbcProjectHandlerSet {
     }
   }
 
-  public static class ProjectPropertiesResultsHandler implements ResultSetHandler<List<Pair<String, Props>>> {
+  public static class ProjectPropertiesResultsHandler implements
+      ResultSetHandler<List<Pair<String, Props>>> {
+
     public static String SELECT_PROJECT_PROPERTY =
         "SELECT project_id, version, name, modified_time, encoding_type, property FROM project_properties WHERE project_id=? AND version=? AND name=?";
 
@@ -225,6 +232,7 @@ class JdbcProjectHandlerSet {
   }
 
   public static class ProjectLogsResultHandler implements ResultSetHandler<List<ProjectLogEvent>> {
+
     public static String SELECT_PROJECT_EVENTS_ORDER =
         "SELECT project_id, event_type, event_time, username, message FROM project_events WHERE project_id=? ORDER BY event_time DESC LIMIT ? OFFSET ?";
 
@@ -243,7 +251,8 @@ class JdbcProjectHandlerSet {
         final String message = rs.getString(5);
 
         final ProjectLogEvent event =
-            new ProjectLogEvent(projectId, ProjectLogEvent.EventType.fromInteger(eventType), eventTime, username,
+            new ProjectLogEvent(projectId, ProjectLogEvent.EventType.fromInteger(eventType),
+                eventTime, username,
                 message);
         events.add(event);
       } while (rs.next());
@@ -253,6 +262,7 @@ class JdbcProjectHandlerSet {
   }
 
   public static class ProjectFileChunkResultHandler implements ResultSetHandler<List<byte[]>> {
+
     public static String SELECT_PROJECT_CHUNKS_FILE =
         "SELECT project_id, version, chunk, size, file FROM project_files WHERE project_id=? AND version=? AND chunk >= ? AND chunk < ? ORDER BY chunk ASC";
 
@@ -273,7 +283,9 @@ class JdbcProjectHandlerSet {
     }
   }
 
-  public static class ProjectVersionResultHandler implements ResultSetHandler<List<ProjectFileHandler>> {
+  public static class ProjectVersionResultHandler implements
+      ResultSetHandler<List<ProjectFileHandler>> {
+
     public static String SELECT_PROJECT_VERSION =
         "SELECT project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks, resource_id "
             + "FROM project_versions WHERE project_id=? AND version=?";
@@ -297,7 +309,8 @@ class JdbcProjectHandlerSet {
         final String resourceId = rs.getString(9);
 
         final ProjectFileHandler handler =
-            new ProjectFileHandler(projectId, version, uploadTime, uploader, fileType, fileName, numChunks, md5,
+            new ProjectFileHandler(projectId, version, uploadTime, uploader, fileType, fileName,
+                numChunks, md5,
                 resourceId);
 
         handlers.add(handler);
@@ -308,6 +321,7 @@ class JdbcProjectHandlerSet {
   }
 
   public static class IntHandler implements ResultSetHandler<Integer> {
+
     public static String SELECT_LATEST_VERSION = "SELECT MAX(version) FROM project_versions WHERE project_id=?";
 
     @Override
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
index 8e28396..99eef86 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
@@ -59,12 +59,13 @@ import org.apache.log4j.Logger;
 
 
 /**
- * This class implements ProjectLoader using new azkaban-db code to allow DB failover.
- * TODO kunkun-tang: This class is too long. In future, we should split {@link ProjectLoader} interface
+ * This class implements ProjectLoader using new azkaban-db code to allow DB failover. TODO
+ * kunkun-tang: This class is too long. In future, we should split {@link ProjectLoader} interface
  * and have multiple short class implementations.
  */
 @Singleton
 public class JdbcProjectImpl implements ProjectLoader {
+
   private static final Logger logger = Logger.getLogger(JdbcProjectImpl.class);
 
   private static final int CHUCK_SIZE = 1024 * 1024 * 10;
@@ -106,7 +107,8 @@ public class JdbcProjectImpl implements ProjectLoader {
     return projects;
   }
 
-  private void setProjectPermission(final Project project, final Triple<String, Boolean, Permission> perm) {
+  private void setProjectPermission(final Project project,
+      final Triple<String, Boolean, Permission> perm) {
     if (perm.getSecond()) {
       project.setGroupPermission(perm.getFirst(), perm.getThird());
     } else {
@@ -154,7 +156,8 @@ public class JdbcProjectImpl implements ProjectLoader {
       List<Project> projects = this.dbOperator
           .query(ProjectResultHandler.SELECT_ACTIVE_PROJECT_BY_NAME, handler, name);
       if (projects.isEmpty()) {
-        projects = this.dbOperator.query(ProjectResultHandler.SELECT_PROJECT_BY_NAME, handler, name);
+        projects = this.dbOperator
+            .query(ProjectResultHandler.SELECT_PROJECT_BY_NAME, handler, name);
         if (projects.isEmpty()) {
           throw new ProjectManagerException("No project with name " + name + " exists in db.");
         }
@@ -167,12 +170,14 @@ public class JdbcProjectImpl implements ProjectLoader {
       }
     } catch (final SQLException ex) {
       logger.error(ProjectResultHandler.SELECT_ACTIVE_PROJECT_BY_NAME + " failed.", ex);
-      throw new ProjectManagerException(ProjectResultHandler.SELECT_ACTIVE_PROJECT_BY_NAME + " failed.", ex);
+      throw new ProjectManagerException(
+          ProjectResultHandler.SELECT_ACTIVE_PROJECT_BY_NAME + " failed.", ex);
     }
     return project;
   }
 
-  private List<Triple<String, Boolean, Permission>> fetchPermissionsForProject(final Project project)
+  private List<Triple<String, Boolean, Permission>> fetchPermissionsForProject(
+      final Project project)
       throws ProjectManagerException {
     final ProjectPermissionsResultHandler permHander = new ProjectPermissionsResultHandler();
 
@@ -180,10 +185,12 @@ public class JdbcProjectImpl implements ProjectLoader {
     try {
       permissions =
           this.dbOperator
-              .query(ProjectPermissionsResultHandler.SELECT_PROJECT_PERMISSION, permHander, project.getId());
+              .query(ProjectPermissionsResultHandler.SELECT_PROJECT_PERMISSION, permHander,
+                  project.getId());
     } catch (final SQLException ex) {
       logger.error(ProjectPermissionsResultHandler.SELECT_PROJECT_PERMISSION + " failed.", ex);
-      throw new ProjectManagerException("Query for permissions for " + project.getName() + " failed.", ex);
+      throw new ProjectManagerException(
+          "Query for permissions for " + project.getName() + " failed.", ex);
     }
     return permissions;
   }
@@ -191,11 +198,11 @@ public class JdbcProjectImpl implements ProjectLoader {
   /**
    * Creates a Project in the db.
    *
-   * It will throw an exception if it finds an active project of the same name,
-   * or the SQL fails
+   * It will throw an exception if it finds an active project of the same name, or the SQL fails
    */
   @Override
-  public synchronized Project createNewProject(final String name, final String description, final User creator)
+  public synchronized Project createNewProject(final String name, final String description,
+      final User creator)
       throws ProjectManagerException {
     final ProjectResultHandler handler = new ProjectResultHandler();
 
@@ -204,7 +211,8 @@ public class JdbcProjectImpl implements ProjectLoader {
       final List<Project> projects = this.dbOperator
           .query(ProjectResultHandler.SELECT_ACTIVE_PROJECT_BY_NAME, handler, name);
       if (!projects.isEmpty()) {
-        throw new ProjectManagerException("Active project with name " + name + " already exists in db.");
+        throw new ProjectManagerException(
+            "Active project with name " + name + " already exists in db.");
       }
     } catch (final SQLException ex) {
       logger.error(ex);
@@ -215,8 +223,9 @@ public class JdbcProjectImpl implements ProjectLoader {
         "INSERT INTO projects ( name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob) values (?,?,?,?,?,?,?,?,?)";
     final SQLTransaction<Integer> insertProject = transOperator -> {
       final long time = System.currentTimeMillis();
-      return transOperator.update(INSERT_PROJECT, name, true, time, time, null, creator.getUserId(), description,
-          this.defaultEncodingType.getNumVal(), null);
+      return transOperator
+          .update(INSERT_PROJECT, name, true, time, time, null, creator.getUserId(), description,
+              this.defaultEncodingType.getNumVal(), null);
     };
 
     // Insert project
@@ -227,17 +236,20 @@ public class JdbcProjectImpl implements ProjectLoader {
       }
     } catch (final SQLException ex) {
       logger.error(INSERT_PROJECT + " failed.", ex);
-      throw new ProjectManagerException("Insert project" + name + " for existing project failed. ", ex);
+      throw new ProjectManagerException("Insert project" + name + " for existing project failed. ",
+          ex);
     }
     return fetchProjectByName(name);
   }
 
   @Override
-  public void uploadProjectFile(final int projectId, final int version, final File localFile, final String uploader)
+  public void uploadProjectFile(final int projectId, final int version, final File localFile,
+      final String uploader)
       throws ProjectManagerException {
     final long startMs = System.currentTimeMillis();
-    logger.info(String.format("Uploading Project ID: %d file: %s [%d bytes]", projectId, localFile.getName(),
-        localFile.length()));
+    logger.info(String
+        .format("Uploading Project ID: %d file: %s [%d bytes]", projectId, localFile.getName(),
+            localFile.length()));
 
     /*
      * The below transaction uses one connection to do all operations. Ideally, we should commit
@@ -249,7 +261,8 @@ public class JdbcProjectImpl implements ProjectLoader {
     final SQLTransaction<Integer> uploadProjectFileTransaction = transOperator -> {
 
       /* Step 1: Update DB with new project info */
-      addProjectToProjectVersions(transOperator, projectId, version, localFile, uploader, computeHash(localFile), null);
+      addProjectToProjectVersions(transOperator, projectId, version, localFile, uploader,
+          computeHash(localFile), null);
       transOperator.getConnection().commit();
 
       /* Step 2: Upload File in chunks to DB */
@@ -268,7 +281,8 @@ public class JdbcProjectImpl implements ProjectLoader {
     }
 
     final long duration = (System.currentTimeMillis() - startMs) / 1000;
-    logger.info(String.format("Uploaded Project ID: %d file: %s [%d bytes] in %d sec", projectId, localFile.getName(),
+    logger.info(String.format("Uploaded Project ID: %d file: %s [%d bytes] in %d sec", projectId,
+        localFile.getName(),
         localFile.length(), duration));
   }
 
@@ -297,7 +311,8 @@ public class JdbcProjectImpl implements ProjectLoader {
 
     // when one transaction completes, it automatically commits.
     final SQLTransaction<Integer> transaction = transOperator -> {
-      addProjectToProjectVersions(transOperator, projectId, version, localFile, uploader, md5, resourceId);
+      addProjectToProjectVersions(transOperator, projectId, version, localFile, uploader, md5,
+          resourceId);
       return 1;
     };
     try {
@@ -311,21 +326,20 @@ public class JdbcProjectImpl implements ProjectLoader {
   /**
    * Insert a new version record to TABLE project_versions before uploading files.
    *
-   * The reason for this operation:
-   * When error chunking happens in remote mysql server, incomplete file data remains
-   * in DB, and an SQL exception is thrown. If we don't have this operation before uploading file,
-   * the SQL exception prevents AZ from creating the new version record in Table project_versions.
-   * However, the Table project_files still reserve the incomplete files, which causes troubles
-   * when uploading a new file: Since the version in TABLE project_versions is still old, mysql will stop
-   * inserting new files to db.
+   * The reason for this operation: When error chunking happens in remote mysql server, incomplete
+   * file data remains in DB, and an SQL exception is thrown. If we don't have this operation before
+   * uploading file, the SQL exception prevents AZ from creating the new version record in Table
+   * project_versions. However, the Table project_files still reserve the incomplete files, which
+   * causes troubles when uploading a new file: Since the version in TABLE project_versions is still
+   * old, mysql will stop inserting new files to db.
    *
-   * Why this operation is safe:
-   * When AZ uploads a new zip file, it always fetches the latest version proj_v from TABLE project_version,
-   * proj_v+1 will be used as the new version for the uploading files.
+   * Why this operation is safe: When AZ uploads a new zip file, it always fetches the latest
+   * version proj_v from TABLE project_version, proj_v+1 will be used as the new version for the
+   * uploading files.
    *
-   * Assume error chunking happens on day 1. proj_v is created for this bad file (old file version + 1).
-   * When we upload a new project zip in day2, new file in day 2 will use the new version (proj_v + 1).
-   * When file uploading completes, AZ will clean all old chunks in DB afterward.
+   * Assume error chunking happens on day 1. proj_v is created for this bad file (old file version +
+   * 1). When we upload a new project zip in day2, new file in day 2 will use the new version
+   * (proj_v + 1). When file uploading completes, AZ will clean all old chunks in DB afterward.
    */
   private void addProjectToProjectVersions(
       final DatabaseTransOperator transOperator,
@@ -348,13 +362,15 @@ public class JdbcProjectImpl implements ProjectLoader {
       transOperator.update(INSERT_PROJECT_VERSION, projectId, version, updateTime, uploader,
           Files.getFileExtension(localFile.getName()), localFile.getName(), md5, 0, resourceId);
     } catch (final SQLException e) {
-      final String msg = String.format("Error initializing project id: %d version: %d ", projectId, version);
+      final String msg = String
+          .format("Error initializing project id: %d version: %d ", projectId, version);
       logger.error(msg, e);
       throw new ProjectManagerException(msg, e);
     }
   }
 
-  private int uploadFileInChunks(final DatabaseTransOperator transOperator, final int projectId, final int version, final File localFile)
+  private int uploadFileInChunks(final DatabaseTransOperator transOperator, final int projectId,
+      final int version, final File localFile)
       throws ProjectManagerException {
 
     // Really... I doubt we'll get a > 2gig file. So int casting it is!
@@ -394,7 +410,9 @@ public class JdbcProjectImpl implements ProjectLoader {
       }
     } catch (final IOException e) {
       throw new ProjectManagerException(
-          String.format("Error chunking file. projectId: %d, version: %d, file:%s[%d bytes], chunk: %d", projectId,
+          String.format(
+              "Error chunking file. projectId: %d, version: %d, file:%s[%d bytes], chunk: %d",
+              projectId,
               version, localFile.getName(), localFile.length(), chunk));
     } finally {
       IOUtils.closeQuietly(bufferedStream);
@@ -405,7 +423,8 @@ public class JdbcProjectImpl implements ProjectLoader {
   /**
    * we update num_chunks's actual number to db here.
    */
-  private void updateChunksInProjectVersions(final DatabaseTransOperator transOperator, final int projectId, final int version, final int chunk)
+  private void updateChunksInProjectVersions(final DatabaseTransOperator transOperator,
+      final int projectId, final int version, final int chunk)
       throws ProjectManagerException {
 
     final String UPDATE_PROJECT_NUM_CHUNKS =
@@ -415,7 +434,8 @@ public class JdbcProjectImpl implements ProjectLoader {
       transOperator.getConnection().commit();
     } catch (final SQLException e) {
       logger.error("Error updating project " + projectId + " : chunk_num " + chunk, e);
-      throw new ProjectManagerException("Error updating project " + projectId + " : chunk_num " + chunk, e);
+      throw new ProjectManagerException(
+          "Error updating project " + projectId + " : chunk_num " + chunk, e);
     }
   }
 
@@ -425,19 +445,22 @@ public class JdbcProjectImpl implements ProjectLoader {
     try {
       final List<ProjectFileHandler> projectFiles =
           this.dbOperator
-              .query(ProjectVersionResultHandler.SELECT_PROJECT_VERSION, pfHandler, projectId, version);
+              .query(ProjectVersionResultHandler.SELECT_PROJECT_VERSION, pfHandler, projectId,
+                  version);
       if (projectFiles == null || projectFiles.isEmpty()) {
         return null;
       }
       return projectFiles.get(0);
     } catch (final SQLException ex) {
       logger.error("Query for uploaded file for project id " + projectId + " failed.", ex);
-      throw new ProjectManagerException("Query for uploaded file for project id " + projectId + " failed.", ex);
+      throw new ProjectManagerException(
+          "Query for uploaded file for project id " + projectId + " failed.", ex);
     }
   }
 
   @Override
-  public ProjectFileHandler getUploadedFile(final int projectId, final int version) throws ProjectManagerException {
+  public ProjectFileHandler getUploadedFile(final int projectId, final int version)
+      throws ProjectManagerException {
     final ProjectFileHandler projHandler = fetchProjectMetaData(projectId, version);
     if (projHandler == null) {
       return null;
@@ -447,7 +470,8 @@ public class JdbcProjectImpl implements ProjectLoader {
     File file;
     try {
       try {
-        file = File.createTempFile(projHandler.getFileName(), String.valueOf(version), this.tempDir);
+        file = File
+            .createTempFile(projHandler.getFileName(), String.valueOf(version), this.tempDir);
         bStream = new BufferedOutputStream(new FileOutputStream(file));
       } catch (final IOException e) {
         throw new ProjectManagerException("Error creating temp file for stream.");
@@ -461,11 +485,13 @@ public class JdbcProjectImpl implements ProjectLoader {
         List<byte[]> data = null;
         try {
           data = this.dbOperator
-              .query(ProjectFileChunkResultHandler.SELECT_PROJECT_CHUNKS_FILE, chunkHandler, projectId,
-              version, fromChunk, toChunk);
+              .query(ProjectFileChunkResultHandler.SELECT_PROJECT_CHUNKS_FILE, chunkHandler,
+                  projectId,
+                  version, fromChunk, toChunk);
         } catch (final SQLException e) {
           logger.error(e);
-          throw new ProjectManagerException("Query for uploaded file for " + projectId + " failed.", e);
+          throw new ProjectManagerException("Query for uploaded file for " + projectId + " failed.",
+              e);
         }
 
         try {
@@ -503,7 +529,8 @@ public class JdbcProjectImpl implements ProjectLoader {
   }
 
   @Override
-  public void changeProjectVersion(final Project project, final int version, final String user) throws ProjectManagerException {
+  public void changeProjectVersion(final Project project, final int version, final String user)
+      throws ProjectManagerException {
     final long timestamp = System.currentTimeMillis();
     try {
       final String UPDATE_PROJECT_VERSION =
@@ -515,12 +542,14 @@ public class JdbcProjectImpl implements ProjectLoader {
       project.setLastModifiedUser(user);
     } catch (final SQLException e) {
       logger.error("Error updating switching project version " + project.getName(), e);
-      throw new ProjectManagerException("Error updating switching project version " + project.getName(), e);
+      throw new ProjectManagerException(
+          "Error updating switching project version " + project.getName(), e);
     }
   }
 
   @Override
-  public void updatePermission(final Project project, final String name, final Permission perm, final boolean isGroup)
+  public void updatePermission(final Project project, final String name, final Permission perm,
+      final boolean isGroup)
       throws ProjectManagerException {
 
     final long updateTime = System.currentTimeMillis();
@@ -529,15 +558,20 @@ public class JdbcProjectImpl implements ProjectLoader {
         final String INSERT_PROJECT_PERMISSION =
             "INSERT INTO project_permissions (project_id, modified_time, name, permissions, isGroup) values (?,?,?,?,?)"
                 + "ON DUPLICATE KEY UPDATE modified_time = VALUES(modified_time), permissions = VALUES(permissions)";
-        this.dbOperator.update(INSERT_PROJECT_PERMISSION, project.getId(), updateTime, name, perm.toFlags(), isGroup);
+        this.dbOperator
+            .update(INSERT_PROJECT_PERMISSION, project.getId(), updateTime, name, perm.toFlags(),
+                isGroup);
       } else {
         final String MERGE_PROJECT_PERMISSION =
             "MERGE INTO project_permissions (project_id, modified_time, name, permissions, isGroup) KEY (project_id, name) values (?,?,?,?,?)";
-        this.dbOperator.update(MERGE_PROJECT_PERMISSION, project.getId(), updateTime, name, perm.toFlags(), isGroup);
+        this.dbOperator
+            .update(MERGE_PROJECT_PERMISSION, project.getId(), updateTime, name, perm.toFlags(),
+                isGroup);
       }
     } catch (final SQLException ex) {
       logger.error("Error updating project permission", ex);
-      throw new ProjectManagerException("Error updating project " + project.getName() + " permissions for " + name, ex);
+      throw new ProjectManagerException(
+          "Error updating project " + project.getName() + " permissions for " + name, ex);
     }
 
     if (isGroup) {
@@ -560,7 +594,8 @@ public class JdbcProjectImpl implements ProjectLoader {
     return data;
   }
 
-  private void updateProjectSettings(final Project project, final EncodingType encType) throws ProjectManagerException {
+  private void updateProjectSettings(final Project project, final EncodingType encType)
+      throws ProjectManagerException {
     final String UPDATE_PROJECT_SETTINGS = "UPDATE projects SET enc_type=?, settings_blob=? WHERE id=?";
 
     final String json = JSONUtils.toJSON(project.toObject());
@@ -582,14 +617,16 @@ public class JdbcProjectImpl implements ProjectLoader {
   }
 
   @Override
-  public void removePermission(final Project project, final String name, final boolean isGroup) throws ProjectManagerException {
+  public void removePermission(final Project project, final String name, final boolean isGroup)
+      throws ProjectManagerException {
     final String DELETE_PROJECT_PERMISSION =
         "DELETE FROM project_permissions WHERE project_id=? AND name=? AND isGroup=?";
     try {
       this.dbOperator.update(DELETE_PROJECT_PERMISSION, project.getId(), name, isGroup);
     } catch (final SQLException e) {
       logger.error("remove Permission failed.", e);
-      throw new ProjectManagerException("Error deleting project " + project.getName() + " permissions for " + name, e);
+      throw new ProjectManagerException(
+          "Error deleting project " + project.getName() + " permissions for " + name, e);
     }
 
     if (isGroup) {
@@ -600,7 +637,8 @@ public class JdbcProjectImpl implements ProjectLoader {
   }
 
   @Override
-  public List<Triple<String, Boolean, Permission>> getProjectPermissions(final Project project) throws ProjectManagerException {
+  public List<Triple<String, Boolean, Permission>> getProjectPermissions(final Project project)
+      throws ProjectManagerException {
     return fetchPermissionsForProject(project);
   }
 
@@ -609,7 +647,8 @@ public class JdbcProjectImpl implements ProjectLoader {
    * We should rewrite the code to follow the literal meanings.
    */
   @Override
-  public void removeProject(final Project project, final String user) throws ProjectManagerException {
+  public void removeProject(final Project project, final String user)
+      throws ProjectManagerException {
 
     final long updateTime = System.currentTimeMillis();
     final String UPDATE_INACTIVE_PROJECT =
@@ -623,13 +662,15 @@ public class JdbcProjectImpl implements ProjectLoader {
   }
 
   @Override
-  public boolean postEvent(final Project project, final EventType type, final String user, final String message) {
+  public boolean postEvent(final Project project, final EventType type, final String user,
+      final String message) {
     final String INSERT_PROJECT_EVENTS =
         "INSERT INTO project_events (project_id, event_type, event_time, username, message) values (?,?,?,?,?)";
     final long updateTime = System.currentTimeMillis();
     try {
       this.dbOperator
-          .update(INSERT_PROJECT_EVENTS, project.getId(), type.getNumVal(), updateTime, user, message);
+          .update(INSERT_PROJECT_EVENTS, project.getId(), type.getNumVal(), updateTime, user,
+              message);
     } catch (final SQLException e) {
       logger.error("post event failed,", e);
       return false;
@@ -638,13 +679,15 @@ public class JdbcProjectImpl implements ProjectLoader {
   }
 
   @Override
-  public List<ProjectLogEvent> getProjectEvents(final Project project, final int num, final int skip) throws ProjectManagerException {
+  public List<ProjectLogEvent> getProjectEvents(final Project project, final int num,
+      final int skip) throws ProjectManagerException {
     final ProjectLogsResultHandler logHandler = new ProjectLogsResultHandler();
     List<ProjectLogEvent> events = null;
     try {
       events = this.dbOperator
-          .query(ProjectLogsResultHandler.SELECT_PROJECT_EVENTS_ORDER, logHandler, project.getId(), num,
-          skip);
+          .query(ProjectLogsResultHandler.SELECT_PROJECT_EVENTS_ORDER, logHandler, project.getId(),
+              num,
+              skip);
     } catch (final SQLException e) {
       logger.error("Error getProjectEvents, project " + project.getName(), e);
       throw new ProjectManagerException("Error getProjectEvents, project " + project.getName(), e);
@@ -654,18 +697,21 @@ public class JdbcProjectImpl implements ProjectLoader {
   }
 
   @Override
-  public void updateDescription(final Project project, final String description, final String user) throws ProjectManagerException {
+  public void updateDescription(final Project project, final String description, final String user)
+      throws ProjectManagerException {
     final String UPDATE_PROJECT_DESCRIPTION =
         "UPDATE projects SET description=?,modified_time=?,last_modified_by=? WHERE id=?";
     final long updateTime = System.currentTimeMillis();
     try {
-      this.dbOperator.update(UPDATE_PROJECT_DESCRIPTION, description, updateTime, user, project.getId());
+      this.dbOperator
+          .update(UPDATE_PROJECT_DESCRIPTION, description, updateTime, user, project.getId());
       project.setDescription(description);
       project.setLastModifiedTimestamp(updateTime);
       project.setLastModifiedUser(user);
     } catch (final SQLException e) {
       logger.error(e);
-      throw new ProjectManagerException("Error update Description, project " + project.getName(), e);
+      throw new ProjectManagerException("Error update Description, project " + project.getName(),
+          e);
     }
   }
 
@@ -676,12 +722,14 @@ public class JdbcProjectImpl implements ProjectLoader {
       return this.dbOperator.query(IntHandler.SELECT_LATEST_VERSION, handler, project.getId());
     } catch (final SQLException e) {
       logger.error(e);
-      throw new ProjectManagerException("Error marking project " + project.getName() + " as inactive", e);
+      throw new ProjectManagerException(
+          "Error marking project " + project.getName() + " as inactive", e);
     }
   }
 
   @Override
-  public void uploadFlows(final Project project, final int version, final Collection<Flow> flows) throws ProjectManagerException {
+  public void uploadFlows(final Project project, final int version, final Collection<Flow> flows)
+      throws ProjectManagerException {
     // We do one at a time instead of batch... because well, the batch could be
     // large.
     logger.info("Uploading flows");
@@ -695,7 +743,8 @@ public class JdbcProjectImpl implements ProjectLoader {
   }
 
   @Override
-  public void uploadFlow(final Project project, final int version, final Flow flow) throws ProjectManagerException {
+  public void uploadFlow(final Project project, final int version, final Flow flow)
+      throws ProjectManagerException {
     logger.info("Uploading flow " + flow.getId());
     try {
       uploadFlow(project, version, flow, this.defaultEncodingType);
@@ -705,7 +754,8 @@ public class JdbcProjectImpl implements ProjectLoader {
   }
 
   @Override
-  public void updateFlow(final Project project, final int version, final Flow flow) throws ProjectManagerException {
+  public void updateFlow(final Project project, final int version, final Flow flow)
+      throws ProjectManagerException {
     logger.info("Uploading flow " + flow.getId());
     try {
       final String json = JSONUtils.toJSON(flow.toObject());
@@ -715,7 +765,8 @@ public class JdbcProjectImpl implements ProjectLoader {
           "UPDATE project_flows SET encoding_type=?,json=? WHERE project_id=? AND version=? AND flow_id=?";
       try {
         this.dbOperator
-            .update(UPDATE_FLOW, this.defaultEncodingType.getNumVal(), data, project.getId(), version, flow.getId());
+            .update(UPDATE_FLOW, this.defaultEncodingType.getNumVal(), data, project.getId(),
+                version, flow.getId());
       } catch (final SQLException e) {
         logger.error("Error inserting flow", e);
         throw new ProjectManagerException("Error inserting flow " + flow.getId(), e);
@@ -725,7 +776,8 @@ public class JdbcProjectImpl implements ProjectLoader {
     }
   }
 
-  private void uploadFlow(final Project project, final int version, final Flow flow, final EncodingType encType)
+  private void uploadFlow(final Project project, final int version, final Flow flow,
+      final EncodingType encType)
       throws ProjectManagerException, IOException {
     final String json = JSONUtils.toJSON(flow.toObject());
     final byte[] data = convertJsonToBytes(encType, json);
@@ -734,8 +786,9 @@ public class JdbcProjectImpl implements ProjectLoader {
     final String INSERT_FLOW =
         "INSERT INTO project_flows (project_id, version, flow_id, modified_time, encoding_type, json) values (?,?,?,?,?,?)";
     try {
-      this.dbOperator.update(INSERT_FLOW, project.getId(), version, flow.getId(), System.currentTimeMillis(),
-          encType.getNumVal(), data);
+      this.dbOperator
+          .update(INSERT_FLOW, project.getId(), version, flow.getId(), System.currentTimeMillis(),
+              encType.getNumVal(), data);
     } catch (final SQLException e) {
       logger.error("Error inserting flow", e);
       throw new ProjectManagerException("Error inserting flow " + flow.getId(), e);
@@ -754,16 +807,18 @@ public class JdbcProjectImpl implements ProjectLoader {
     try {
       flows = this.dbOperator
           .query(ProjectFlowsResultHandler.SELECT_ALL_PROJECT_FLOWS, handler, project.getId(),
-          project.getVersion());
+              project.getVersion());
     } catch (final SQLException e) {
       throw new ProjectManagerException(
-          "Error fetching flows from project " + project.getName() + " version " + project.getVersion(), e);
+          "Error fetching flows from project " + project.getName() + " version " + project
+              .getVersion(), e);
     }
     return flows;
   }
 
   @Override
-  public void uploadProjectProperties(final Project project, final List<Props> properties) throws ProjectManagerException {
+  public void uploadProjectProperties(final Project project, final List<Props> properties)
+      throws ProjectManagerException {
     for (final Props props : properties) {
       try {
         uploadProjectProperty(project, props.getSource(), props);
@@ -774,7 +829,8 @@ public class JdbcProjectImpl implements ProjectLoader {
   }
 
   @Override
-  public void uploadProjectProperty(final Project project, final Props props) throws ProjectManagerException {
+  public void uploadProjectProperty(final Project project, final Props props)
+      throws ProjectManagerException {
     try {
       uploadProjectProperty(project, props.getSource(), props);
     } catch (final IOException e) {
@@ -783,7 +839,8 @@ public class JdbcProjectImpl implements ProjectLoader {
   }
 
   @Override
-  public void updateProjectProperty(final Project project, final Props props) throws ProjectManagerException {
+  public void updateProjectProperty(final Project project, final Props props)
+      throws ProjectManagerException {
     try {
       updateProjectProperty(project, props.getSource(), props);
     } catch (final IOException e) {
@@ -813,7 +870,8 @@ public class JdbcProjectImpl implements ProjectLoader {
 
     final byte[] propsData = getBytes(props);
     try {
-      this.dbOperator.update(INSERT_PROPERTIES, project.getId(), project.getVersion(), name, System.currentTimeMillis(),
+      this.dbOperator.update(INSERT_PROPERTIES, project.getId(), project.getVersion(), name,
+          System.currentTimeMillis(),
           this.defaultEncodingType.getNumVal(), propsData);
     } catch (final SQLException e) {
       throw new ProjectManagerException(
@@ -832,38 +890,45 @@ public class JdbcProjectImpl implements ProjectLoader {
   }
 
   @Override
-  public Props fetchProjectProperty(final int projectId, final int projectVer, final String propsName) throws ProjectManagerException {
+  public Props fetchProjectProperty(final int projectId, final int projectVer,
+      final String propsName) throws ProjectManagerException {
 
     final ProjectPropertiesResultsHandler handler = new ProjectPropertiesResultsHandler();
     try {
       final List<Pair<String, Props>> properties =
           this.dbOperator
-              .query(ProjectPropertiesResultsHandler.SELECT_PROJECT_PROPERTY, handler, projectId, projectVer,
-              propsName);
+              .query(ProjectPropertiesResultsHandler.SELECT_PROJECT_PROPERTY, handler, projectId,
+                  projectVer,
+                  propsName);
 
       if (properties == null || properties.isEmpty()) {
-        logger.warn("Project " + projectId + " version " + projectVer + " property " + propsName + " is empty.");
+        logger.warn("Project " + projectId + " version " + projectVer + " property " + propsName
+            + " is empty.");
         return null;
       }
 
       return properties.get(0).getSecond();
     } catch (final SQLException e) {
-      logger.error("Error fetching property " + propsName + " Project " + projectId + " version " + projectVer, e);
+      logger.error("Error fetching property " + propsName + " Project " + projectId + " version "
+          + projectVer, e);
       throw new ProjectManagerException("Error fetching property " + propsName, e);
     }
   }
 
   @Override
-  public Props fetchProjectProperty(final Project project, final String propsName) throws ProjectManagerException {
+  public Props fetchProjectProperty(final Project project, final String propsName)
+      throws ProjectManagerException {
     return fetchProjectProperty(project.getId(), project.getVersion(), propsName);
   }
 
   @Override
-  public Map<String, Props> fetchProjectProperties(final int projectId, final int version) throws ProjectManagerException {
+  public Map<String, Props> fetchProjectProperties(final int projectId, final int version)
+      throws ProjectManagerException {
 
     try {
-      final List<Pair<String, Props>> properties = this.dbOperator.query(ProjectPropertiesResultsHandler.SELECT_PROJECT_PROPERTIES,
-          new ProjectPropertiesResultsHandler(), projectId, version);
+      final List<Pair<String, Props>> properties = this.dbOperator
+          .query(ProjectPropertiesResultsHandler.SELECT_PROJECT_PROPERTIES,
+              new ProjectPropertiesResultsHandler(), projectId, version);
       if (properties == null || properties.isEmpty()) {
         return null;
       }
@@ -879,7 +944,8 @@ public class JdbcProjectImpl implements ProjectLoader {
   }
 
   @Override
-  public void cleanOlderProjectVersion(final int projectId, final int version) throws ProjectManagerException {
+  public void cleanOlderProjectVersion(final int projectId, final int version)
+      throws ProjectManagerException {
     final String DELETE_FLOW = "DELETE FROM project_flows WHERE project_id=? AND version<?";
     final String DELETE_PROPERTIES = "DELETE FROM project_properties WHERE project_id=? AND version<?";
     final String DELETE_PROJECT_FILES = "DELETE FROM project_files WHERE project_id=? AND version<?";
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index 9245e06..59806c1 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -45,13 +45,11 @@ public interface ProjectLoader {
   Project fetchProjectByName(String name) throws ProjectManagerException;
 
   /**
-   * Should create an empty project with the given name and user and adds it to
-   * the data store. It will auto assign a unique id for this project if
-   * successful.
+   * Should create an empty project with the given name and user and adds it to the data store. It
+   * will auto assign a unique id for this project if successful.
    *
-   * If an active project of the same name exists, it will throw an exception.
-   * If the name and description of the project exceeds the store's constraints,
-   * it will throw an exception.
+   * If an active project of the same name exists, it will throw an exception. If the name and
+   * description of the project exceeds the store's constraints, it will throw an exception.
    *
    * @throws ProjectManagerException if an active project of the same name exists.
    */
@@ -65,9 +63,8 @@ public interface ProjectLoader {
       throws ProjectManagerException;
 
   /**
-   * Adds and updates the user permissions. Does not check if the user is valid.
-   * If the permission doesn't exist, it adds. If the permission exists, it
-   * updates.
+   * Adds and updates the user permissions. Does not check if the user is valid. If the permission
+   * doesn't exist, it adds. If the permission exists, it updates.
    */
   void updatePermission(Project project, String name, Permission perm,
       boolean isGroup) throws ProjectManagerException;
@@ -82,8 +79,7 @@ public interface ProjectLoader {
       throws ProjectManagerException;
 
   /**
-   * Stores logs for a particular project. Will soft fail rather than throw
-   * exception.
+   * Stores logs for a particular project. Will soft fail rather than throw exception.
    *
    * @param message return true if the posting was success.
    */
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
index 51852a3..1b8c986 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
@@ -190,8 +190,6 @@ public class ProjectManager {
 
   /**
    * Checks if a project is active using project_name
-   *
-   * @param name
    */
   public Boolean isActiveProject(final String name) {
     return this.projectsByName.containsKey(name);
@@ -199,19 +197,13 @@ public class ProjectManager {
 
   /**
    * Checks if a project is active using project_id
-   *
-   * @param name
    */
   public Boolean isActiveProject(final int id) {
     return this.projectsById.containsKey(id);
   }
 
   /**
-   * fetch active project from cache and inactive projects from db by
-   * project_name
-   *
-   * @param name
-   * @return
+   * fetch active project from cache and inactive projects from db by project_name
    */
   public Project getProject(final String name) {
     Project fetchedProject = null;
@@ -228,11 +220,7 @@ public class ProjectManager {
   }
 
   /**
-   * fetch active project from cache and inactive projects from db by
-   * project_id
-   *
-   * @param id
-   * @return
+   * fetch active project from cache and inactive projects from db by project_id
    */
   public Project getProject(final int id) {
     Project fetchedProject = null;
@@ -294,13 +282,8 @@ public class ProjectManager {
   }
 
   /**
-   * Permanently delete all project files and properties data for all versions
-   * of a project and log event in project_events table
-   *
-   * @param project
-   * @param deleter
-   * @return
-   * @throws ProjectManagerException
+   * Permanently delete all project files and properties data for all versions of a project and log
+   * event in project_events table
    */
   public synchronized Project purgeProject(final Project project, final User deleter)
       throws ProjectManagerException {
@@ -429,18 +412,14 @@ public class ProjectManager {
   }
 
   /**
-   * This method retrieves the uploaded project zip file from DB. A temporary
-   * file is created to hold the content of the uploaded zip file. This
-   * temporary file is provided in the ProjectFileHandler instance and the
-   * caller of this method should call method
-   * {@ProjectFileHandler.deleteLocalFile}
-   * to delete the temporary file.
+   * This method retrieves the uploaded project zip file from DB. A temporary file is created to
+   * hold the content of the uploaded zip file. This temporary file is provided in the
+   * ProjectFileHandler instance and the caller of this method should call method
+   * {@ProjectFileHandler.deleteLocalFile} to delete the temporary file.
    *
-   * @param project
    * @param version - latest version is used if value is -1
-   * @return ProjectFileHandler - null if can't find project zip file based on
-   *         project name and version
-   * @throws ProjectManagerException
+   * @return ProjectFileHandler - null if can't find project zip file based on project name and
+   * version
    */
   public ProjectFileHandler getProjectFileHandler(final Project project, final int version)
       throws ProjectManagerException {
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java b/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
index f8c82e1..7790b1a 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
@@ -20,15 +20,15 @@ import org.xml.sax.SAXException;
 /**
  * @author wkang
  *
- *         This class manages project whitelist defined in xml config file. An single xml config
- *         file contains different types of whitelisted projects. For additional type of whitelist,
- *         modify WhitelistType enum.
+ * This class manages project whitelist defined in xml config file. An single xml config file
+ * contains different types of whitelisted projects. For additional type of whitelist, modify
+ * WhitelistType enum.
  *
- *         The xml config file should in the following format. Please note the tag <MemoryCheck> is
- *         same as the defined enum MemoryCheck
+ * The xml config file should in the following format. Please note the tag <MemoryCheck> is same as
+ * the defined enum MemoryCheck
  *
- *         <ProjectWhitelist> <MemoryCheck> <project projectname="project1" /> <project
- *         projectname="project2" /> </MemoryCheck> <ProjectWhitelist>
+ * <ProjectWhitelist> <MemoryCheck> <project projectname="project1" /> <project
+ * projectname="project2" /> </MemoryCheck> <ProjectWhitelist>
  */
 public class ProjectWhitelist {
 
@@ -127,11 +127,10 @@ public class ProjectWhitelist {
   }
 
   /**
-   * The tag in the project whitelist xml config file should be same as
-   * the defined enums.
+   * The tag in the project whitelist xml config file should be same as the defined enums.
    */
   public static enum WhitelistType {
     MemoryCheck,
     NumJobPerFlow
   }
-}
\ No newline at end of file
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/ProjectValidator.java b/azkaban-common/src/main/java/azkaban/project/validator/ProjectValidator.java
index 6c3e6d4..38b5ae9 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/ProjectValidator.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/ProjectValidator.java
@@ -5,8 +5,8 @@ import azkaban.utils.Props;
 import java.io.File;
 
 /**
- * Interface to be implemented by plugins which are to be registered with Azkaban
- * as project validators that validate a project before uploaded into Azkaban.
+ * Interface to be implemented by plugins which are to be registered with Azkaban as project
+ * validators that validate a project before uploaded into Azkaban.
  */
 public interface ProjectValidator {
 
@@ -21,9 +21,8 @@ public interface ProjectValidator {
   String getValidatorName();
 
   /**
-   * Validate the project inside the given directory. The validator, using its own
-   * validation logic, will generate a {@link ValidationReport} representing the result of
-   * the validation.
+   * Validate the project inside the given directory. The validator, using its own validation logic,
+   * will generate a {@link ValidationReport} representing the result of the validation.
    */
   ValidationReport validateProject(Project project, File projectDir);
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/ValidationReport.java b/azkaban-common/src/main/java/azkaban/project/validator/ValidationReport.java
index 321976b..830fa26 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/ValidationReport.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/ValidationReport.java
@@ -4,12 +4,11 @@ import java.util.HashSet;
 import java.util.Set;
 
 /**
- * The result of a project validation generated by a {@link ProjectValidator}. It contains
- * an enum of type {@link ValidationStatus} representing whether the validation passes,
- * generates warnings, or generates errors. Accordingly, three sets of String are also
- * maintained, storing the messages generated by the {@link ProjectValidator} at both
- * {@link ValidationStatus#WARN} and {@link ValidationStatus#ERROR} level, as well as
- * information messages associated with both levels.
+ * The result of a project validation generated by a {@link ProjectValidator}. It contains an enum
+ * of type {@link ValidationStatus} representing whether the validation passes, generates warnings,
+ * or generates errors. Accordingly, three sets of String are also maintained, storing the messages
+ * generated by the {@link ProjectValidator} at both {@link ValidationStatus#WARN} and {@link
+ * ValidationStatus#ERROR} level, as well as information messages associated with both levels.
  */
 public class ValidationReport {
 
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/ValidationStatus.java b/azkaban-common/src/main/java/azkaban/project/validator/ValidationStatus.java
index 6aaed24..c9337b9 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/ValidationStatus.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/ValidationStatus.java
@@ -1,8 +1,8 @@
 package azkaban.project.validator;
 
 /**
- * Status of the ValidationReport. It also represents the severity of each rule.
- * The order of severity for the status is PASS < WARN < ERROR.
+ * Status of the ValidationReport. It also represents the severity of each rule. The order of
+ * severity for the status is PASS < WARN < ERROR.
  */
 public enum ValidationStatus {
   PASS("PASS"),
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/ValidatorClassLoader.java b/azkaban-common/src/main/java/azkaban/project/validator/ValidatorClassLoader.java
index 2f5ea85..4b39ab6 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/ValidatorClassLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/ValidatorClassLoader.java
@@ -14,8 +14,8 @@ import java.util.jar.JarFile;
 import sun.net.www.protocol.jar.JarURLConnection;
 
 /**
- * Workaround for jdk 6 disgrace with open jar files & native libs,
- * which is a reason of unrefreshable classloader.
+ * Workaround for jdk 6 disgrace with open jar files & native libs, which is a reason of
+ * unrefreshable classloader.
  */
 public class ValidatorClassLoader extends URLClassLoader {
 
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java b/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java
index f7d89c7..c34a841 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java
@@ -8,9 +8,9 @@ import java.util.Map;
 import org.apache.log4j.Logger;
 
 /**
- * ValidatorManager is responsible for loading the list of validators specified in the
- * Azkaban validator configuration file. Once these validators are loaded, the ValidatorManager
- * will use the registered validators to verify each uploaded project before persisting it.
+ * ValidatorManager is responsible for loading the list of validators specified in the Azkaban
+ * validator configuration file. Once these validators are loaded, the ValidatorManager will use the
+ * registered validators to verify each uploaded project before persisting it.
  */
 public interface ValidatorManager {
 
@@ -21,16 +21,16 @@ public interface ValidatorManager {
   void loadValidators(Props props, Logger logger);
 
   /**
-   * Validate the given project using the registered list of validators. This method returns a
-   * map of {@link ValidationReport} with the key being the validator's name and the value being
-   * the {@link ValidationReport} generated by that validator.
+   * Validate the given project using the registered list of validators. This method returns a map
+   * of {@link ValidationReport} with the key being the validator's name and the value being the
+   * {@link ValidationReport} generated by that validator.
    */
   Map<String, ValidationReport> validate(Project project, File projectDir);
 
   /**
    * The ValidatorManager should have a default validator which checks for the most essential
-   * components of a project. The ValidatorManager should always load the default validator.
-   * This method returns the default validator of this ValidatorManager.
+   * components of a project. The ValidatorManager should always load the default validator. This
+   * method returns the default validator of this ValidatorManager.
    */
   ProjectValidator getDefaultValidator();
 
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java b/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
index bf625fe..cc6d1ab 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
@@ -25,17 +25,12 @@ import org.w3c.dom.NodeList;
 import org.xml.sax.SAXException;
 
 /**
- * Xml implementation of the ValidatorManager. Looks for the property
- * project.validators.xml.file in the azkaban properties.
+ * Xml implementation of the ValidatorManager. Looks for the property project.validators.xml.file in
+ * the azkaban properties.
  *
- * The xml to be in the following form:
- * <azkaban-validators>
- * <validator classname="validator class name">
- * <!-- optional configurations for each individual validator -->
- * <property key="validator property key" value="validator property value" />
- * ...
- * </validator>
- * </azkaban-validators>
+ * The xml to be in the following form: <azkaban-validators> <validator classname="validator class
+ * name"> <!-- optional configurations for each individual validator --> <property key="validator
+ * property key" value="validator property value" /> ... </validator> </azkaban-validators>
  */
 public class XmlValidatorManager implements ValidatorManager {
 
@@ -51,8 +46,8 @@ public class XmlValidatorManager implements ValidatorManager {
   private Map<String, ProjectValidator> validators;
 
   /**
-   * Load the validator plugins from the validator directory (default being validators/) into
-   * the validator ClassLoader. This enables creating instances of these validators in the
+   * Load the validator plugins from the validator directory (default being validators/) into the
+   * validator ClassLoader. This enables creating instances of these validators in the
    * loadValidators() method.
    */
   public XmlValidatorManager(final Props props) {
diff --git a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
index 60c96c3..ab1ad1e 100644
--- a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
+++ b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
@@ -34,10 +34,9 @@ import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
 /**
- * The ScheduleManager stores and executes the schedule. It uses a single thread
- * instead and waits until correct loading time for the flow. It will not remove
- * the flow from the schedule when it is run, which can potentially allow the
- * flow to and overlap each other.
+ * The ScheduleManager stores and executes the schedule. It uses a single thread instead and waits
+ * until correct loading time for the flow. It will not remove the flow from the schedule when it is
+ * run, which can potentially allow the flow to and overlap each other.
  *
  * TODO kunkun-tang: When new AZ quartz Scheduler comes, we will remove this class.
  */
@@ -55,9 +54,7 @@ public class ScheduleManager implements TriggerAgent {
       new LinkedHashMap<>();
 
   /**
-   * Give the schedule manager a loader class that will properly load the
-   * schedule.
-   *
+   * Give the schedule manager a loader class that will properly load the schedule.
    */
   @Inject
   public ScheduleManager(final ScheduleLoader loader) {
@@ -88,8 +85,7 @@ public class ScheduleManager implements TriggerAgent {
   }
 
   /**
-   * Shutdowns the scheduler thread. After shutdown, it may not be safe to use
-   * it again.
+   * Shutdowns the scheduler thread. After shutdown, it may not be safe to use it again.
    */
   @Override
   public void shutdown() {
@@ -98,7 +94,6 @@ public class ScheduleManager implements TriggerAgent {
 
   /**
    * Retrieves a copy of the list of schedules.
-   *
    */
   public synchronized List<Schedule> getSchedules()
       throws ScheduleManagerException {
@@ -109,7 +104,6 @@ public class ScheduleManager implements TriggerAgent {
 
   /**
    * Returns the scheduled flow for the flow name
-   *
    */
   public Schedule getSchedule(final int projectId, final String flowId)
       throws ScheduleManagerException {
@@ -131,7 +125,6 @@ public class ScheduleManager implements TriggerAgent {
 
   /**
    * Removes the flow from the schedule if it exists.
-   *
    */
   public synchronized void removeSchedule(final Schedule sched) {
     final Pair<Integer, String> identityPairMap = sched.getScheduleIdentityPair();
@@ -151,26 +144,27 @@ public class ScheduleManager implements TriggerAgent {
   }
 
   public Schedule scheduleFlow(final int scheduleId,
-                               final int projectId,
-                               final String projectName,
-                               final String flowName,
-                               final String status,
-                               final long firstSchedTime,
-                               final long endSchedTime,
-                               final DateTimeZone timezone,
-                               final ReadablePeriod period,
-                               final long lastModifyTime,
-                               final long nextExecTime,
-                               final long submitTime,
-                               final String submitUser,
-                               final ExecutionOptions execOptions,
-                               final List<SlaOption> slaOptions) {
+      final int projectId,
+      final String projectName,
+      final String flowName,
+      final String status,
+      final long firstSchedTime,
+      final long endSchedTime,
+      final DateTimeZone timezone,
+      final ReadablePeriod period,
+      final long lastModifyTime,
+      final long nextExecTime,
+      final long submitTime,
+      final String submitUser,
+      final ExecutionOptions execOptions,
+      final List<SlaOption> slaOptions) {
     final Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status,
         firstSchedTime, endSchedTime, timezone, period, lastModifyTime, nextExecTime,
         submitTime, submitUser, execOptions, slaOptions, null);
     logger
         .info("Scheduling flow '" + sched.getScheduleName() + "' for "
-            + this._dateFormat.print(firstSchedTime) + " with a period of " + (period == null ? "(non-recurring)"
+            + this._dateFormat.print(firstSchedTime) + " with a period of " + (period == null
+            ? "(non-recurring)"
             : period));
 
     insertSchedule(sched);
@@ -178,20 +172,20 @@ public class ScheduleManager implements TriggerAgent {
   }
 
   public Schedule cronScheduleFlow(final int scheduleId,
-                                   final int projectId,
-                                   final String projectName,
-                                   final String flowName,
-                                   final String status,
-                                   final long firstSchedTime,
-                                   final long endSchedTime,
-                                   final DateTimeZone timezone,
-                                   final long lastModifyTime,
-                                   final long nextExecTime,
-                                   final long submitTime,
-                                   final String submitUser,
-                                   final ExecutionOptions execOptions,
-                                   final List<SlaOption> slaOptions,
-                                   final String cronExpression) {
+      final int projectId,
+      final String projectName,
+      final String flowName,
+      final String status,
+      final long firstSchedTime,
+      final long endSchedTime,
+      final DateTimeZone timezone,
+      final long lastModifyTime,
+      final long nextExecTime,
+      final long submitTime,
+      final String submitUser,
+      final ExecutionOptions execOptions,
+      final List<SlaOption> slaOptions,
+      final String cronExpression) {
     final Schedule sched =
         new Schedule(scheduleId, projectId, projectName, flowName, status,
             firstSchedTime, endSchedTime, timezone, null, lastModifyTime, nextExecTime,
@@ -203,6 +197,7 @@ public class ScheduleManager implements TriggerAgent {
     insertSchedule(sched);
     return sched;
   }
+
   /**
    * Schedules the flow, but doesn't save the schedule afterwards.
    */
diff --git a/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java b/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
index cff70c7..f2f6c87 100644
--- a/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
+++ b/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
@@ -143,8 +143,7 @@ public class HttpRequestUtils {
   }
 
   /**
-   * parse a string as number and throws exception if parsed value is not a
-   * valid integer
+   * parse a string as number and throws exception if parsed value is not a valid integer
    *
    * @throws ExecutorManagerException if paramName is not a valid integer
    */
@@ -180,8 +179,7 @@ public class HttpRequestUtils {
   }
 
   /**
-   * Retrieves the param from the http servlet request. Will throw an exception
-   * if not found
+   * Retrieves the param from the http servlet request. Will throw an exception if not found
    */
   public static String getParam(final HttpServletRequest request, final String name)
       throws ServletException {
@@ -206,8 +204,8 @@ public class HttpRequestUtils {
   }
 
   /**
-   * Returns the param and parses it into an int. Will throw an exception if not
-   * found, or a parse error if the type is incorrect.
+   * Returns the param and parses it into an int. Will throw an exception if not found, or a parse
+   * error if the type is incorrect.
    */
   public static int getIntParam(final HttpServletRequest request, final String name)
       throws ServletException {
diff --git a/azkaban-common/src/main/java/azkaban/server/session/SessionCache.java b/azkaban-common/src/main/java/azkaban/server/session/SessionCache.java
index a2147b3..81d11b9 100644
--- a/azkaban-common/src/main/java/azkaban/server/session/SessionCache.java
+++ b/azkaban-common/src/main/java/azkaban/server/session/SessionCache.java
@@ -25,10 +25,9 @@ import java.util.concurrent.TimeUnit;
 /**
  * Cache for web session.
  *
- * The following global azkaban properties can be used: max.num.sessions - used
- * to determine the number of live sessions that azkaban will handle. Default is
- * 10000 session.time.to.live -Number of seconds before session expires. Default
- * set to 10 hours.
+ * The following global azkaban properties can be used: max.num.sessions - used to determine the
+ * number of live sessions that azkaban will handle. Default is 10000 session.time.to.live -Number
+ * of seconds before session expires. Default set to 10 hours.
  */
 public class SessionCache {
 
diff --git a/azkaban-common/src/main/java/azkaban/sla/SlaOption.java b/azkaban-common/src/main/java/azkaban/sla/SlaOption.java
index 3e3872b..fbf785e 100644
--- a/azkaban-common/src/main/java/azkaban/sla/SlaOption.java
+++ b/azkaban-common/src/main/java/azkaban/sla/SlaOption.java
@@ -49,7 +49,7 @@ public class SlaOption {
   public static final String ACTION_KILL_JOB = "SlaKillJob";
   private static final DateTimeFormatter fmt = DateTimeFormat
       .forPattern("MM/dd, YYYY HH:mm");
-  
+
   private String type;
   private Map<String, Object> info;
   private List<String> actions;
diff --git a/azkaban-common/src/main/java/azkaban/storage/HdfsAuth.java b/azkaban-common/src/main/java/azkaban/storage/HdfsAuth.java
index b9a666d..331627c 100644
--- a/azkaban-common/src/main/java/azkaban/storage/HdfsAuth.java
+++ b/azkaban-common/src/main/java/azkaban/storage/HdfsAuth.java
@@ -58,9 +58,8 @@ public class HdfsAuth {
   }
 
   /**
-   * API to authorize HDFS access.
-   * This logins in the configured user via the keytab.
-   * If the user is already logged in then it renews the TGT.
+   * API to authorize HDFS access. This logins in the configured user via the keytab. If the user is
+   * already logged in then it renews the TGT.
    */
   public void authorize() {
     if (this.isSecurityEnabled) {
diff --git a/azkaban-common/src/main/java/azkaban/storage/StorageCleaner.java b/azkaban-common/src/main/java/azkaban/storage/StorageCleaner.java
index 969e15a..9cd518d 100644
--- a/azkaban-common/src/main/java/azkaban/storage/StorageCleaner.java
+++ b/azkaban-common/src/main/java/azkaban/storage/StorageCleaner.java
@@ -87,10 +87,9 @@ public class StorageCleaner {
    * From the storage perspective, cleanup just needs the {@link Storage#delete(String)} API to
    * work.
    *
-   * Failure cases:
-   * - If the storage cleanup fails, the cleanup will be attempted again on the next upload
-   * - If the storage cleanup succeeds and the DB cleanup fails, the DB will be cleaned up in the
-   * next attempt.
+   * Failure cases: - If the storage cleanup fails, the cleanup will be attempted again on the next
+   * upload - If the storage cleanup succeeds and the DB cleanup fails, the DB will be cleaned up in
+   * the next attempt.
    *
    * @param projectId project ID
    */
diff --git a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
index 1373649..d8aae5b 100644
--- a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
+++ b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
@@ -122,8 +122,7 @@ public class StorageManager {
   }
 
   /**
-   * Clean up project artifacts based on project ID.
-   * See {@link StorageCleaner#cleanupProjectArtifacts(int)}
+   * Clean up project artifacts based on project ID. See {@link StorageCleaner#cleanupProjectArtifacts(int)}
    */
   public void cleanupProjectArtifacts(final int projectId) {
     try {
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java
index 85808aa..4112517 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java
@@ -41,8 +41,8 @@ public class BasicTimeChecker implements ConditionChecker {
   private boolean skipPastChecks = true;
 
   public BasicTimeChecker(final String id, final long firstCheckTime,
-                          final DateTimeZone timezone, final boolean isRecurring, final boolean skipPastChecks,
-                          final ReadablePeriod period, final String cronExpression) {
+      final DateTimeZone timezone, final boolean isRecurring, final boolean skipPastChecks,
+      final ReadablePeriod period, final String cronExpression) {
     this.id = id;
     this.firstCheckTime = firstCheckTime;
     this.timezone = timezone;
@@ -56,8 +56,8 @@ public class BasicTimeChecker implements ConditionChecker {
   }
 
   public BasicTimeChecker(final String id, final long firstCheckTime,
-                          final DateTimeZone timezone, final long nextCheckTime, final boolean isRecurring,
-                          final boolean skipPastChecks, final ReadablePeriod period, final String cronExpression) {
+      final DateTimeZone timezone, final long nextCheckTime, final boolean isRecurring,
+      final boolean skipPastChecks, final ReadablePeriod period, final String cronExpression) {
     this.id = id;
     this.firstCheckTime = firstCheckTime;
     this.timezone = timezone;
diff --git a/azkaban-common/src/main/java/azkaban/trigger/Condition.java b/azkaban-common/src/main/java/azkaban/trigger/Condition.java
index beb9820..92f72c1 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/Condition.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/Condition.java
@@ -45,7 +45,7 @@ public class Condition {
   }
 
   public Condition(final Map<String, ConditionChecker> checkers, final String expr,
-                   final long nextCheckTime) {
+      final long nextCheckTime) {
     this.nextCheckTime = nextCheckTime;
     setCheckers(checkers);
     this.expression = jexl.createExpression(expr);
diff --git a/azkaban-common/src/main/java/azkaban/trigger/Trigger.java b/azkaban-common/src/main/java/azkaban/trigger/Trigger.java
index b783628..d882a38 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/Trigger.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/Trigger.java
@@ -53,10 +53,10 @@ public class Trigger {
   }
 
   private Trigger(final int triggerId, final long lastModifyTime, final long submitTime,
-                  final String submitUser, final String source, final Condition triggerCondition,
-                  final Condition expireCondition, final List<TriggerAction> actions,
-                  final List<TriggerAction> expireActions, final Map<String, Object> info,
-                  final Map<String, Object> context) {
+      final String submitUser, final String source, final Condition triggerCondition,
+      final Condition expireCondition, final List<TriggerAction> actions,
+      final List<TriggerAction> expireActions, final Map<String, Object> info,
+      final Map<String, Object> context) {
     requireNonNull(submitUser);
     requireNonNull(source);
     requireNonNull(triggerCondition);
@@ -216,9 +216,9 @@ public class Trigger {
     return this.expireCondition;
   }
 
- public void setExpireCondition(final Condition expireCondition) {
+  public void setExpireCondition(final Condition expireCondition) {
     this.expireCondition = expireCondition;
- }
+  }
 
   public List<TriggerAction> getActions() {
     return this.actions;
@@ -362,6 +362,7 @@ public class Trigger {
   }
 
   public static class TriggerBuilder {
+
     private final String submitUser;
     private final String source;
     private final TriggerStatus status = TriggerStatus.READY;
@@ -377,10 +378,10 @@ public class Trigger {
     private Map<String, Object> context = new HashMap<>();
 
     public TriggerBuilder(final String submitUser,
-                          final String source,
-                          final Condition triggerCondition,
-                          final Condition expireCondition,
-                          final List<TriggerAction> actions) {
+        final String source,
+        final Condition triggerCondition,
+        final Condition expireCondition,
+        final List<TriggerAction> actions) {
       this.submitUser = submitUser;
       this.source = source;
       this.triggerCondition = triggerCondition;
diff --git a/azkaban-common/src/main/java/azkaban/user/UserManager.java b/azkaban-common/src/main/java/azkaban/user/UserManager.java
index 5556f0a..b583e94 100644
--- a/azkaban-common/src/main/java/azkaban/user/UserManager.java
+++ b/azkaban-common/src/main/java/azkaban/user/UserManager.java
@@ -17,12 +17,11 @@
 package azkaban.user;
 
 /**
- * Interface for the UserManager. Implementors will have to handle the retrieval
- * of the User object given the username and password.
+ * Interface for the UserManager. Implementors will have to handle the retrieval of the User object
+ * given the username and password.
  *
- * The constructor will be called with a azkaban.utils.Props object passed as
- * the only parameter. If such a constructor doesn't exist, than the UserManager
- * instantiation may fail.
+ * The constructor will be called with a azkaban.utils.Props object passed as the only parameter. If
+ * such a constructor doesn't exist, than the UserManager instantiation may fail.
  */
 public interface UserManager {
 
@@ -35,14 +34,12 @@ public interface UserManager {
       throws UserManagerException;
 
   /**
-   * Returns true if the user is valid. This is used when adding permissions for
-   * users
+   * Returns true if the user is valid. This is used when adding permissions for users
    */
   public boolean validateUser(String username);
 
   /**
-   * Returns true if the group is valid. This is used when adding permissions
-   * for groups.
+   * Returns true if the group is valid. This is used when adding permissions for groups.
    */
   public boolean validateGroup(String group);
 
diff --git a/azkaban-common/src/main/java/azkaban/user/UserUtils.java b/azkaban-common/src/main/java/azkaban/user/UserUtils.java
index 687bcda..b196221 100644
--- a/azkaban-common/src/main/java/azkaban/user/UserUtils.java
+++ b/azkaban-common/src/main/java/azkaban/user/UserUtils.java
@@ -1,6 +1,7 @@
 package azkaban.user;
 
 public final class UserUtils {
+
   private UserUtils() {
 
   }
diff --git a/azkaban-common/src/main/java/azkaban/user/XmlUserManager.java b/azkaban-common/src/main/java/azkaban/user/XmlUserManager.java
index 007865d..9f99251 100644
--- a/azkaban-common/src/main/java/azkaban/user/XmlUserManager.java
+++ b/azkaban-common/src/main/java/azkaban/user/XmlUserManager.java
@@ -34,12 +34,11 @@ import org.w3c.dom.NodeList;
 import org.xml.sax.SAXException;
 
 /**
- * Xml implementation of the UserManager. Looks for the property
- * user.manager.xml.file in the azkaban properties.
+ * Xml implementation of the UserManager. Looks for the property user.manager.xml.file in the
+ * azkaban properties.
  *
- * The xml to be in the following form: <azkaban-users> <user
- * username="username" password="azkaban" roles="admin" groups="azkaban"/>
- * </azkaban-users>
+ * The xml to be in the following form: <azkaban-users> <user username="username" password="azkaban"
+ * roles="admin" groups="azkaban"/> </azkaban-users>
  */
 public class XmlUserManager implements UserManager {
 
diff --git a/azkaban-common/src/main/java/azkaban/utils/CircularBuffer.java b/azkaban-common/src/main/java/azkaban/utils/CircularBuffer.java
index f0a2f80..6f426ea 100644
--- a/azkaban-common/src/main/java/azkaban/utils/CircularBuffer.java
+++ b/azkaban-common/src/main/java/azkaban/utils/CircularBuffer.java
@@ -23,8 +23,8 @@ import java.util.Iterator;
 import java.util.List;
 
 /**
- * A circular buffer of items of a given length. It will grow up to the give
- * size as items are appended, then it will begin to overwrite older items.
+ * A circular buffer of items of a given length. It will grow up to the give size as items are
+ * appended, then it will begin to overwrite older items.
  *
  * @param <T> The type of the item contained.
  */
diff --git a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
index 8deb690..68d64f1 100644
--- a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
@@ -36,8 +36,7 @@ import org.apache.log4j.Logger;
 
 
 /**
- * Runs a few unix commands. Created this so that I can move to JNI in the
- * future.
+ * Runs a few unix commands. Created this so that I can move to JNI in the future.
  */
 public class FileIOUtils {
 
diff --git a/azkaban-common/src/main/java/azkaban/utils/OsMemoryUtil.java b/azkaban-common/src/main/java/azkaban/utils/OsMemoryUtil.java
index f0b5fc6..1091aeb 100644
--- a/azkaban-common/src/main/java/azkaban/utils/OsMemoryUtil.java
+++ b/azkaban-common/src/main/java/azkaban/utils/OsMemoryUtil.java
@@ -13,8 +13,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Utility class for getting system memory information
  *
- * Note:
- * This check is designed for Linux only.
+ * Note: This check is designed for Linux only.
  */
 class OsMemoryUtil {
 
@@ -84,16 +83,10 @@ class OsMemoryUtil {
   }
 
   /**
-   * Example file:
-   * $ cat /proc/meminfo
-   * MemTotal:       65894008 kB
-   * MemFree:        59400536 kB
-   * Buffers:          409348 kB
-   * Cached:          4290236 kB
-   * SwapCached:            0 kB
+   * Example file: $ cat /proc/meminfo MemTotal:       65894008 kB MemFree:        59400536 kB
+   * Buffers:          409348 kB Cached:          4290236 kB SwapCached:            0 kB
    *
-   * Make the method package private to make unit testing easier.
-   * Otherwise it can be made private.
+   * Make the method package private to make unit testing easier. Otherwise it can be made private.
    *
    * @param line the text for a memory usage statistics we are interested in
    * @return size of the memory. unit kB. 0 if there is an error.
diff --git a/azkaban-common/src/main/java/azkaban/utils/Props.java b/azkaban-common/src/main/java/azkaban/utils/Props.java
index d0fc84c..fd0cfd1 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Props.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Props.java
@@ -38,8 +38,8 @@ import java.util.TreeMap;
 import org.apache.log4j.Logger;
 
 /**
- * Hashmap implementation of a hierarchitical properties with helpful converter
- * functions and Exception throwing. This class is not threadsafe.
+ * Hashmap implementation of a hierarchitical properties with helpful converter functions and
+ * Exception throwing. This class is not threadsafe.
  */
 public class Props {
 
@@ -125,16 +125,15 @@ public class Props {
   }
 
   /**
-   * Create a Props with a null parent from a list of key value pairing. i.e.
-   * [key1, value1, key2, value2 ...]
+   * Create a Props with a null parent from a list of key value pairing. i.e. [key1, value1, key2,
+   * value2 ...]
    */
   public static Props of(final String... args) {
     return of((Props) null, args);
   }
 
   /**
-   * Create a Props from a list of key value pairing. i.e. [key1, value1, key2,
-   * value2 ...]
+   * Create a Props from a list of key value pairing. i.e. [key1, value1, key2, value2 ...]
    */
   public static Props of(final Props parent, final String... args) {
     if (args.length % 2 != 0) {
@@ -254,9 +253,8 @@ public class Props {
   }
 
   /**
-   * Put the given string value for the string key. This method performs any
-   * variable substitution in the value replacing any occurance of ${name} with
-   * the value of get("name").
+   * Put the given string value for the string key. This method performs any variable substitution
+   * in the value replacing any occurance of ${name} with the value of get("name").
    *
    * @param key The key to put the value to
    * @param value The value to do substitution on and store
@@ -268,10 +266,9 @@ public class Props {
   }
 
   /**
-   * Put the given Properties into the Props. This method performs any variable
-   * substitution in the value replacing any occurrence of ${name} with the
-   * value of get("name"). get() is called first on the Props and next on the
-   * Properties object.
+   * Put the given Properties into the Props. This method performs any variable substitution in the
+   * value replacing any occurrence of ${name} with the value of get("name"). get() is called first
+   * on the Props and next on the Properties object.
    *
    * @param properties The properties to put
    * @throws IllegalArgumentException If the variable given for substitution is not a valid key in
@@ -354,16 +351,16 @@ public class Props {
   }
 
   /**
-   * The number of unique keys defined by this Props (keys defined only in
-   * parent Props are not counted)
+   * The number of unique keys defined by this Props (keys defined only in parent Props are not
+   * counted)
    */
   public int localSize() {
     return this._current.size();
   }
 
   /**
-   * Attempts to return the Class that corresponds to the Props value. If the
-   * class doesn't exit, an IllegalArgumentException will be thrown.
+   * Attempts to return the Class that corresponds to the Props value. If the class doesn't exit, an
+   * IllegalArgumentException will be thrown.
    */
   public Class<?> getClass(final String key) {
     try {
@@ -392,8 +389,7 @@ public class Props {
   }
 
   /**
-   * Gets the class from the Props. If it doesn't exist, it will return the
-   * defaultClass
+   * Gets the class from the Props. If it doesn't exist, it will return the defaultClass
    */
   public Class<?> getClass(final String key, final Class<?> defaultClass) {
     if (containsKey(key)) {
@@ -404,8 +400,7 @@ public class Props {
   }
 
   /**
-   * Gets the string from the Props. If it doesn't exist, it will return the
-   * defaultValue
+   * Gets the string from the Props. If it doesn't exist, it will return the defaultValue
    */
   public String getString(final String key, final String defaultValue) {
     if (containsKey(key)) {
@@ -416,8 +411,7 @@ public class Props {
   }
 
   /**
-   * Gets the string from the Props. If it doesn't exist, throw and
-   * UndefinedPropertiesException
+   * Gets the string from the Props. If it doesn't exist, throw and UndefinedPropertiesException
    */
   public String getString(final String key) {
     if (containsKey(key)) {
@@ -453,8 +447,8 @@ public class Props {
   }
 
   /**
-   * Returns a list of strings with the comma as the separator of the value. If
-   * the value is null, it'll return the defaultValue.
+   * Returns a list of strings with the comma as the separator of the value. If the value is null,
+   * it'll return the defaultValue.
    */
   public List<String> getStringList(final String key, final List<String> defaultValue) {
     if (containsKey(key)) {
@@ -465,8 +459,8 @@ public class Props {
   }
 
   /**
-   * Returns a list of strings with the sep as the separator of the value. If
-   * the value is null, it'll return the defaultValue.
+   * Returns a list of strings with the sep as the separator of the value. If the value is null,
+   * it'll return the defaultValue.
    */
   public List<String> getStringList(final String key, final List<String> defaultValue,
       final String sep) {
@@ -478,8 +472,8 @@ public class Props {
   }
 
   /**
-   * Returns true if the value equals "true". If the value is null, then the
-   * default value is returned.
+   * Returns true if the value equals "true". If the value is null, then the default value is
+   * returned.
    */
   public boolean getBoolean(final String key, final boolean defaultValue) {
     if (containsKey(key)) {
@@ -503,9 +497,8 @@ public class Props {
   }
 
   /**
-   * Returns the long representation of the value. If the value is null, then
-   * the default value is returned. If the value isn't a long, then a parse
-   * exception will be thrown.
+   * Returns the long representation of the value. If the value is null, then the default value is
+   * returned. If the value isn't a long, then a parse exception will be thrown.
    */
   public long getLong(final String name, final long defaultValue) {
     if (containsKey(name)) {
@@ -517,8 +510,8 @@ public class Props {
 
   /**
    * Returns the long representation of the value. If the value is null, then a
-   * UndefinedPropertyException will be thrown. If the value isn't a long, then
-   * a parse exception will be thrown.
+   * UndefinedPropertyException will be thrown. If the value isn't a long, then a parse exception
+   * will be thrown.
    */
   public long getLong(final String name) {
     if (containsKey(name)) {
@@ -530,9 +523,8 @@ public class Props {
   }
 
   /**
-   * Returns the int representation of the value. If the value is null, then the
-   * default value is returned. If the value isn't a int, then a parse exception
-   * will be thrown.
+   * Returns the int representation of the value. If the value is null, then the default value is
+   * returned. If the value isn't a int, then a parse exception will be thrown.
    */
   public int getInt(final String name, final int defaultValue) {
     if (containsKey(name)) {
@@ -544,8 +536,8 @@ public class Props {
 
   /**
    * Returns the int representation of the value. If the value is null, then a
-   * UndefinedPropertyException will be thrown. If the value isn't a int, then a
-   * parse exception will be thrown.
+   * UndefinedPropertyException will be thrown. If the value isn't a int, then a parse exception
+   * will be thrown.
    */
   public int getInt(final String name) {
     if (containsKey(name)) {
@@ -557,9 +549,8 @@ public class Props {
   }
 
   /**
-   * Returns the double representation of the value. If the value is null, then
-   * the default value is returned. If the value isn't a double, then a parse
-   * exception will be thrown.
+   * Returns the double representation of the value. If the value is null, then the default value is
+   * returned. If the value isn't a double, then a parse exception will be thrown.
    */
   public double getDouble(final String name, final double defaultValue) {
     if (containsKey(name)) {
@@ -570,9 +561,9 @@ public class Props {
   }
 
   /**
-   * Returns the double representation of the value. If the value is null, then
-   * a UndefinedPropertyException will be thrown. If the value isn't a double,
-   * then a parse exception will be thrown.
+   * Returns the double representation of the value. If the value is null, then a
+   * UndefinedPropertyException will be thrown. If the value isn't a double, then a parse exception
+   * will be thrown.
    */
   public double getDouble(final String name) {
     if (containsKey(name)) {
@@ -584,9 +575,8 @@ public class Props {
   }
 
   /**
-   * Returns the uri representation of the value. If the value is null, then the
-   * default value is returned. If the value isn't a uri, then a
-   * IllegalArgumentException will be thrown.
+   * Returns the uri representation of the value. If the value is null, then the default value is
+   * returned. If the value isn't a uri, then a IllegalArgumentException will be thrown.
    */
   public URI getUri(final String name) {
     if (containsKey(name)) {
@@ -602,9 +592,8 @@ public class Props {
   }
 
   /**
-   * Returns the double representation of the value. If the value is null, then
-   * the default value is returned. If the value isn't a uri, then a
-   * IllegalArgumentException will be thrown.
+   * Returns the double representation of the value. If the value is null, then the default value is
+   * returned. If the value isn't a uri, then a IllegalArgumentException will be thrown.
    */
   public URI getUri(final String name, final URI defaultValue) {
     if (containsKey(name)) {
diff --git a/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java b/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java
index 89242a8..251e764 100644
--- a/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java
+++ b/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java
@@ -39,8 +39,7 @@ import org.apache.http.impl.client.HttpClients;
 import org.apache.log4j.Logger;
 
 /**
- * class handles the communication between the application and
- * a Restful API based web server.
+ * class handles the communication between the application and a Restful API based web server.
  *
  * @param T : type of the returning response object. Note: the idea of this abstract class is to
  * provide a wrapper for the logic around HTTP layer communication so development work can take this
@@ -152,12 +151,10 @@ public abstract class RestfulApiClient<T> {
   }
 
   /**
-   * Method to transform the response returned by the httpClient into the
-   * type specified.
-   * Note: Method need to handle case such as failed request.
-   * Also method is not supposed to pass the response object out
-   * via the returning value as the response will be closed after the
-   * execution steps out of the method context.
+   * Method to transform the response returned by the httpClient into the type specified. Note:
+   * Method need to handle case such as failed request. Also method is not supposed to pass the
+   * response object out via the returning value as the response will be closed after the execution
+   * steps out of the method context.
    **/
   protected abstract T parseResponse(HttpResponse response)
       throws HttpResponseException, IOException;
diff --git a/azkaban-common/src/main/java/azkaban/utils/StdOutErrRedirect.java b/azkaban-common/src/main/java/azkaban/utils/StdOutErrRedirect.java
index 00b9967..e135383 100644
--- a/azkaban-common/src/main/java/azkaban/utils/StdOutErrRedirect.java
+++ b/azkaban-common/src/main/java/azkaban/utils/StdOutErrRedirect.java
@@ -22,9 +22,8 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
 /**
- * A class to encapsulate the redirection of stdout and stderr to log4j
- * This allows us to catch messages written to the console (although we should
- * not be using System.out to write out).
+ * A class to encapsulate the redirection of stdout and stderr to log4j This allows us to catch
+ * messages written to the console (although we should not be using System.out to write out).
  */
 
 public class StdOutErrRedirect {
diff --git a/azkaban-common/src/main/java/azkaban/utils/SwapQueue.java b/azkaban-common/src/main/java/azkaban/utils/SwapQueue.java
index a13b5f3..8a16539 100644
--- a/azkaban-common/src/main/java/azkaban/utils/SwapQueue.java
+++ b/azkaban-common/src/main/java/azkaban/utils/SwapQueue.java
@@ -21,8 +21,8 @@ import java.util.Collection;
 import java.util.Iterator;
 
 /**
- * Queue that swaps its lists. Allows for non-blocking writes when reading. Swap
- * should be called before every read.
+ * Queue that swaps its lists. Allows for non-blocking writes when reading. Swap should be called
+ * before every read.
  */
 public class SwapQueue<T> implements Iterable<T> {
 
@@ -35,8 +35,7 @@ public class SwapQueue<T> implements Iterable<T> {
   }
 
   /**
-   * Swaps primaryQueue with secondary queue. The previous primary queue will be
-   * released.
+   * Swaps primaryQueue with secondary queue. The previous primary queue will be released.
    */
   public synchronized void swap() {
     this.primaryQueue = this.secondaryQueue;
diff --git a/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java b/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
index 5886bc9..8d0af8c 100644
--- a/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
+++ b/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
@@ -5,12 +5,12 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * This class is used to maintain system memory information. Processes utilizing
- * large amount of memory should consult this class to see if the system has enough
- * memory to proceed the operation.
+ * This class is used to maintain system memory information. Processes utilizing large amount of
+ * memory should consult this class to see if the system has enough memory to proceed the
+ * operation.
  *
- * Memory information is obtained from /proc/meminfo, so only Unix/Linux like system
- * will support this class.
+ * Memory information is obtained from /proc/meminfo, so only Unix/Linux like system will support
+ * this class.
  *
  * All the memory size used in this function is in KB.
  */
diff --git a/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java b/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
index d7d377b..392dac3 100644
--- a/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
+++ b/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
@@ -16,8 +16,7 @@
 package azkaban.utils;
 
 /**
- * Interface for listener to get notified before and after a task has been
- * executed.
+ * Interface for listener to get notified before and after a task has been executed.
  *
  * @author hluu
  */
@@ -26,4 +25,4 @@ public interface ThreadPoolExecutingListener {
   public void beforeExecute(Runnable r);
 
   public void afterExecute(Runnable r);
-}
\ No newline at end of file
+}
diff --git a/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java b/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
index 6fc918f..2b71961 100644
--- a/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
+++ b/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
@@ -25,11 +25,11 @@ import java.util.concurrent.TimeUnit;
 import org.apache.log4j.Logger;
 
 /**
- * A simple subclass of {@link ThreadPoolExecutor} to keep track of in progress
- * tasks as well as other interesting statistics.
+ * A simple subclass of {@link ThreadPoolExecutor} to keep track of in progress tasks as well as
+ * other interesting statistics.
  *
- * The content of this class is copied from article "Java theory and practice:
- * Instrumenting applications with JMX"
+ * The content of this class is copied from article "Java theory and practice: Instrumenting
+ * applications with JMX"
  *
  * @author hluu
  */
@@ -109,4 +109,4 @@ public class TrackingThreadPool extends ThreadPoolExecutor {
     public void afterExecute(final Runnable r) {
     }
   }
-}
\ No newline at end of file
+}
diff --git a/azkaban-common/src/main/java/azkaban/utils/Utils.java b/azkaban-common/src/main/java/azkaban/utils/Utils.java
index d024bd2..30680d0 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Utils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Utils.java
@@ -67,8 +67,8 @@ public class Utils {
   }
 
   /**
-   * Equivalent to Object.equals except that it handles nulls. If a and b are
-   * both null, true is returned.
+   * Equivalent to Object.equals except that it handles nulls. If a and b are both null, true is
+   * returned.
    */
   public static boolean equals(final Object a, final Object b) {
     if (a == null || b == null) {
diff --git a/azkaban-common/src/main/java/azkaban/utils/WebUtils.java b/azkaban-common/src/main/java/azkaban/utils/WebUtils.java
index 0fe3ae1..2ff8e5a 100644
--- a/azkaban-common/src/main/java/azkaban/utils/WebUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/WebUtils.java
@@ -167,12 +167,11 @@ public class WebUtils {
   }
 
   /**
-   * Gets the actual client IP address inspecting the X-Forwarded-For
-   * HTTP header or using the provided 'remote IP address' from the
-   * low level TCP connection from the client.
+   * Gets the actual client IP address inspecting the X-Forwarded-For HTTP header or using the
+   * provided 'remote IP address' from the low level TCP connection from the client.
    *
-   * If multiple IP addresses are provided in the X-Forwarded-For header
-   * then the first one (first hop) is used
+   * If multiple IP addresses are provided in the X-Forwarded-For header then the first one (first
+   * hop) is used
    *
    * @param httpHeaders List of HTTP headers for the current request
    * @param remoteAddr The client IP address and port from the current request's TCP connection
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index badcb18..ea4cc79 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -124,11 +124,11 @@ public class ExecutionFlowDaoTest {
   public void fetchFlowHistory() throws Exception {
     final ExecutableFlow flow = createTestFlow();
     this.executionFlowDao.uploadExecutableFlow(flow);
-    final List<ExecutableFlow> flowList1 = this.executionFlowDao.fetchFlowHistory(0,2 );
+    final List<ExecutableFlow> flowList1 = this.executionFlowDao.fetchFlowHistory(0, 2);
     assertThat(flowList1.size()).isEqualTo(1);
 
     final List<ExecutableFlow> flowList2 = this.executionFlowDao
-        .fetchFlowHistory(flow.getProjectId(), flow.getId(),0,2 );
+        .fetchFlowHistory(flow.getProjectId(), flow.getId(), 0, 2);
     assertThat(flowList2.size()).isEqualTo(1);
 
     final ExecutableFlow fetchFlow =
@@ -182,7 +182,8 @@ public class ExecutionFlowDaoTest {
     flow2.setStatus(Status.PREPARING);
     this.executionFlowDao.uploadExecutableFlow(flow2);
 
-    final List<Pair<ExecutionReference, ExecutableFlow>> fetchedQueuedFlows = this.executionFlowDao.fetchQueuedFlows();
+    final List<Pair<ExecutionReference, ExecutableFlow>> fetchedQueuedFlows = this.executionFlowDao
+        .fetchQueuedFlows();
     assertThat(fetchedQueuedFlows.size()).isEqualTo(2);
     final Pair<ExecutionReference, ExecutableFlow> fetchedFlow1 = fetchedQueuedFlows.get(0);
     final Pair<ExecutionReference, ExecutableFlow> fetchedFlow2 = fetchedQueuedFlows.get(1);
@@ -200,7 +201,8 @@ public class ExecutionFlowDaoTest {
     this.executionFlowDao.uploadExecutableFlow(flow);
     this.assignExecutor.assignExecutor(executor.getId(), flow.getExecutionId());
 
-    final Executor fetchExecutor = this.executorDao.fetchExecutorByExecutionId(flow.getExecutionId());
+    final Executor fetchExecutor = this.executorDao
+        .fetchExecutorByExecutionId(flow.getExecutionId());
     assertThat(fetchExecutor).isEqualTo(executor);
 
     this.assignExecutor.unassignExecutor(flow.getExecutionId());
@@ -216,13 +218,13 @@ public class ExecutionFlowDaoTest {
     // Since we haven't inserted any executors, 1 should be non-existent executor id.
     assertThatThrownBy(
         () -> this.assignExecutor.assignExecutor(1, flow.getExecutionId()))
-            .isInstanceOf(ExecutorManagerException.class)
-            .hasMessageContaining("non-existent executor");
+        .isInstanceOf(ExecutorManagerException.class)
+        .hasMessageContaining("non-existent executor");
   }
 
   /* Test exception when assigning an executor to a non-existent flow execution */
   @Test
-  public void testAssignExecutorInvalidExecution() throws Exception{
+  public void testAssignExecutorInvalidExecution() throws Exception {
     final String host = "localhost";
     final int port = 12345;
     final Executor executor = this.executorDao.addExecutor(host, port);
@@ -287,13 +289,15 @@ public class ExecutionFlowDaoTest {
     assertThat(activeFlows1.containsKey(flow1.getExecutionId())).isFalse();
   }
 
-  @Test @Ignore
+  @Test
+  @Ignore
   // TODO jamiesjc: Active_execution_flow table is already deprecated. we should remove related
   // test methods as well.
   public void testFetchActiveFlowsReferenceChanged() throws Exception {
   }
 
-  @Test @Ignore
+  @Test
+  @Ignore
   // TODO jamiesjc: Active_execution_flow table is already deprecated. we should remove related
   // test methods as well.
   public void testFetchActiveFlowByExecId() throws Exception {
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionLogsDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionLogsDaoTest.java
index f22a597..b4b8436 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionLogsDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionLogsDaoTest.java
@@ -69,8 +69,8 @@ public class ExecutionLogsDaoTest {
   public void testSmallUploadLog() throws ExecutorManagerException {
     final File logDir = ExecutionsTestUtil.getFlowDir(LOG_TEST_DIR_NAME);
     final File[] smalllog =
-        { new File(logDir, "log1.log"), new File(logDir, "log2.log"),
-            new File(logDir, "log3.log") };
+        {new File(logDir, "log1.log"), new File(logDir, "log2.log"),
+            new File(logDir, "log3.log")};
 
     this.executionLogsDao.uploadLogFile(1, "smallFiles", 0, smalllog);
 
@@ -92,8 +92,8 @@ public class ExecutionLogsDaoTest {
 
     // Multiple of 255 for Henry the Eigth
     final File[] largelog =
-        { new File(logDir, "largeLog1.log"), new File(logDir, "largeLog2.log"),
-            new File(logDir, "largeLog3.log") };
+        {new File(logDir, "largeLog1.log"), new File(logDir, "largeLog2.log"),
+            new File(logDir, "largeLog3.log")};
 
     this.executionLogsDao.uploadLogFile(1, "largeFiles", 0, largelog);
 
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 8152f4c..ce79de2 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -68,7 +68,7 @@ public class ExecutorManagerTest {
 
   /* Helper method to create a ExecutorManager Instance */
   private ExecutorManager createMultiExecutorManagerInstance()
-    throws ExecutorManagerException {
+      throws ExecutorManagerException {
     return createMultiExecutorManagerInstance(new MockExecutorLoader());
   }
 
@@ -107,14 +107,14 @@ public class ExecutorManagerTest {
     final ExecutorManager manager =
         new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
     final Set<Executor> activeExecutors =
-      new HashSet(manager.getAllActiveExecutors());
+        new HashSet(manager.getAllActiveExecutors());
 
     Assert.assertEquals(activeExecutors.size(), 1);
     final Executor executor = activeExecutors.iterator().next();
     Assert.assertEquals(executor.getHost(), "localhost");
     Assert.assertEquals(executor.getPort(), 12345);
     Assert.assertArrayEquals(activeExecutors.toArray(), loader
-      .fetchActiveExecutors().toArray());
+        .fetchActiveExecutors().toArray());
   }
 
   /*
@@ -130,9 +130,9 @@ public class ExecutorManagerTest {
     final ExecutorManager manager =
         new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
     final Set<Executor> activeExecutors =
-      new HashSet(manager.getAllActiveExecutors());
-    Assert.assertArrayEquals(activeExecutors.toArray(), new Executor[] {
-      executor1, executor2 });
+        new HashSet(manager.getAllActiveExecutors());
+    Assert.assertArrayEquals(activeExecutors.toArray(), new Executor[]{
+        executor1, executor2});
   }
 
   /*
@@ -146,7 +146,7 @@ public class ExecutorManagerTest {
     final ExecutorManager manager =
         new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
     Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
-      new Executor[] { executor1 });
+        new Executor[]{executor1});
 
     // mark older executor as inactive
     executor1.setActive(false);
@@ -156,7 +156,7 @@ public class ExecutorManagerTest {
     manager.setupExecutors();
 
     Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
-      new Executor[] { executor2, executor3 });
+        new Executor[]{executor2, executor3});
   }
 
   /*
@@ -171,9 +171,9 @@ public class ExecutorManagerTest {
     final ExecutorManager manager =
         new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
     final Set<Executor> activeExecutors =
-      new HashSet(manager.getAllActiveExecutors());
+        new HashSet(manager.getAllActiveExecutors());
     Assert.assertArrayEquals(activeExecutors.toArray(),
-      new Executor[] { executor1 });
+        new Executor[]{executor1});
 
     // mark older executor as inactive
     executor1.setActive(false);
@@ -217,7 +217,7 @@ public class ExecutorManagerTest {
     final List<Integer> testFlows = Arrays.asList(flow1.getExecutionId(), flow2.getExecutionId());
 
     final List<Pair<ExecutionReference, ExecutableFlow>> queuedFlowsDB =
-      loader.fetchQueuedFlows();
+        loader.fetchQueuedFlows();
     Assert.assertEquals(queuedFlowsDB.size(), testFlows.size());
     // Verify things are correctly setup in db
     for (final Pair<ExecutionReference, ExecutableFlow> pair : queuedFlowsDB) {
@@ -229,7 +229,7 @@ public class ExecutorManagerTest {
     final List<Integer> managerActiveFlows = manager.getRunningFlows()
         .stream().map(ExecutableFlow::getExecutionId).collect(Collectors.toList());
     Assert.assertTrue(managerActiveFlows.containsAll(testFlows)
-      && testFlows.containsAll(managerActiveFlows));
+        && testFlows.containsAll(managerActiveFlows));
 
     // Verify getQueuedFlowIds method
     Assert.assertEquals("[1, 2]", manager.getQueuedFlowIds());
@@ -238,11 +238,11 @@ public class ExecutorManagerTest {
   /* Test submit duplicate flow when previous instance is not dispatched */
   @Test(expected = ExecutorManagerException.class)
   public void testDuplicateQueuedFlows() throws ExecutorManagerException,
-    IOException {
+      IOException {
     final ExecutorManager manager = createMultiExecutorManagerInstance();
     final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
     flow1.getExecutionOptions().setConcurrentOption(
-      ExecutionOptions.CONCURRENT_OPTION_SKIP);
+        ExecutionOptions.CONCURRENT_OPTION_SKIP);
 
     final User testUser = TestUtils.getTestUser();
     manager.submitExecutableFlow(flow1, testUser.getUserId());
@@ -263,7 +263,7 @@ public class ExecutorManagerTest {
 
     manager.cancelFlow(flow1, testUser.getUserId());
     final ExecutableFlow fetchedFlow =
-      loader.fetchExecutableFlow(flow1.getExecutionId());
+        loader.fetchExecutableFlow(flow1.getExecutionId());
     Assert.assertEquals(fetchedFlow.getStatus(), Status.FAILED);
 
     Assert.assertFalse(manager.getRunningFlows().contains(flow1));
@@ -282,7 +282,8 @@ public class ExecutorManagerTest {
     verify(this.loader).addActiveExecutableReference(any());
   }
 
-  @Ignore @Test
+  @Ignore
+  @Test
   public void testFetchAllActiveFlows() throws ExecutorManagerException, IOException {
     testSetUpForRunningFlows();
     final List<ExecutableFlow> flows = this.manager.getRunningFlows();
@@ -291,7 +292,8 @@ public class ExecutorManagerTest {
     }
   }
 
-  @Ignore @Test
+  @Ignore
+  @Test
   public void testFetchActiveFlowByProject() throws ExecutorManagerException, IOException {
     testSetUpForRunningFlows();
     final List<Integer> executions = this.manager.getRunningFlows(this.flow1.getProjectId(),
@@ -301,7 +303,8 @@ public class ExecutorManagerTest {
         .assertTrue(this.manager.isFlowRunning(this.flow1.getProjectId(), this.flow1.getFlowId()));
   }
 
-  @Ignore @Test
+  @Ignore
+  @Test
   public void testFetchActiveFlowWithExecutor() throws ExecutorManagerException, IOException {
     testSetUpForRunningFlows();
     final List<Pair<ExecutableFlow, Executor>> activeFlowsWithExecutor =
@@ -318,8 +321,10 @@ public class ExecutorManagerTest {
     final Set<String> activeExecutorServerHosts = this.manager.getAllActiveExecutorServerHosts();
     final Executor executor1 = this.manager.fetchExecutor(this.flow1.getExecutionId());
     final Executor executor2 = this.manager.fetchExecutor(this.flow2.getExecutionId());
-    Assert.assertTrue(activeExecutorServerHosts.contains(executor1.getHost() + ":" + executor1.getPort()));
-    Assert.assertTrue(activeExecutorServerHosts.contains(executor2.getHost() + ":" + executor2.getPort()));
+    Assert.assertTrue(
+        activeExecutorServerHosts.contains(executor1.getHost() + ":" + executor1.getPort()));
+    Assert.assertTrue(
+        activeExecutorServerHosts.contains(executor2.getHost() + ":" + executor2.getPort()));
   }
 
   /*
diff --git a/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java b/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
index 5ca9993..9fb5da2 100644
--- a/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
+++ b/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
@@ -63,6 +63,12 @@ public class InteractiveTestJob extends AbstractProcessJob {
     testJobs.clear();
   }
 
+  public static void clearTestJobs(final String... names) {
+    for (final String name : names) {
+      assertNotNull(testJobs.remove(name));
+    }
+  }
+
   @Override
   public void run() throws Exception {
     final String nestedFlowPath =
@@ -148,10 +154,4 @@ public class InteractiveTestJob extends AbstractProcessJob {
     info("Killing job");
     failJob();
   }
-
-  public static void clearTestJobs(final String... names) {
-    for (String name : names) {
-      assertNotNull(testJobs.remove(name));
-    }
-  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index adadb2d..6b4db57 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -22,11 +22,11 @@ import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
-import java.time.Duration;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
@@ -413,7 +413,7 @@ public class MockExecutorLoader implements ExecutorLoader {
   }
 
   @Override
-  public List<ExecutableFlow> fetchRecentlyFinishedFlows(Duration maxAge)
+  public List<ExecutableFlow> fetchRecentlyFinishedFlows(final Duration maxAge)
       throws ExecutorManagerException {
     return new ArrayList<>();
   }
diff --git a/azkaban-common/src/test/java/azkaban/executor/NumExecutionsDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/NumExecutionsDaoTest.java
index d3791e7..f00a807 100644
--- a/azkaban-common/src/test/java/azkaban/executor/NumExecutionsDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/NumExecutionsDaoTest.java
@@ -70,7 +70,6 @@ public class NumExecutionsDaoTest {
     flow1.setStatus(Status.PREPARING);
     this.executionFlowDao.uploadExecutableFlow(flow1);
 
-
     final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
     flow2.setStatus(Status.RUNNING);
     this.executionFlowDao.uploadExecutableFlow(flow2);
@@ -82,7 +81,8 @@ public class NumExecutionsDaoTest {
     final int count = this.numExecutionsDao.fetchNumExecutableFlows();
     assertThat(count).isEqualTo(3);
 
-    final int flow2Count = this.numExecutionsDao.fetchNumExecutableFlows(1, "derived-member-data-2");
+    final int flow2Count = this.numExecutionsDao
+        .fetchNumExecutableFlows(1, "derived-member-data-2");
     assertThat(flow2Count).isEqualTo(2);
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/jobtype/JobTypeManagerTest.java b/azkaban-common/src/test/java/azkaban/jobtype/JobTypeManagerTest.java
index 4ebea5e..181b8b0 100644
--- a/azkaban-common/src/test/java/azkaban/jobtype/JobTypeManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobtype/JobTypeManagerTest.java
@@ -37,8 +37,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 /**
- * Test the flow run, especially with embedded flows. Files are in
- * unit/plugins/jobtypes
+ * Test the flow run, especially with embedded flows. Files are in unit/plugins/jobtypes
  */
 public class JobTypeManagerTest {
 
@@ -92,8 +91,8 @@ public class JobTypeManagerTest {
   }
 
   /**
-   * Tests that the proper classes were loaded and that the common and the load
-   * properties are properly loaded.
+   * Tests that the proper classes were loaded and that the common and the load properties are
+   * properly loaded.
    */
   @Test
   public void testLoadedClasses() throws Exception {
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
index c90e149..2478868 100644
--- a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
@@ -155,7 +155,8 @@ public class JdbcProjectImplTest {
     final Project project = this.loader.fetchProjectByName("mytestProject");
     final File testFile = new File(getClass().getClassLoader().getResource(SAMPLE_FILE).getFile());
     final int newVersion = this.loader.getLatestProjectVersion(project) + 1;
-    this.loader.addProjectVersion(project.getId(), newVersion, testFile, "uploadUser1", computeHash(testFile), "resourceId1");
+    this.loader.addProjectVersion(project.getId(), newVersion, testFile, "uploadUser1",
+        computeHash(testFile), "resourceId1");
     final int currVersion = this.loader.getLatestProjectVersion(project);
     Assert.assertEquals(currVersion, newVersion);
   }
@@ -177,7 +178,7 @@ public class JdbcProjectImplTest {
     final Project project = this.loader.fetchProjectByName("mytestProject");
     final int newVersion = this.loader.getLatestProjectVersion(project) + 7;
     this.loader.changeProjectVersion(project, newVersion, "uploadUser1");
-    final Project sameProject= this.loader.fetchProjectById(project.getId());
+    final Project sameProject = this.loader.fetchProjectById(project.getId());
     Assert.assertEquals(sameProject.getVersion(), newVersion);
   }
 
@@ -185,9 +186,11 @@ public class JdbcProjectImplTest {
   public void testUpdatePermission() throws Exception {
     createThreeProjects();
     final Project project = this.loader.fetchProjectByName("mytestProject");
-    this.loader.updatePermission(project, project.getLastModifiedUser(), new Permission(Permission.Type.ADMIN), false);
+    this.loader.updatePermission(project, project.getLastModifiedUser(),
+        new Permission(Permission.Type.ADMIN), false);
 
-    final List<Triple<String, Boolean, Permission>> permissionsTriple = this.loader.getProjectPermissions(project);
+    final List<Triple<String, Boolean, Permission>> permissionsTriple = this.loader
+        .getProjectPermissions(project);
     Assert.assertEquals(permissionsTriple.size(), 1);
     Assert.assertEquals(permissionsTriple.get(0).getFirst(), "testUser1");
     Assert.assertEquals(permissionsTriple.get(0).getThird().toString(), "ADMIN");
@@ -208,9 +211,11 @@ public class JdbcProjectImplTest {
   public void testRemovePermission() throws Exception {
     createThreeProjects();
     final Project project = this.loader.fetchProjectByName("mytestProject");
-    this.loader.updatePermission(project, project.getLastModifiedUser(), new Permission(Permission.Type.ADMIN), false);
+    this.loader.updatePermission(project, project.getLastModifiedUser(),
+        new Permission(Permission.Type.ADMIN), false);
     this.loader.removePermission(project, project.getLastModifiedUser(), false);
-    final List<Triple<String, Boolean, Permission>> permissionsTriple = this.loader.getProjectPermissions(project);
+    final List<Triple<String, Boolean, Permission>> permissionsTriple = this.loader
+        .getProjectPermissions(project);
     Assert.assertEquals(permissionsTriple.size(), 0);
   }
 
@@ -313,7 +318,8 @@ public class JdbcProjectImplTest {
     final Project project = this.loader.fetchProjectByName("mytestProject");
     this.loader.uploadProjectProperties(project, list);
 
-    final Map<String, Props> propsMap = this.loader.fetchProjectProperties(project.getId(), project.getVersion());
+    final Map<String, Props> propsMap = this.loader
+        .fetchProjectProperties(project.getId(), project.getVersion());
     Assert.assertEquals(propsMap.get("source1").get("key2"), "value2");
     Assert.assertEquals(propsMap.get("source2").get("keyaaa"), "valueaaa");
   }
@@ -329,7 +335,7 @@ public class JdbcProjectImplTest {
     final ProjectFileHandler fileHandler = this.loader.getUploadedFile(project.getId(), newVersion);
     Assert.assertEquals(fileHandler.getNumChunks(), 1);
 
-    this.loader.cleanOlderProjectVersion(project.getId(), newVersion+1);
+    this.loader.cleanOlderProjectVersion(project.getId(), newVersion + 1);
 
     final ProjectFileHandler fileHandler2 = this.loader
         .fetchProjectMetaData(project.getId(), newVersion);
diff --git a/azkaban-common/src/test/java/azkaban/project/validator/XmlValidatorManagerTest.java b/azkaban-common/src/test/java/azkaban/project/validator/XmlValidatorManagerTest.java
index fc1da74..b5a3baa 100644
--- a/azkaban-common/src/test/java/azkaban/project/validator/XmlValidatorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/validator/XmlValidatorManagerTest.java
@@ -12,8 +12,8 @@ public class XmlValidatorManagerTest {
   private final Props baseProps = new Props();
 
   /**
-   * Test that if the validator directory does not exist, XmlValidatorManager
-   * should still load the default validator.
+   * Test that if the validator directory does not exist, XmlValidatorManager should still load the
+   * default validator.
    */
   @Test
   public void testNoValidatorsDir() {
diff --git a/azkaban-common/src/test/java/azkaban/test/Utils.java b/azkaban-common/src/test/java/azkaban/test/Utils.java
index 2dac04e..b19248f 100644
--- a/azkaban-common/src/test/java/azkaban/test/Utils.java
+++ b/azkaban-common/src/test/java/azkaban/test/Utils.java
@@ -30,7 +30,7 @@ public class Utils {
     SERVICE_PROVIDER.setInjector(injector);
   }
 
-  public static DatabaseOperator initTestDB() throws Exception{
+  public static DatabaseOperator initTestDB() throws Exception {
     final AzkabanDataSource dataSource = new EmbeddedH2BasicDataSource();
 
     final String sqlScriptsDir = new File("../azkaban-db/src/main/sql/").getCanonicalPath();
diff --git a/azkaban-common/src/test/java/azkaban/trigger/BasicTimeCheckerTest.java b/azkaban-common/src/test/java/azkaban/trigger/BasicTimeCheckerTest.java
index 4ba6e32..904a558 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/BasicTimeCheckerTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/BasicTimeCheckerTest.java
@@ -43,8 +43,8 @@ public class BasicTimeCheckerTest {
 
 
   /**
-   * This test manipulates global states (time) in org.joda.time.DateTimeUtils . Thus this test
-   * can run in parallel with tests that do the same.
+   * This test manipulates global states (time) in org.joda.time.DateTimeUtils . Thus this test can
+   * run in parallel with tests that do the same.
    */
   @Test
   public void periodTimerTest() {
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
index 5d3dc22..d45992a 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
@@ -16,7 +16,6 @@
 
 package azkaban.trigger;
 
-import azkaban.utils.AbstractMailerTest;
 import azkaban.executor.AlerterHolder;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManager;
@@ -25,6 +24,7 @@ import azkaban.executor.MockExecutorLoader;
 import azkaban.metrics.CommonMetrics;
 import azkaban.metrics.MetricsManager;
 import azkaban.trigger.builtin.CreateTriggerAction;
+import azkaban.utils.AbstractMailerTest;
 import azkaban.utils.Emailer;
 import azkaban.utils.Props;
 import com.codahale.metrics.MetricRegistry;
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
index 8fbcbc3..de874f6 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
@@ -122,7 +122,7 @@ public class TriggerManagerTest {
     assertTrue(t1.getStatus() == TriggerStatus.PAUSED);
   }
 
-  private void sleep (final long millis) {
+  private void sleep(final long millis) {
     try {
       Thread.sleep(millis);
     } catch (final InterruptedException e) {
@@ -135,7 +135,7 @@ public class TriggerManagerTest {
     final Map<String, ConditionChecker> expireCheckers = new HashMap<>();
     final ConditionChecker triggerChecker = new ThresholdChecker(ThresholdChecker.type, threshold);
     final ConditionChecker endTimeChecker = new BasicTimeChecker("EndTimeCheck_1", 111L,
-        DateTimeZone.UTC, 2536871155000L,false, false,
+        DateTimeZone.UTC, 2536871155000L, false, false,
         null, null);
     triggerCheckers.put(triggerChecker.getId(), triggerChecker);
     expireCheckers.put(endTimeChecker.getId(), endTimeChecker);
@@ -169,7 +169,7 @@ public class TriggerManagerTest {
 
     // End time is 3 seconds past now.
     final ConditionChecker endTimeChecker = new BasicTimeChecker("EndTimeChecker_1", 111L,
-        DateTimeZone.UTC, currMillis + 3000L,false, false,
+        DateTimeZone.UTC, currMillis + 3000L, false, false,
         null, null);
     triggerCheckers.put(triggerChecker.getId(), triggerChecker);
     expireCheckers.put(endTimeChecker.getId(), endTimeChecker);
@@ -199,6 +199,7 @@ public class TriggerManagerTest {
   }
 
   public static class MockTriggerLoader implements TriggerLoader {
+
     private final Map<Integer, Trigger> triggers = new HashMap<>();
     private int idIndex = 0;
 
diff --git a/azkaban-common/src/test/java/azkaban/user/PermissionTest.java b/azkaban-common/src/test/java/azkaban/user/PermissionTest.java
index 0ee4cab..8b55538 100644
--- a/azkaban-common/src/test/java/azkaban/user/PermissionTest.java
+++ b/azkaban-common/src/test/java/azkaban/user/PermissionTest.java
@@ -112,8 +112,8 @@ public class PermissionTest {
   }
 
   /**
-   * Verify that the binary bit for UPLOADPROJECTS is not turned on
-   * by setting the other permissions.
+   * Verify that the binary bit for UPLOADPROJECTS is not turned on by setting the other
+   * permissions.
    */
   @Test
   public void testUploadProjectFlag() throws Exception {
diff --git a/azkaban-common/src/test/java/azkaban/user/UserUtilsTest.java b/azkaban-common/src/test/java/azkaban/user/UserUtilsTest.java
index c087a88..002925a 100644
--- a/azkaban-common/src/test/java/azkaban/user/UserUtilsTest.java
+++ b/azkaban-common/src/test/java/azkaban/user/UserUtilsTest.java
@@ -8,6 +8,7 @@ import org.junit.Test;
 
 
 public class UserUtilsTest {
+
   @Test
   public void testAdminUserCanUploadProject() throws UserManagerException {
     final UserManager userManager = TestUtils.createTestXmlUserManager();
diff --git a/azkaban-common/src/test/java/azkaban/utils/ExternalLinkUtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/ExternalLinkUtilsTest.java
index 371b670..8ff3e27 100644
--- a/azkaban-common/src/test/java/azkaban/utils/ExternalLinkUtilsTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/ExternalLinkUtilsTest.java
@@ -59,8 +59,8 @@ public class ExternalLinkUtilsTest {
   }
 
   /**
-   * Test validates the happy path when an external analyzer is configured
-   * with '${url}' as the format in 'azkaban.properties'.
+   * Test validates the happy path when an external analyzer is configured with '${url}' as the
+   * format in 'azkaban.properties'.
    */
   @Test
   public void testGetExternalAnalyzerValidFormat() {
@@ -80,8 +80,8 @@ public class ExternalLinkUtilsTest {
   }
 
   /**
-   * Test validates the happy path when an log viewer is configured
-   * with '${execid}'  and '${jobid} as the format in 'azkaban.properties'.
+   * Test validates the happy path when an log viewer is configured with '${execid}'  and '${jobid}
+   * as the format in 'azkaban.properties'.
    */
   @Test
   public void testGetExternalLogViewerValidFormat() {
@@ -98,8 +98,8 @@ public class ExternalLinkUtilsTest {
   }
 
   /**
-   * Test validates the condition when an external analyzer is not configured
-   * in 'azkaban.properties'.
+   * Test validates the condition when an external analyzer is not configured in
+   * 'azkaban.properties'.
    */
   @Test
   public void testGetExternalAnalyzerNotConfigured() {
@@ -109,8 +109,8 @@ public class ExternalLinkUtilsTest {
   }
 
   /**
-   * Test validates the condition when an external log viewer is not configured
-   * in 'azkaban.properties'.
+   * Test validates the condition when an external log viewer is not configured in
+   * 'azkaban.properties'.
    */
   @Test
   public void testGetLogViewerNotConfigured() {
diff --git a/azkaban-common/src/test/java/azkaban/utils/PatternLayoutEscapedTest.java b/azkaban-common/src/test/java/azkaban/utils/PatternLayoutEscapedTest.java
index e854e21..68c2056 100644
--- a/azkaban-common/src/test/java/azkaban/utils/PatternLayoutEscapedTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/PatternLayoutEscapedTest.java
@@ -26,9 +26,9 @@ import org.junit.Before;
 import org.junit.Test;
 
 /**
- * Test output of PatternLayoutEscapedTest
- * It should be appending stack traces, escaping new lines, quotes, tabs and backslashes
- * This is necessary when we are logging these messages out as JSON objects
+ * Test output of PatternLayoutEscapedTest It should be appending stack traces, escaping new lines,
+ * quotes, tabs and backslashes This is necessary when we are logging these messages out as JSON
+ * objects
  */
 public class PatternLayoutEscapedTest {
 
diff --git a/azkaban-common/src/test/resources/sql/create.execution_flows.sql b/azkaban-common/src/test/resources/sql/create.execution_flows.sql
index b2f7625..043ed90 100644
--- a/azkaban-common/src/test/resources/sql/create.execution_flows.sql
+++ b/azkaban-common/src/test/resources/sql/create.execution_flows.sql
@@ -1,20 +1,24 @@
 CREATE TABLE execution_flows (
-	exec_id INT NOT NULL AUTO_INCREMENT,
-	project_id INT NOT NULL,
-	version INT NOT NULL,
-	flow_id VARCHAR(128) NOT NULL,
-	status TINYINT,
-	submit_user VARCHAR(64),
-	submit_time BIGINT,
-	update_time BIGINT,
-	start_time BIGINT,
-	end_time BIGINT,
-	enc_type TINYINT,
-	flow_data LONGBLOB,
-	PRIMARY KEY (exec_id)
+  exec_id     INT          NOT NULL AUTO_INCREMENT,
+  project_id  INT          NOT NULL,
+  version     INT          NOT NULL,
+  flow_id     VARCHAR(128) NOT NULL,
+  status      TINYINT,
+  submit_user VARCHAR(64),
+  submit_time BIGINT,
+  update_time BIGINT,
+  start_time  BIGINT,
+  end_time    BIGINT,
+  enc_type    TINYINT,
+  flow_data   LONGBLOB,
+  PRIMARY KEY (exec_id)
 );
 
-CREATE INDEX ex_flows_start_time ON execution_flows(start_time);
-CREATE INDEX ex_flows_end_time ON execution_flows(end_time);
-CREATE INDEX ex_flows_time_range ON execution_flows(start_time, end_time);
-CREATE INDEX ex_flows_flows ON execution_flows(project_id, flow_id);
+CREATE INDEX ex_flows_start_time
+  ON execution_flows (start_time);
+CREATE INDEX ex_flows_end_time
+  ON execution_flows (end_time);
+CREATE INDEX ex_flows_time_range
+  ON execution_flows (start_time, end_time);
+CREATE INDEX ex_flows_flows
+  ON execution_flows (project_id, flow_id);
diff --git a/azkaban-common/src/test/resources/sql/create.execution_jobs.sql b/azkaban-common/src/test/resources/sql/create.execution_jobs.sql
index a62d3a9..d2379f2 100644
--- a/azkaban-common/src/test/resources/sql/create.execution_jobs.sql
+++ b/azkaban-common/src/test/resources/sql/create.execution_jobs.sql
@@ -1,19 +1,22 @@
 CREATE TABLE execution_jobs (
-	exec_id INT NOT NULL,
-	project_id INT NOT NULL,
-	version INT NOT NULL,
-	flow_id VARCHAR(128) NOT NULL,
-	job_id VARCHAR(128) NOT NULL,
-	attempt INT,
-	start_time BIGINT,
-	end_time BIGINT,
-	status TINYINT,
-	input_params LONGBLOB,
-	output_params LONGBLOB,
-	attachments LONGBLOB,
-	PRIMARY KEY (exec_id, job_id, attempt)
+  exec_id       INT          NOT NULL,
+  project_id    INT          NOT NULL,
+  version       INT          NOT NULL,
+  flow_id       VARCHAR(128) NOT NULL,
+  job_id        VARCHAR(128) NOT NULL,
+  attempt       INT,
+  start_time    BIGINT,
+  end_time      BIGINT,
+  status        TINYINT,
+  input_params  LONGBLOB,
+  output_params LONGBLOB,
+  attachments   LONGBLOB,
+  PRIMARY KEY (exec_id, job_id, attempt)
 );
 
-CREATE INDEX exec_job ON execution_jobs(exec_id, job_id);
-CREATE INDEX exec_id ON execution_jobs(exec_id);
-CREATE INDEX ex_job_id ON execution_jobs(project_id, job_id);
+CREATE INDEX exec_job
+  ON execution_jobs (exec_id, job_id);
+CREATE INDEX exec_id
+  ON execution_jobs (exec_id);
+CREATE INDEX ex_job_id
+  ON execution_jobs (project_id, job_id);
diff --git a/azkaban-common/src/test/resources/sql/create.execution_logs.sql b/azkaban-common/src/test/resources/sql/create.execution_logs.sql
index 0aa6a36..80a9777 100644
--- a/azkaban-common/src/test/resources/sql/create.execution_logs.sql
+++ b/azkaban-common/src/test/resources/sql/create.execution_logs.sql
@@ -1,14 +1,16 @@
 CREATE TABLE execution_logs (
-	exec_id INT NOT NULL,
-	name VARCHAR(128),
-	attempt INT,
-	enc_type TINYINT,
-	start_byte INT,
-	end_byte INT,
-	log LONGBLOB,
-	upload_time BIGINT,
-	PRIMARY KEY (exec_id, name, attempt, start_byte)
+  exec_id     INT NOT NULL,
+  name        VARCHAR(128),
+  attempt     INT,
+  enc_type    TINYINT,
+  start_byte  INT,
+  end_byte    INT,
+  log         LONGBLOB,
+  upload_time BIGINT,
+  PRIMARY KEY (exec_id, name, attempt, start_byte)
 );
 
-CREATE INDEX ex_log_attempt ON execution_logs(exec_id, name, attempt);
-CREATE INDEX ex_log_index ON execution_logs(exec_id, name);
\ No newline at end of file
+CREATE INDEX ex_log_attempt
+  ON execution_logs (exec_id, name, attempt);
+CREATE INDEX ex_log_index
+  ON execution_logs (exec_id, name);
diff --git a/azkaban-common/src/test/resources/sql/create.project_events.sql b/azkaban-common/src/test/resources/sql/create.project_events.sql
index dd24d5f..dda35a8 100644
--- a/azkaban-common/src/test/resources/sql/create.project_events.sql
+++ b/azkaban-common/src/test/resources/sql/create.project_events.sql
@@ -1,9 +1,10 @@
 CREATE TABLE project_events (
-	project_id INT NOT NULL,
-	event_type TINYINT NOT NULL,
-	event_time BIGINT NOT NULL,
-	username VARCHAR(64),
-	message VARCHAR(512)
+  project_id INT     NOT NULL,
+  event_type TINYINT NOT NULL,
+  event_time BIGINT  NOT NULL,
+  username   VARCHAR(64),
+  message    VARCHAR(512)
 );
 
-CREATE INDEX log ON project_events(project_id, event_time);
+CREATE INDEX log
+  ON project_events (project_id, event_time);
diff --git a/azkaban-common/src/test/resources/sql/create.properties.sql b/azkaban-common/src/test/resources/sql/create.properties.sql
index aaa37ec..27694fd 100644
--- a/azkaban-common/src/test/resources/sql/create.properties.sql
+++ b/azkaban-common/src/test/resources/sql/create.properties.sql
@@ -1,7 +1,7 @@
 CREATE TABLE properties (
-	name VARCHAR(64) NOT NULL,
-	type INT NOT NULL,
-	modified_time BIGINT NOT NULL,
-	value VARCHAR(256),
-	PRIMARY KEY (name, type)
-);
\ No newline at end of file
+  name          VARCHAR(64) NOT NULL,
+  type          INT         NOT NULL,
+  modified_time BIGINT      NOT NULL,
+  value         VARCHAR(256),
+  PRIMARY KEY (name, type)
+);
diff --git a/azkaban-common/src/test/resources/sql/update.execution_jobs.2.1.sql b/azkaban-common/src/test/resources/sql/update.execution_jobs.2.1.sql
index b7313ad..5f7cf07 100644
--- a/azkaban-common/src/test/resources/sql/update.execution_jobs.2.1.sql
+++ b/azkaban-common/src/test/resources/sql/update.execution_jobs.2.1.sql
@@ -1,4 +1,8 @@
-ALTER TABLE execution_jobs ADD COLUMN attempt INT DEFAULT 0;
-ALTER TABLE execution_jobs DROP PRIMARY KEY;
-ALTER TABLE execution_jobs ADD PRIMARY KEY(exec_id, job_id, attempt);
-ALTER TABLE execution_jobs ADD INDEX exec_job (exec_id, job_id);
+ALTER TABLE execution_jobs
+  ADD COLUMN attempt INT DEFAULT 0;
+ALTER TABLE execution_jobs
+  DROP PRIMARY KEY;
+ALTER TABLE execution_jobs
+  ADD PRIMARY KEY (exec_id, job_id, attempt);
+ALTER TABLE execution_jobs
+  ADD INDEX exec_job (exec_id, job_id);
diff --git a/azkaban-common/src/test/resources/sql/update.execution_jobs.2.3.sql b/azkaban-common/src/test/resources/sql/update.execution_jobs.2.3.sql
index 8c43495..0da2610 100644
--- a/azkaban-common/src/test/resources/sql/update.execution_jobs.2.3.sql
+++ b/azkaban-common/src/test/resources/sql/update.execution_jobs.2.3.sql
@@ -1 +1 @@
-INSERT INTO properties (name,value,modified_time,type) VALUES ('test4', 'value1', 0, 99);
\ No newline at end of file
+INSERT INTO properties (name, value, modified_time, type) VALUES ('test4', 'value1', 0, 99);
diff --git a/azkaban-common/src/test/resources/sql/update.execution_logs.2.1.sql b/azkaban-common/src/test/resources/sql/update.execution_logs.2.1.sql
index 5c2dc0b..61ab60c 100644
--- a/azkaban-common/src/test/resources/sql/update.execution_logs.2.1.sql
+++ b/azkaban-common/src/test/resources/sql/update.execution_logs.2.1.sql
@@ -1,7 +1,14 @@
-ALTER TABLE execution_logs ADD COLUMN attempt INT DEFAULT 0;
-ALTER TABLE execution_logs ADD COLUMN upload_time BIGINT DEFAULT 1420099200000;
-UPDATE execution_logs SET upload_time=(UNIX_TIMESTAMP()*1000) WHERE upload_time=1420099200000;
+ALTER TABLE execution_logs
+  ADD COLUMN attempt INT DEFAULT 0;
+ALTER TABLE execution_logs
+  ADD COLUMN upload_time BIGINT DEFAULT 1420099200000;
+UPDATE execution_logs
+SET upload_time = (UNIX_TIMESTAMP() * 1000)
+WHERE upload_time = 1420099200000;
 
-ALTER TABLE execution_logs DROP PRIMARY KEY;
-ALTER TABLE execution_logs ADD PRIMARY KEY(exec_id, name, attempt, start_byte);
-ALTER TABLE execution_logs ADD INDEX ex_log_attempt (exec_id, name, attempt)
+ALTER TABLE execution_logs
+  DROP PRIMARY KEY;
+ALTER TABLE execution_logs
+  ADD PRIMARY KEY (exec_id, name, attempt, start_byte);
+ALTER TABLE execution_logs
+  ADD INDEX ex_log_attempt (exec_id, name, attempt)
diff --git a/azkaban-common/src/test/resources/sql/update.execution_logs.2.4.sql b/azkaban-common/src/test/resources/sql/update.execution_logs.2.4.sql
index f0a7aae..b9e1530 100644
--- a/azkaban-common/src/test/resources/sql/update.execution_logs.2.4.sql
+++ b/azkaban-common/src/test/resources/sql/update.execution_logs.2.4.sql
@@ -1 +1 @@
-INSERT INTO properties (name,value,modified_time,type) VALUES ('test', 'value1', 0, 99);
\ No newline at end of file
+INSERT INTO properties (name, value, modified_time, type) VALUES ('test', 'value1', 0, 99);
diff --git a/azkaban-common/src/test/resources/sql/update.execution_logs.2.7.sql b/azkaban-common/src/test/resources/sql/update.execution_logs.2.7.sql
index 9974cc7..a474b5f 100644
--- a/azkaban-common/src/test/resources/sql/update.execution_logs.2.7.sql
+++ b/azkaban-common/src/test/resources/sql/update.execution_logs.2.7.sql
@@ -1 +1 @@
-INSERT INTO properties (name,value,modified_time,type) VALUES ('test1','value1', 0, 99);
\ No newline at end of file
+INSERT INTO properties (name, value, modified_time, type) VALUES ('test1', 'value1', 0, 99);
diff --git a/azkaban-common/src/test/resources/sql/update.project_events.2.1.sql b/azkaban-common/src/test/resources/sql/update.project_events.2.1.sql
index 14d7554..e02595c 100644
--- a/azkaban-common/src/test/resources/sql/update.project_events.2.1.sql
+++ b/azkaban-common/src/test/resources/sql/update.project_events.2.1.sql
@@ -1 +1,2 @@
-ALTER TABLE project_events MODIFY COLUMN message VARCHAR(512);
+ALTER TABLE project_events
+  MODIFY COLUMN message VARCHAR(512);