azkaban-aplcache

Use Intellij Refactor plugin for the two JDBC classes (#1342) Do

8/15/2017 6:45:11 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index eb8c24c..a8a719d 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -16,15 +16,14 @@
 
 package azkaban.executor;
 
-import java.io.File;
-import java.util.List;
-import java.util.Map;
-import java.time.Duration;
-
 import azkaban.executor.ExecutorLogEvent.EventType;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
+import java.io.File;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
 
 public interface ExecutorLoader {
   void uploadExecutableFlow(ExecutableFlow flow)
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 0e572a7..e2d6c6b 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -16,7 +16,16 @@
 
 package azkaban.executor;
 
+import azkaban.database.AbstractJdbcLoader;
+import azkaban.executor.ExecutorLogEvent.EventType;
 import azkaban.metrics.CommonMetrics;
+import azkaban.utils.FileIOUtils;
+import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import java.io.BufferedInputStream;
@@ -28,6 +37,7 @@ import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -35,8 +45,6 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.time.Duration;
-
 import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
@@ -45,16 +53,6 @@ import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
-import azkaban.database.AbstractJdbcLoader;
-import azkaban.executor.ExecutorLogEvent.EventType;
-import azkaban.utils.FileIOUtils;
-import azkaban.utils.FileIOUtils.LogData;
-import azkaban.utils.GZIPUtils;
-import azkaban.utils.JSONUtils;
-import azkaban.utils.Pair;
-import azkaban.utils.Props;
-import azkaban.utils.PropsUtils;
-
 @Singleton
 public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     ExecutorLoader {
@@ -64,42 +62,42 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   private EncodingType defaultEncodingType = EncodingType.GZIP;
 
   @Inject
-  public JdbcExecutorLoader(Props props, CommonMetrics commonMetrics) {
+  public JdbcExecutorLoader(final Props props, final CommonMetrics commonMetrics) {
     super(props, commonMetrics);
   }
 
   public EncodingType getDefaultEncodingType() {
-    return defaultEncodingType;
+    return this.defaultEncodingType;
   }
 
-  public void setDefaultEncodingType(EncodingType defaultEncodingType) {
+  public void setDefaultEncodingType(final EncodingType defaultEncodingType) {
     this.defaultEncodingType = defaultEncodingType;
   }
 
   @Override
-  public synchronized void uploadExecutableFlow(ExecutableFlow flow)
+  public synchronized void uploadExecutableFlow(final ExecutableFlow flow)
       throws ExecutorManagerException {
-    Connection connection = getConnection();
+    final Connection connection = getConnection();
     try {
-      uploadExecutableFlow(connection, flow, defaultEncodingType);
-    } catch (IOException e) {
+      uploadExecutableFlow(connection, flow, this.defaultEncodingType);
+    } catch (final IOException e) {
       throw new ExecutorManagerException("Error uploading flow", e);
     } finally {
       DbUtils.closeQuietly(connection);
     }
   }
 
-  private synchronized void uploadExecutableFlow(Connection connection,
-      ExecutableFlow flow, EncodingType encType)
+  private synchronized void uploadExecutableFlow(final Connection connection,
+                                                 final ExecutableFlow flow, final EncodingType encType)
       throws ExecutorManagerException, IOException {
     final String INSERT_EXECUTABLE_FLOW =
         "INSERT INTO execution_flows "
             + "(project_id, flow_id, version, status, submit_time, submit_user, update_time) "
             + "values (?,?,?,?,?,?,?)";
-    QueryRunner runner = new QueryRunner();
-    long submitTime = System.currentTimeMillis();
+    final QueryRunner runner = new QueryRunner();
+    final long submitTime = System.currentTimeMillis();
 
-    long id;
+    final long id;
     try {
       flow.setStatus(Status.PREPARING);
       runner.update(connection, INSERT_EXECUTABLE_FLOW, flow.getProjectId(),
@@ -118,41 +116,41 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       flow.setExecutionId((int) id);
 
       updateExecutableFlow(connection, flow, encType);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error creating execution.", e);
     }
   }
 
   @Override
-  public void updateExecutableFlow(ExecutableFlow flow)
+  public void updateExecutableFlow(final ExecutableFlow flow)
       throws ExecutorManagerException {
-    Connection connection = this.getConnection();
+    final Connection connection = this.getConnection();
 
     try {
-      updateExecutableFlow(connection, flow, defaultEncodingType);
+      updateExecutableFlow(connection, flow, this.defaultEncodingType);
     } finally {
       DbUtils.closeQuietly(connection);
     }
   }
 
-  private void updateExecutableFlow(Connection connection, ExecutableFlow flow,
-      EncodingType encType) throws ExecutorManagerException {
+  private void updateExecutableFlow(final Connection connection, final ExecutableFlow flow,
+                                    final EncodingType encType) throws ExecutorManagerException {
     final String UPDATE_EXECUTABLE_FLOW_DATA =
         "UPDATE execution_flows "
             + "SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=? "
             + "WHERE exec_id=?";
-    QueryRunner runner = new QueryRunner();
+    final QueryRunner runner = new QueryRunner();
 
-    String json = JSONUtils.toJSON(flow.toObject());
+    final String json = JSONUtils.toJSON(flow.toObject());
     byte[] data = null;
     try {
-      byte[] stringData = json.getBytes("UTF-8");
+      final byte[] stringData = json.getBytes("UTF-8");
       data = stringData;
 
       if (encType == EncodingType.GZIP) {
         data = GZIPUtils.gzipBytes(stringData);
       }
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new ExecutorManagerException("Error encoding the execution flow.");
     }
 
@@ -161,19 +159,19 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
           .getNumVal(), flow.getUpdateTime(), flow.getStartTime(), flow
           .getEndTime(), encType.getNumVal(), data, flow.getExecutionId());
       connection.commit();
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error updating flow.", e);
     }
   }
 
   @Override
-  public ExecutableFlow fetchExecutableFlow(int id)
+  public ExecutableFlow fetchExecutableFlow(final int id)
       throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
-    FetchExecutableFlows flowHandler = new FetchExecutableFlows();
+    final QueryRunner runner = createQueryRunner();
+    final FetchExecutableFlows flowHandler = new FetchExecutableFlows();
 
     try {
-      List<ExecutableFlow> properties =
+      final List<ExecutableFlow> properties =
           runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW, flowHandler,
               id);
       if (properties.isEmpty()) {
@@ -181,7 +179,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       } else {
         return properties.get(0);
       }
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error fetching flow id " + id, e);
     }
   }
@@ -194,15 +192,15 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   @Override
   public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
     throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
-    FetchQueuedExecutableFlows flowHandler = new FetchQueuedExecutableFlows();
+    final QueryRunner runner = createQueryRunner();
+    final FetchQueuedExecutableFlows flowHandler = new FetchQueuedExecutableFlows();
 
     try {
-      List<Pair<ExecutionReference, ExecutableFlow>> flows =
+      final List<Pair<ExecutionReference, ExecutableFlow>> flows =
         runner.query(FetchQueuedExecutableFlows.FETCH_QUEUED_EXECUTABLE_FLOW,
           flowHandler);
       return flows;
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error fetching active flows", e);
     }
   }
@@ -211,19 +209,19 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
    * maxAge indicates how long finished flows are shown in Recently Finished flow page.
    */
   @Override
-  public List<ExecutableFlow> fetchRecentlyFinishedFlows(Duration maxAge)
+  public List<ExecutableFlow> fetchRecentlyFinishedFlows(final Duration maxAge)
       throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
-    FetchRecentlyFinishedFlows flowHandler = new FetchRecentlyFinishedFlows();
+    final QueryRunner runner = createQueryRunner();
+    final FetchRecentlyFinishedFlows flowHandler = new FetchRecentlyFinishedFlows();
 
     try {
-      List<ExecutableFlow> flows =
+      final List<ExecutableFlow> flows =
           runner.query(FetchRecentlyFinishedFlows.FETCH_RECENTLY_FINISHED_FLOW,
               flowHandler, System.currentTimeMillis() - maxAge.toMillis(),
               Status.SUCCEEDED.getNumVal(), Status.KILLED.getNumVal(),
               Status.FAILED.getNumVal());
       return flows;
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error fetching recently finished flows", e);
     }
   }
@@ -231,27 +229,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   @Override
   public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
       throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
-    FetchActiveExecutableFlows flowHandler = new FetchActiveExecutableFlows();
+    final QueryRunner runner = createQueryRunner();
+    final FetchActiveExecutableFlows flowHandler = new FetchActiveExecutableFlows();
 
     try {
-      Map<Integer, Pair<ExecutionReference, ExecutableFlow>> properties =
+      final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> properties =
           runner.query(FetchActiveExecutableFlows.FETCH_ACTIVE_EXECUTABLE_FLOW,
               flowHandler);
       return properties;
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error fetching active flows", e);
     }
   }
 
   @Override
-  public Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(int execId)
+  public Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(final int execId)
       throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
-    FetchActiveExecutableFlowByExecId flowHandler = new FetchActiveExecutableFlowByExecId();
+    final QueryRunner runner = createQueryRunner();
+    final FetchActiveExecutableFlowByExecId flowHandler = new FetchActiveExecutableFlowByExecId();
 
     try {
-      List<Pair<ExecutionReference, ExecutableFlow>> flows =
+      final List<Pair<ExecutionReference, ExecutableFlow>> flows =
           runner.query(FetchActiveExecutableFlowByExecId.FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID,
               flowHandler, execId);
       if(flows.isEmpty()) {
@@ -260,111 +258,111 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       else {
         return flows.get(0);
       }
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error fetching active flows by exec id", e);
     }
   }
 
   @Override
   public int fetchNumExecutableFlows() throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
+    final QueryRunner runner = createQueryRunner();
 
-    IntHandler intHandler = new IntHandler();
+    final IntHandler intHandler = new IntHandler();
     try {
-      int count = runner.query(IntHandler.NUM_EXECUTIONS, intHandler);
+      final int count = runner.query(IntHandler.NUM_EXECUTIONS, intHandler);
       return count;
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error fetching num executions", e);
     }
   }
 
   @Override
-  public int fetchNumExecutableFlows(int projectId, String flowId)
+  public int fetchNumExecutableFlows(final int projectId, final String flowId)
       throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
+    final QueryRunner runner = createQueryRunner();
 
-    IntHandler intHandler = new IntHandler();
+    final IntHandler intHandler = new IntHandler();
     try {
-      int count =
+      final int count =
           runner.query(IntHandler.NUM_FLOW_EXECUTIONS, intHandler, projectId,
               flowId);
       return count;
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error fetching num executions", e);
     }
   }
 
   @Override
-  public int fetchNumExecutableNodes(int projectId, String jobId)
+  public int fetchNumExecutableNodes(final int projectId, final String jobId)
       throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
+    final QueryRunner runner = createQueryRunner();
 
-    IntHandler intHandler = new IntHandler();
+    final IntHandler intHandler = new IntHandler();
     try {
-      int count =
+      final int count =
           runner.query(IntHandler.NUM_JOB_EXECUTIONS, intHandler, projectId,
               jobId);
       return count;
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error fetching num executions", e);
     }
   }
 
   @Override
-  public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId,
-      int skip, int num) throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
-    FetchExecutableFlows flowHandler = new FetchExecutableFlows();
+  public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
+                                               final int skip, final int num) throws ExecutorManagerException {
+    final QueryRunner runner = createQueryRunner();
+    final FetchExecutableFlows flowHandler = new FetchExecutableFlows();
 
     try {
-      List<ExecutableFlow> properties =
+      final List<ExecutableFlow> properties =
           runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_HISTORY,
               flowHandler, projectId, flowId, skip, num);
       return properties;
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error fetching active flows", e);
     }
   }
 
   @Override
-  public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId,
-      int skip, int num, Status status) throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
-    FetchExecutableFlows flowHandler = new FetchExecutableFlows();
+  public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
+                                               final int skip, final int num, final Status status) throws ExecutorManagerException {
+    final QueryRunner runner = createQueryRunner();
+    final FetchExecutableFlows flowHandler = new FetchExecutableFlows();
 
     try {
-      List<ExecutableFlow> properties =
+      final List<ExecutableFlow> properties =
           runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_STATUS,
               flowHandler, projectId, flowId, status.getNumVal(), skip, num);
       return properties;
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error fetching active flows", e);
     }
   }
 
   @Override
-  public List<ExecutableFlow> fetchFlowHistory(int skip, int num)
+  public List<ExecutableFlow> fetchFlowHistory(final int skip, final int num)
       throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
+    final QueryRunner runner = createQueryRunner();
 
-    FetchExecutableFlows flowHandler = new FetchExecutableFlows();
+    final FetchExecutableFlows flowHandler = new FetchExecutableFlows();
 
     try {
-      List<ExecutableFlow> properties =
+      final List<ExecutableFlow> properties =
           runner.query(FetchExecutableFlows.FETCH_ALL_EXECUTABLE_FLOW_HISTORY,
               flowHandler, skip, num);
       return properties;
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error fetching active flows", e);
     }
   }
 
   @Override
-  public List<ExecutableFlow> fetchFlowHistory(String projContain,
-      String flowContains, String userNameContains, int status, long startTime,
-      long endTime, int skip, int num) throws ExecutorManagerException {
+  public List<ExecutableFlow> fetchFlowHistory(final String projContain,
+                                               final String flowContains, final String userNameContains, final int status, final long startTime,
+                                               final long endTime, final int skip, final int num) throws ExecutorManagerException {
     String query = FetchExecutableFlows.FETCH_BASE_EXECUTABLE_FLOW_QUERY;
-    ArrayList<Object> params = new ArrayList<Object>();
+    final ArrayList<Object> params = new ArrayList<>();
 
     boolean first = true;
     if (projContain != null && !projContain.isEmpty()) {
@@ -435,59 +433,59 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       params.add(num);
     }
 
-    QueryRunner runner = createQueryRunner();
-    FetchExecutableFlows flowHandler = new FetchExecutableFlows();
+    final QueryRunner runner = createQueryRunner();
+    final FetchExecutableFlows flowHandler = new FetchExecutableFlows();
 
     try {
-      List<ExecutableFlow> properties =
+      final List<ExecutableFlow> properties =
           runner.query(query, flowHandler, params.toArray());
       return properties;
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error fetching active flows", e);
     }
   }
 
   @Override
-  public void addActiveExecutableReference(ExecutionReference reference)
+  public void addActiveExecutableReference(final ExecutionReference reference)
       throws ExecutorManagerException {
     final String INSERT =
         "INSERT INTO active_executing_flows "
             + "(exec_id, update_time) values (?,?)";
-    QueryRunner runner = createQueryRunner();
+    final QueryRunner runner = createQueryRunner();
 
     try {
       runner.update(INSERT, reference.getExecId(), reference.getUpdateTime());
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException(
           "Error updating active flow reference " + reference.getExecId(), e);
     }
   }
 
   @Override
-  public void removeActiveExecutableReference(int execid)
+  public void removeActiveExecutableReference(final int execid)
       throws ExecutorManagerException {
     final String DELETE = "DELETE FROM active_executing_flows WHERE exec_id=?";
 
-    QueryRunner runner = createQueryRunner();
+    final QueryRunner runner = createQueryRunner();
     try {
       runner.update(DELETE, execid);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException(
           "Error deleting active flow reference " + execid, e);
     }
   }
 
   @Override
-  public boolean updateExecutableReference(int execId, long updateTime)
+  public boolean updateExecutableReference(final int execId, final long updateTime)
       throws ExecutorManagerException {
     final String DELETE =
         "UPDATE active_executing_flows set update_time=? WHERE exec_id=?";
 
-    QueryRunner runner = createQueryRunner();
+    final QueryRunner runner = createQueryRunner();
     int updateNum = 0;
     try {
       updateNum = runner.update(DELETE, updateTime, execId);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException(
           "Error deleting active flow reference " + execId, e);
     }
@@ -497,7 +495,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   }
 
   @Override
-  public void uploadExecutableNode(ExecutableNode node, Props inputProps)
+  public void uploadExecutableNode(final ExecutableNode node, final Props inputProps)
       throws ExecutorManagerException {
     final String INSERT_EXECUTION_NODE =
         "INSERT INTO execution_jobs "
@@ -507,30 +505,30 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     byte[] inputParam = null;
     if (inputProps != null) {
       try {
-        String jsonString =
+        final String jsonString =
             JSONUtils.toJSON(PropsUtils.toHierarchicalMap(inputProps));
         inputParam = GZIPUtils.gzipString(jsonString, "UTF-8");
-      } catch (IOException e) {
+      } catch (final IOException e) {
         throw new ExecutorManagerException("Error encoding input params");
       }
     }
 
-    ExecutableFlow flow = node.getExecutableFlow();
-    String flowId = node.getParentFlow().getFlowPath();
+    final ExecutableFlow flow = node.getExecutableFlow();
+    final String flowId = node.getParentFlow().getFlowPath();
     System.out.println("Uploading flowId " + flowId);
-    QueryRunner runner = createQueryRunner();
+    final QueryRunner runner = createQueryRunner();
     try {
       runner.update(INSERT_EXECUTION_NODE, flow.getExecutionId(),
           flow.getProjectId(), flow.getVersion(), flowId, node.getId(),
           node.getStartTime(), node.getEndTime(), node.getStatus().getNumVal(),
           inputParam, node.getAttempt());
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error writing job " + node.getId(), e);
     }
   }
 
   @Override
-  public void updateExecutableNode(ExecutableNode node)
+  public void updateExecutableNode(final ExecutableNode node)
       throws ExecutorManagerException {
     final String UPSERT_EXECUTION_NODE =
         "UPDATE execution_jobs "
@@ -538,36 +536,36 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
             + "WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
 
     byte[] outputParam = null;
-    Props outputProps = node.getOutputProps();
+    final Props outputProps = node.getOutputProps();
     if (outputProps != null) {
       try {
-        String jsonString =
+        final String jsonString =
             JSONUtils.toJSON(PropsUtils.toHierarchicalMap(outputProps));
         outputParam = GZIPUtils.gzipString(jsonString, "UTF-8");
-      } catch (IOException e) {
+      } catch (final IOException e) {
         throw new ExecutorManagerException("Error encoding input params");
       }
     }
 
-    QueryRunner runner = createQueryRunner();
+    final QueryRunner runner = createQueryRunner();
     try {
       runner.update(UPSERT_EXECUTION_NODE, node.getStartTime(), node
           .getEndTime(), node.getStatus().getNumVal(), outputParam, node
           .getExecutableFlow().getExecutionId(), node.getParentFlow()
           .getFlowPath(), node.getId(), node.getAttempt());
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error updating job " + node.getId(),
           e);
     }
   }
 
   @Override
-  public List<ExecutableJobInfo> fetchJobInfoAttempts(int execId, String jobId)
+  public List<ExecutableJobInfo> fetchJobInfoAttempts(final int execId, final String jobId)
       throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
+    final QueryRunner runner = createQueryRunner();
 
     try {
-      List<ExecutableJobInfo> info =
+      final List<ExecutableJobInfo> info =
           runner.query(
               FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE_ATTEMPTS,
               new FetchExecutableJobHandler(), execId, jobId);
@@ -575,122 +573,122 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
         return null;
       }
       return info;
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error querying job info " + jobId, e);
     }
   }
 
   @Override
-  public ExecutableJobInfo fetchJobInfo(int execId, String jobId, int attempts)
+  public ExecutableJobInfo fetchJobInfo(final int execId, final String jobId, final int attempts)
       throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
+    final QueryRunner runner = createQueryRunner();
 
     try {
-      List<ExecutableJobInfo> info =
+      final List<ExecutableJobInfo> info =
           runner.query(FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE,
               new FetchExecutableJobHandler(), execId, jobId, attempts);
       if (info == null || info.isEmpty()) {
         return null;
       }
       return info.get(0);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error querying job info " + jobId, e);
     }
   }
 
   @Override
-  public Props fetchExecutionJobInputProps(int execId, String jobId)
+  public Props fetchExecutionJobInputProps(final int execId, final String jobId)
       throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
+    final QueryRunner runner = createQueryRunner();
     try {
-      Pair<Props, Props> props =
+      final Pair<Props, Props> props =
           runner.query(
               FetchExecutableJobPropsHandler.FETCH_INPUT_PARAM_EXECUTABLE_NODE,
               new FetchExecutableJobPropsHandler(), execId, jobId);
       return props.getFirst();
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error querying job params " + execId
           + " " + jobId, e);
     }
   }
 
   @Override
-  public Props fetchExecutionJobOutputProps(int execId, String jobId)
+  public Props fetchExecutionJobOutputProps(final int execId, final String jobId)
       throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
+    final QueryRunner runner = createQueryRunner();
     try {
-      Pair<Props, Props> props =
+      final Pair<Props, Props> props =
           runner
               .query(
                   FetchExecutableJobPropsHandler.FETCH_OUTPUT_PARAM_EXECUTABLE_NODE,
                   new FetchExecutableJobPropsHandler(), execId, jobId);
       return props.getFirst();
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error querying job params " + execId
           + " " + jobId, e);
     }
   }
 
   @Override
-  public Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId)
+  public Pair<Props, Props> fetchExecutionJobProps(final int execId, final String jobId)
       throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
+    final QueryRunner runner = createQueryRunner();
     try {
-      Pair<Props, Props> props =
+      final Pair<Props, Props> props =
           runner
               .query(
                   FetchExecutableJobPropsHandler.FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE,
                   new FetchExecutableJobPropsHandler(), execId, jobId);
       return props;
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error querying job params " + execId
           + " " + jobId, e);
     }
   }
 
   @Override
-  public List<ExecutableJobInfo> fetchJobHistory(int projectId, String jobId,
-      int skip, int size) throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
+  public List<ExecutableJobInfo> fetchJobHistory(final int projectId, final String jobId,
+                                                 final int skip, final int size) throws ExecutorManagerException {
+    final QueryRunner runner = createQueryRunner();
 
     try {
-      List<ExecutableJobInfo> info =
+      final List<ExecutableJobInfo> info =
           runner.query(FetchExecutableJobHandler.FETCH_PROJECT_EXECUTABLE_NODE,
               new FetchExecutableJobHandler(), projectId, jobId, skip, size);
       if (info == null || info.isEmpty()) {
         return null;
       }
       return info;
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error querying job info " + jobId, e);
     }
   }
 
   @Override
-  public LogData fetchLogs(int execId, String name, int attempt, int startByte,
-      int length) throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
+  public LogData fetchLogs(final int execId, final String name, final int attempt, final int startByte,
+                           final int length) throws ExecutorManagerException {
+    final QueryRunner runner = createQueryRunner();
 
-    FetchLogsHandler handler =
+    final FetchLogsHandler handler =
         new FetchLogsHandler(startByte, length + startByte);
     try {
-      LogData result =
+      final LogData result =
           runner.query(FetchLogsHandler.FETCH_LOGS, handler, execId, name,
               attempt, startByte, startByte + length);
       return result;
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error fetching logs " + execId
           + " : " + name, e);
     }
   }
 
   @Override
-  public List<Object> fetchAttachments(int execId, String jobId, int attempt)
+  public List<Object> fetchAttachments(final int execId, final String jobId, final int attempt)
       throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
+    final QueryRunner runner = createQueryRunner();
 
     try {
-      String attachments =
+      final String attachments =
           runner
               .query(
                   FetchExecutableJobAttachmentsHandler.FETCH_ATTACHMENTS_EXECUTABLE_NODE,
@@ -699,51 +697,50 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
         return null;
       }
 
-      @SuppressWarnings("unchecked")
-      List<Object> attachmentList =
+      final List<Object> attachmentList =
           (List<Object>) JSONUtils.parseJSONFromString(attachments);
 
       return attachmentList;
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new ExecutorManagerException(
           "Error converting job attachments to JSON " + jobId, e);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException(
           "Error query job attachments " + jobId, e);
     }
   }
 
   @Override
-  public void uploadLogFile(int execId, String name, int attempt, File... files)
+  public void uploadLogFile(final int execId, final String name, final int attempt, final File... files)
       throws ExecutorManagerException {
-    Connection connection = getConnection();
+    final Connection connection = getConnection();
     try {
       uploadLogFile(connection, execId, name, attempt, files,
-          defaultEncodingType);
+          this.defaultEncodingType);
       connection.commit();
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error committing log", e);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new ExecutorManagerException("Error committing log", e);
     } finally {
       DbUtils.closeQuietly(connection);
     }
   }
 
-  private void uploadLogFile(Connection connection, int execId, String name,
-      int attempt, File[] files, EncodingType encType)
+  private void uploadLogFile(final Connection connection, final int execId, final String name,
+                             final int attempt, final File[] files, final EncodingType encType)
       throws ExecutorManagerException, IOException {
     // 50K buffer... if logs are greater than this, we chunk.
     // However, we better prevent large log files from being uploaded somehow
-    byte[] buffer = new byte[50 * 1024];
+    final byte[] buffer = new byte[50 * 1024];
     int pos = 0;
     int length = buffer.length;
     int startByte = 0;
     try {
       for (int i = 0; i < files.length; ++i) {
-        File file = files[i];
+        final File file = files[i];
 
-        BufferedInputStream bufferedStream =
+        final BufferedInputStream bufferedStream =
             new BufferedInputStream(new FileInputStream(file));
         try {
           int size = bufferedStream.read(buffer, pos, length);
@@ -773,22 +770,22 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
         uploadLogPart(connection, execId, name, attempt, startByte, startByte
             + pos, encType, buffer, pos);
       }
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error writing log part.", e);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new ExecutorManagerException("Error chunking", e);
     }
   }
 
-  private void uploadLogPart(Connection connection, int execId, String name,
-      int attempt, int startByte, int endByte, EncodingType encType,
-      byte[] buffer, int length) throws SQLException, IOException {
+  private void uploadLogPart(final Connection connection, final int execId, final String name,
+                             final int attempt, final int startByte, final int endByte, final EncodingType encType,
+                             final byte[] buffer, final int length) throws SQLException, IOException {
     final String INSERT_EXECUTION_LOGS =
         "INSERT INTO execution_logs "
             + "(exec_id, name, attempt, enc_type, start_byte, end_byte, "
             + "log, upload_time) VALUES (?,?,?,?,?,?,?,?)";
 
-    QueryRunner runner = new QueryRunner();
+    final QueryRunner runner = new QueryRunner();
     byte[] buf = buffer;
     if (encType == EncodingType.GZIP) {
       buf = GZIPUtils.gzipBytes(buf, 0, length);
@@ -802,32 +799,32 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   }
 
   @Override
-  public void uploadAttachmentFile(ExecutableNode node, File file)
+  public void uploadAttachmentFile(final ExecutableNode node, final File file)
       throws ExecutorManagerException {
-    Connection connection = getConnection();
+    final Connection connection = getConnection();
     try {
-      uploadAttachmentFile(connection, node, file, defaultEncodingType);
+      uploadAttachmentFile(connection, node, file, this.defaultEncodingType);
       connection.commit();
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error committing attachments ", e);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new ExecutorManagerException("Error uploading attachments ", e);
     } finally {
       DbUtils.closeQuietly(connection);
     }
   }
 
-  private void uploadAttachmentFile(Connection connection, ExecutableNode node,
-      File file, EncodingType encType) throws SQLException, IOException {
+  private void uploadAttachmentFile(final Connection connection, final ExecutableNode node,
+                                    final File file, final EncodingType encType) throws SQLException, IOException {
 
-    String jsonString = FileUtils.readFileToString(file);
-    byte[] attachments = GZIPUtils.gzipString(jsonString, "UTF-8");
+    final String jsonString = FileUtils.readFileToString(file);
+    final byte[] attachments = GZIPUtils.gzipString(jsonString, "UTF-8");
 
     final String UPDATE_EXECUTION_NODE_ATTACHMENTS =
         "UPDATE execution_jobs " + "SET attachments=? "
             + "WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
 
-    QueryRunner runner = new QueryRunner();
+    final QueryRunner runner = new QueryRunner();
     runner.update(connection, UPDATE_EXECUTION_NODE_ATTACHMENTS, attachments,
         node.getExecutableFlow().getExecutionId(), node.getParentFlow()
             .getNestedId(), node.getId(), node.getAttempt());
@@ -837,7 +834,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     Connection connection = null;
     try {
       connection = super.getDBConnection(false);
-    } catch (Exception e) {
+    } catch (final Exception e) {
       DbUtils.closeQuietly(connection);
       throw new ExecutorManagerException("Error getting DB connection.", e);
     }
@@ -853,14 +850,14 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
    */
   @Override
   public List<Executor> fetchAllExecutors() throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
-    FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+    final QueryRunner runner = createQueryRunner();
+    final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
 
     try {
-      List<Executor> executors =
+      final List<Executor> executors =
         runner.query(FetchExecutorHandler.FETCH_ALL_EXECUTORS, executorHandler);
       return executors;
-    } catch (Exception e) {
+    } catch (final Exception e) {
       throw new ExecutorManagerException("Error fetching executors", e);
     }
   }
@@ -873,15 +870,15 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
    */
   @Override
   public List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
-    FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+    final QueryRunner runner = createQueryRunner();
+    final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
 
     try {
-      List<Executor> executors =
+      final List<Executor> executors =
         runner.query(FetchExecutorHandler.FETCH_ACTIVE_EXECUTORS,
           executorHandler);
       return executors;
-    } catch (Exception e) {
+    } catch (final Exception e) {
       throw new ExecutorManagerException("Error fetching active executors", e);
     }
   }
@@ -892,13 +889,13 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
    * @see azkaban.executor.ExecutorLoader#fetchExecutor(java.lang.String, int)
    */
   @Override
-  public Executor fetchExecutor(String host, int port)
+  public Executor fetchExecutor(final String host, final int port)
     throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
-    FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+    final QueryRunner runner = createQueryRunner();
+    final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
 
     try {
-      List<Executor> executors =
+      final List<Executor> executors =
         runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_HOST_PORT,
           executorHandler, host, port);
       if (executors.isEmpty()) {
@@ -906,7 +903,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       } else {
         return executors.get(0);
       }
-    } catch (Exception e) {
+    } catch (final Exception e) {
       throw new ExecutorManagerException(String.format(
         "Error fetching executor %s:%d", host, port), e);
     }
@@ -918,12 +915,12 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
    * @see azkaban.executor.ExecutorLoader#fetchExecutor(int)
    */
   @Override
-  public Executor fetchExecutor(int executorId) throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
-    FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+  public Executor fetchExecutor(final int executorId) throws ExecutorManagerException {
+    final QueryRunner runner = createQueryRunner();
+    final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
 
     try {
-      List<Executor> executors =
+      final List<Executor> executors =
         runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_ID,
           executorHandler, executorId);
       if (executors.isEmpty()) {
@@ -931,7 +928,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       } else {
         return executors.get(0);
       }
-    } catch (Exception e) {
+    } catch (final Exception e) {
       throw new ExecutorManagerException(String.format(
         "Error fetching executor with id: %d", executorId), e);
     }
@@ -943,20 +940,20 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
    * @see azkaban.executor.ExecutorLoader#updateExecutor(int)
    */
   @Override
-  public void updateExecutor(Executor executor) throws ExecutorManagerException {
+  public void updateExecutor(final Executor executor) throws ExecutorManagerException {
     final String UPDATE =
       "UPDATE executors SET host=?, port=?, active=? where id=?";
 
-    QueryRunner runner = createQueryRunner();
+    final QueryRunner runner = createQueryRunner();
     try {
-      int rows =
+      final int rows =
         runner.update(UPDATE, executor.getHost(), executor.getPort(),
           executor.isActive(), executor.getId());
       if (rows == 0) {
         throw new ExecutorManagerException("No executor with id :"
           + executor.getId());
       }
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error inactivating executor "
         + executor.getId(), e);
     }
@@ -968,7 +965,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
    * @see azkaban.executor.ExecutorLoader#addExecutor(java.lang.String, int)
    */
   @Override
-  public Executor addExecutor(String host, int port)
+  public Executor addExecutor(final String host, final int port)
     throws ExecutorManagerException {
     // verify, if executor already exists
     Executor executor = fetchExecutor(host, port);
@@ -984,13 +981,13 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     return executor;
   }
 
-  private void addExecutorHelper(String host, int port)
+  private void addExecutorHelper(final String host, final int port)
     throws ExecutorManagerException {
     final String INSERT = "INSERT INTO executors (host, port) values (?,?)";
-    QueryRunner runner = createQueryRunner();
+    final QueryRunner runner = createQueryRunner();
     try {
       runner.update(INSERT, host, port);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException(String.format("Error adding %s:%d ",
         host, port), e);
     }
@@ -1003,16 +1000,16 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
    * @see azkaban.executor.ExecutorLoader#removeExecutor(String, int)
    */
   @Override
-  public void removeExecutor(String host, int port) throws ExecutorManagerException {
+  public void removeExecutor(final String host, final int port) throws ExecutorManagerException {
     final String DELETE = "DELETE FROM executors WHERE host=? AND port=?";
-    QueryRunner runner = createQueryRunner();
+    final QueryRunner runner = createQueryRunner();
     try {
-      int rows = runner.update(DELETE, host, port);
+      final int rows = runner.update(DELETE, host, port);
       if (rows == 0) {
         throw new ExecutorManagerException("No executor with host, port :"
             + "(" + host + "," + port + ")");
       }
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error removing executor with host, port : "
           + "(" + host + "," + port + ")", e);
     }
@@ -1026,17 +1023,17 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
    *      java.lang.String)
    */
   @Override
-  public void postExecutorEvent(Executor executor, EventType type, String user,
-    String message) throws ExecutorManagerException{
-    QueryRunner runner = createQueryRunner();
+  public void postExecutorEvent(final Executor executor, final EventType type, final String user,
+                                final String message) throws ExecutorManagerException{
+    final QueryRunner runner = createQueryRunner();
 
     final String INSERT_PROJECT_EVENTS =
       "INSERT INTO executor_events (executor_id, event_type, event_time, username, message) values (?,?,?,?,?)";
-    Date updateDate = new Date();
+    final Date updateDate = new Date();
     try {
       runner.update(INSERT_PROJECT_EVENTS, executor.getId(), type.getNumVal(),
         updateDate, user, message);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Failed to post executor event", e);
     }
   }
@@ -1048,17 +1045,17 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
    *      int, int)
    */
   @Override
-  public List<ExecutorLogEvent> getExecutorEvents(Executor executor, int num,
-    int offset) throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
+  public List<ExecutorLogEvent> getExecutorEvents(final Executor executor, final int num,
+                                                  final int offset) throws ExecutorManagerException {
+    final QueryRunner runner = createQueryRunner();
 
-    ExecutorLogsResultHandler logHandler = new ExecutorLogsResultHandler();
+    final ExecutorLogsResultHandler logHandler = new ExecutorLogsResultHandler();
     List<ExecutorLogEvent> events = null;
     try {
       events =
         runner.query(ExecutorLogsResultHandler.SELECT_EXECUTOR_EVENTS_ORDER,
           logHandler, executor.getId(), num, offset);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException(
         "Failed to fetch events for executor id : " + executor.getId(), e);
     }
@@ -1073,27 +1070,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
    * @see azkaban.executor.ExecutorLoader#assignExecutor(int, int)
    */
   @Override
-  public void assignExecutor(int executorId, int executionId)
+  public void assignExecutor(final int executorId, final int executionId)
     throws ExecutorManagerException {
     final String UPDATE =
       "UPDATE execution_flows SET executor_id=? where exec_id=?";
 
-    QueryRunner runner = createQueryRunner();
+    final QueryRunner runner = createQueryRunner();
     try {
-      Executor executor = fetchExecutor(executorId);
+      final Executor executor = fetchExecutor(executorId);
       if (executor == null) {
         throw new ExecutorManagerException(String.format(
           "Failed to assign non-existent executor Id: %d to execution : %d  ",
           executorId, executionId));
       }
 
-      int rows = runner.update(UPDATE, executorId, executionId);
+      final int rows = runner.update(UPDATE, executorId, executionId);
       if (rows == 0) {
         throw new ExecutorManagerException(String.format(
           "Failed to assign executor Id: %d to non-existent execution : %d  ",
           executorId, executionId));
       }
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error updating executor id "
         + executorId, e);
     }
@@ -1106,75 +1103,116 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
    * @see azkaban.executor.ExecutorLoader#fetchExecutorByExecutionId(int)
    */
   @Override
-  public Executor fetchExecutorByExecutionId(int executionId)
+  public Executor fetchExecutorByExecutionId(final int executionId)
     throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
-    FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+    final QueryRunner runner = createQueryRunner();
+    final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
     Executor executor = null;
     try {
-      List<Executor> executors =
+      final List<Executor> executors =
         runner.query(FetchExecutorHandler.FETCH_EXECUTION_EXECUTOR,
           executorHandler, executionId);
       if (executors.size() > 0) {
         executor = executors.get(0);
       }
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException(
         "Error fetching executor for exec_id : " + executionId, e);
     }
     return executor;
   }
 
+  @Override
+  public int removeExecutionLogsByTime(final long millis)
+      throws ExecutorManagerException {
+    final String DELETE_BY_TIME =
+        "DELETE FROM execution_logs WHERE upload_time < ?";
+
+    final QueryRunner runner = createQueryRunner();
+    int updateNum = 0;
+    try {
+      updateNum = runner.update(DELETE_BY_TIME, millis);
+    } catch (final SQLException e) {
+      e.printStackTrace();
+      throw new ExecutorManagerException(
+          "Error deleting old execution_logs before " + millis, e);
+    }
+
+    return updateNum;
+  }
+
+  /**
+   *
+   * {@inheritDoc}
+   * @see azkaban.executor.ExecutorLoader#unassignExecutor(int)
+   */
+  @Override
+  public void unassignExecutor(final int executionId) throws ExecutorManagerException {
+    final String UPDATE =
+      "UPDATE execution_flows SET executor_id=NULL where exec_id=?";
+
+    final QueryRunner runner = createQueryRunner();
+    try {
+      final int rows = runner.update(UPDATE, executionId);
+      if (rows == 0) {
+        throw new ExecutorManagerException(String.format(
+          "Failed to unassign executor for execution : %d  ", executionId));
+      }
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error updating execution id "
+        + executionId, e);
+    }
+  }
+
   private static class LastInsertID implements ResultSetHandler<Long> {
-    private static String LAST_INSERT_ID = "SELECT LAST_INSERT_ID()";
+    private static final String LAST_INSERT_ID = "SELECT LAST_INSERT_ID()";
 
     @Override
-    public Long handle(ResultSet rs) throws SQLException {
+    public Long handle(final ResultSet rs) throws SQLException {
       if (!rs.next()) {
         return -1L;
       }
-      long id = rs.getLong(1);
+      final long id = rs.getLong(1);
       return id;
     }
   }
 
   private static class FetchLogsHandler implements ResultSetHandler<LogData> {
-    private static String FETCH_LOGS =
+    private static final String FETCH_LOGS =
         "SELECT exec_id, name, attempt, enc_type, start_byte, end_byte, log "
             + "FROM execution_logs "
             + "WHERE exec_id=? AND name=? AND attempt=? AND end_byte > ? "
             + "AND start_byte <= ? ORDER BY start_byte";
 
-    private int startByte;
-    private int endByte;
+    private final int startByte;
+    private final int endByte;
 
-    public FetchLogsHandler(int startByte, int endByte) {
+    public FetchLogsHandler(final int startByte, final int endByte) {
       this.startByte = startByte;
       this.endByte = endByte;
     }
 
     @Override
-    public LogData handle(ResultSet rs) throws SQLException {
+    public LogData handle(final ResultSet rs) throws SQLException {
       if (!rs.next()) {
         return null;
       }
 
-      ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+      final ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
 
       do {
         // int execId = rs.getInt(1);
         // String name = rs.getString(2);
-        @SuppressWarnings("unused")
-        int attempt = rs.getInt(3);
-        EncodingType encType = EncodingType.fromInteger(rs.getInt(4));
-        int startByte = rs.getInt(5);
-        int endByte = rs.getInt(6);
+        final int attempt = rs.getInt(3);
+        final EncodingType encType = EncodingType.fromInteger(rs.getInt(4));
+        final int startByte = rs.getInt(5);
+        final int endByte = rs.getInt(6);
 
-        byte[] data = rs.getBytes(7);
+        final byte[] data = rs.getBytes(7);
 
-        int offset =
+        final int offset =
             this.startByte > startByte ? this.startByte - startByte : 0;
-        int length =
+        final int length =
             this.endByte < endByte ? this.endByte - startByte - offset
                 : endByte - startByte - offset;
         try {
@@ -1184,56 +1222,56 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
           }
 
           byteStream.write(buffer, offset, length);
-        } catch (IOException e) {
+        } catch (final IOException e) {
           throw new SQLException(e);
         }
       } while (rs.next());
 
-      byte[] buffer = byteStream.toByteArray();
-      Pair<Integer, Integer> result =
+      final byte[] buffer = byteStream.toByteArray();
+      final Pair<Integer, Integer> result =
           FileIOUtils.getUtf8Range(buffer, 0, buffer.length);
 
-      return new LogData(startByte + result.getFirst(), result.getSecond(),
+      return new LogData(this.startByte + result.getFirst(), result.getSecond(),
           new String(buffer, result.getFirst(), result.getSecond(), StandardCharsets.UTF_8));
     }
   }
 
   private static class FetchExecutableJobHandler implements
       ResultSetHandler<List<ExecutableJobInfo>> {
-    private static String FETCH_EXECUTABLE_NODE =
+    private static final String FETCH_EXECUTABLE_NODE =
         "SELECT exec_id, project_id, version, flow_id, job_id, "
             + "start_time, end_time, status, attempt "
             + "FROM execution_jobs WHERE exec_id=? "
             + "AND job_id=? AND attempt=?";
-    private static String FETCH_EXECUTABLE_NODE_ATTEMPTS =
+    private static final String FETCH_EXECUTABLE_NODE_ATTEMPTS =
         "SELECT exec_id, project_id, version, flow_id, job_id, "
             + "start_time, end_time, status, attempt FROM execution_jobs "
             + "WHERE exec_id=? AND job_id=?";
-    private static String FETCH_PROJECT_EXECUTABLE_NODE =
+    private static final String FETCH_PROJECT_EXECUTABLE_NODE =
         "SELECT exec_id, project_id, version, flow_id, job_id, "
             + "start_time, end_time, status, attempt FROM execution_jobs "
             + "WHERE project_id=? AND job_id=? "
             + "ORDER BY exec_id DESC LIMIT ?, ? ";
 
     @Override
-    public List<ExecutableJobInfo> handle(ResultSet rs) throws SQLException {
+    public List<ExecutableJobInfo> handle(final ResultSet rs) throws SQLException {
       if (!rs.next()) {
         return Collections.<ExecutableJobInfo> emptyList();
       }
 
-      List<ExecutableJobInfo> execNodes = new ArrayList<ExecutableJobInfo>();
+      final List<ExecutableJobInfo> execNodes = new ArrayList<>();
       do {
-        int execId = rs.getInt(1);
-        int projectId = rs.getInt(2);
-        int version = rs.getInt(3);
-        String flowId = rs.getString(4);
-        String jobId = rs.getString(5);
-        long startTime = rs.getLong(6);
-        long endTime = rs.getLong(7);
-        Status status = Status.fromInteger(rs.getInt(8));
-        int attempt = rs.getInt(9);
-
-        ExecutableJobInfo info =
+        final int execId = rs.getInt(1);
+        final int projectId = rs.getInt(2);
+        final int version = rs.getInt(3);
+        final String flowId = rs.getString(4);
+        final String jobId = rs.getString(5);
+        final long startTime = rs.getLong(6);
+        final long endTime = rs.getLong(7);
+        final Status status = Status.fromInteger(rs.getInt(8));
+        final int attempt = rs.getInt(9);
+
+        final ExecutableJobInfo info =
             new ExecutableJobInfo(execId, projectId, version, flowId, jobId,
                 startTime, endTime, status, attempt);
         execNodes.add(info);
@@ -1245,19 +1283,19 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
 
   private static class FetchExecutableJobAttachmentsHandler implements
       ResultSetHandler<String> {
-    private static String FETCH_ATTACHMENTS_EXECUTABLE_NODE =
+    private static final String FETCH_ATTACHMENTS_EXECUTABLE_NODE =
         "SELECT attachments FROM execution_jobs WHERE exec_id=? AND job_id=?";
 
     @Override
-    public String handle(ResultSet rs) throws SQLException {
+    public String handle(final ResultSet rs) throws SQLException {
       String attachmentsJson = null;
       if (rs.next()) {
         try {
-          byte[] attachments = rs.getBytes(1);
+          final byte[] attachments = rs.getBytes(1);
           if (attachments != null) {
             attachmentsJson = GZIPUtils.unGzipString(attachments, "UTF-8");
           }
-        } catch (IOException e) {
+        } catch (final IOException e) {
           throw new SQLException("Error decoding job attachments", e);
         }
       }
@@ -1267,62 +1305,61 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
 
   private static class FetchExecutableJobPropsHandler implements
       ResultSetHandler<Pair<Props, Props>> {
-    private static String FETCH_OUTPUT_PARAM_EXECUTABLE_NODE =
+    private static final String FETCH_OUTPUT_PARAM_EXECUTABLE_NODE =
         "SELECT output_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
-    private static String FETCH_INPUT_PARAM_EXECUTABLE_NODE =
+    private static final String FETCH_INPUT_PARAM_EXECUTABLE_NODE =
         "SELECT input_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
-    private static String FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE =
+    private static final String FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE =
         "SELECT input_params, output_params "
             + "FROM execution_jobs WHERE exec_id=? AND job_id=?";
 
-    @SuppressWarnings("unchecked")
     @Override
-    public Pair<Props, Props> handle(ResultSet rs) throws SQLException {
+    public Pair<Props, Props> handle(final ResultSet rs) throws SQLException {
       if (!rs.next()) {
-        return new Pair<Props, Props>(null, null);
+        return new Pair<>(null, null);
       }
 
       if (rs.getMetaData().getColumnCount() > 1) {
-        byte[] input = rs.getBytes(1);
-        byte[] output = rs.getBytes(2);
+        final byte[] input = rs.getBytes(1);
+        final byte[] output = rs.getBytes(2);
 
         Props inputProps = null;
         Props outputProps = null;
         try {
           if (input != null) {
-            String jsonInputString = GZIPUtils.unGzipString(input, "UTF-8");
+            final String jsonInputString = GZIPUtils.unGzipString(input, "UTF-8");
             inputProps =
                 PropsUtils.fromHierarchicalMap((Map<String, Object>) JSONUtils
                     .parseJSONFromString(jsonInputString));
 
           }
           if (output != null) {
-            String jsonOutputString = GZIPUtils.unGzipString(output, "UTF-8");
+            final String jsonOutputString = GZIPUtils.unGzipString(output, "UTF-8");
             outputProps =
                 PropsUtils.fromHierarchicalMap((Map<String, Object>) JSONUtils
                     .parseJSONFromString(jsonOutputString));
           }
-        } catch (IOException e) {
+        } catch (final IOException e) {
           throw new SQLException("Error decoding param data", e);
         }
 
-        return new Pair<Props, Props>(inputProps, outputProps);
+        return new Pair<>(inputProps, outputProps);
       } else {
-        byte[] params = rs.getBytes(1);
+        final byte[] params = rs.getBytes(1);
         Props props = null;
         try {
           if (params != null) {
-            String jsonProps = GZIPUtils.unGzipString(params, "UTF-8");
+            final String jsonProps = GZIPUtils.unGzipString(params, "UTF-8");
 
             props =
                 PropsUtils.fromHierarchicalMap((Map<String, Object>) JSONUtils
                     .parseJSONFromString(jsonProps));
           }
-        } catch (IOException e) {
+        } catch (final IOException e) {
           throw new SQLException("Error decoding param data", e);
         }
 
-        return new Pair<Props, Props>(props, null);
+        return new Pair<>(props, null);
       }
     }
   }
@@ -1333,47 +1370,47 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   private static class FetchQueuedExecutableFlows implements
     ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
     // Select queued unassigned flows
-    private static String FETCH_QUEUED_EXECUTABLE_FLOW =
+    private static final String FETCH_QUEUED_EXECUTABLE_FLOW =
         "SELECT exec_id, enc_type, flow_data FROM execution_flows"
             + " Where executor_id is NULL AND status = "
             + Status.PREPARING.getNumVal();
 
     @Override
-    public List<Pair<ExecutionReference, ExecutableFlow>> handle(ResultSet rs)
+    public List<Pair<ExecutionReference, ExecutableFlow>> handle(final ResultSet rs)
       throws SQLException {
       if (!rs.next()) {
         return Collections.emptyList();
       }
 
-      List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
+      final List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
         new ArrayList<>();
       do {
-        int id = rs.getInt(1);
-        int encodingType = rs.getInt(2);
-        byte[] data = rs.getBytes(3);
+        final int id = rs.getInt(1);
+        final int encodingType = rs.getInt(2);
+        final byte[] data = rs.getBytes(3);
 
         if (data == null) {
           logger.error("Found a flow with empty data blob exec_id: " + id);
         } else {
-          EncodingType encType = EncodingType.fromInteger(encodingType);
-          Object flowObj;
+          final EncodingType encType = EncodingType.fromInteger(encodingType);
+          final Object flowObj;
           try {
             // Convoluted way to inflate strings. Should find common package or
             // helper function.
             if (encType == EncodingType.GZIP) {
               // Decompress the sucker.
-              String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+              final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
               flowObj = JSONUtils.parseJSONFromString(jsonString);
             } else {
-              String jsonString = new String(data, "UTF-8");
+              final String jsonString = new String(data, "UTF-8");
               flowObj = JSONUtils.parseJSONFromString(jsonString);
             }
 
-            ExecutableFlow exFlow =
+            final ExecutableFlow exFlow =
               ExecutableFlow.createExecutableFlowFromObject(flowObj);
-            ExecutionReference ref = new ExecutionReference(id);
+            final ExecutionReference ref = new ExecutionReference(id);
             execFlows.add(new Pair<>(ref, exFlow));
-          } catch (IOException e) {
+          } catch (final IOException e) {
             throw new SQLException("Error retrieving flow data " + id, e);
           }
         }
@@ -1386,40 +1423,40 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   private static class FetchRecentlyFinishedFlows implements
     ResultSetHandler<List<ExecutableFlow>> {
     // Execution_flows table is already indexed by end_time
-    private static String FETCH_RECENTLY_FINISHED_FLOW =
+    private static final String FETCH_RECENTLY_FINISHED_FLOW =
         "SELECT exec_id, enc_type, flow_data FROM execution_flows "
             + "WHERE end_time > ? AND status IN (?, ?, ?)";
 
     @Override
     public List<ExecutableFlow> handle(
-        ResultSet rs) throws SQLException {
+        final ResultSet rs) throws SQLException {
       if (!rs.next()) {
         return Collections.emptyList();
       }
 
-      List<ExecutableFlow> execFlows = new ArrayList<>();
+      final List<ExecutableFlow> execFlows = new ArrayList<>();
       do {
-        int id = rs.getInt(1);
-        int encodingType = rs.getInt(2);
-        byte[] data = rs.getBytes(3);
+        final int id = rs.getInt(1);
+        final int encodingType = rs.getInt(2);
+        final byte[] data = rs.getBytes(3);
 
         if (data != null) {
-          EncodingType encType = EncodingType.fromInteger(encodingType);
-          Object flowObj;
+          final EncodingType encType = EncodingType.fromInteger(encodingType);
+          final Object flowObj;
           try {
             if (encType == EncodingType.GZIP) {
-              String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+              final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
               flowObj = JSONUtils.parseJSONFromString(jsonString);
             } else {
-              String jsonString = new String(data, "UTF-8");
+              final String jsonString = new String(data, "UTF-8");
               flowObj = JSONUtils.parseJSONFromString(jsonString);
             }
 
-            ExecutableFlow exFlow =
+            final ExecutableFlow exFlow =
                 ExecutableFlow.createExecutableFlowFromObject(flowObj);
 
             execFlows.add(exFlow);
-          } catch (IOException e) {
+          } catch (final IOException e) {
             throw new SQLException("Error retrieving flow data " + id, e);
           }
         }
@@ -1432,7 +1469,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   private static class FetchActiveExecutableFlows implements
       ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
     // Select running and executor assigned flows
-    private static String FETCH_ACTIVE_EXECUTABLE_FLOW =
+    private static final String FETCH_ACTIVE_EXECUTABLE_FLOW =
       "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
         + "et.port port, et.id executorId, et.active executorStatus"
         + " FROM execution_flows ex"
@@ -1445,45 +1482,45 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
 
     @Override
     public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> handle(
-        ResultSet rs) throws SQLException {
+        final ResultSet rs) throws SQLException {
       if (!rs.next()) {
         return Collections.emptyMap();
       }
 
-      Map<Integer, Pair<ExecutionReference, ExecutableFlow>> execFlows =
+      final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> execFlows =
           new HashMap<>();
       do {
-        int id = rs.getInt(1);
-        int encodingType = rs.getInt(2);
-        byte[] data = rs.getBytes(3);
-        String host = rs.getString(4);
-        int port = rs.getInt(5);
-        int executorId = rs.getInt(6);
-        boolean executorStatus = rs.getBoolean(7);
+        final int id = rs.getInt(1);
+        final int encodingType = rs.getInt(2);
+        final byte[] data = rs.getBytes(3);
+        final String host = rs.getString(4);
+        final int port = rs.getInt(5);
+        final int executorId = rs.getInt(6);
+        final boolean executorStatus = rs.getBoolean(7);
 
         if (data == null) {
           execFlows.put(id, null);
         } else {
-          EncodingType encType = EncodingType.fromInteger(encodingType);
-          Object flowObj;
+          final EncodingType encType = EncodingType.fromInteger(encodingType);
+          final Object flowObj;
           try {
             // Convoluted way to inflate strings. Should find common package or
             // helper function.
             if (encType == EncodingType.GZIP) {
               // Decompress the sucker.
-              String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+              final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
               flowObj = JSONUtils.parseJSONFromString(jsonString);
             } else {
-              String jsonString = new String(data, "UTF-8");
+              final String jsonString = new String(data, "UTF-8");
               flowObj = JSONUtils.parseJSONFromString(jsonString);
             }
 
-            ExecutableFlow exFlow =
+            final ExecutableFlow exFlow =
                 ExecutableFlow.createExecutableFlowFromObject(flowObj);
-            Executor executor = new Executor(executorId, host, port, executorStatus);
-            ExecutionReference ref = new ExecutionReference(id, executor);
+            final Executor executor = new Executor(executorId, host, port, executorStatus);
+            final ExecutionReference ref = new ExecutionReference(id, executor);
             execFlows.put(id, new Pair<>(ref, exFlow));
-          } catch (IOException e) {
+          } catch (final IOException e) {
             throw new SQLException("Error retrieving flow data " + id, e);
           }
         }
@@ -1495,7 +1532,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
 
   private static class FetchActiveExecutableFlowByExecId implements
       ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
-    private static String FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID =
+    private static final String FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID =
         "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
             + "et.port port, et.id executorId, et.active executorStatus"
             + " FROM execution_flows ex"
@@ -1507,43 +1544,43 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
             + Status.FAILED.getNumVal() + ")";
 
     @Override
-    public List<Pair<ExecutionReference, ExecutableFlow>> handle(ResultSet rs)
+    public List<Pair<ExecutionReference, ExecutableFlow>> handle(final ResultSet rs)
         throws SQLException {
       if (!rs.next()) {
         return Collections.emptyList();
       }
 
-      List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
+      final List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
           new ArrayList<>();
       do {
-        int id = rs.getInt(1);
-        int encodingType = rs.getInt(2);
-        byte[] data = rs.getBytes(3);
-        String host = rs.getString(4);
-        int port = rs.getInt(5);
-        int executorId = rs.getInt(6);
-        boolean executorStatus = rs.getBoolean(7);
+        final int id = rs.getInt(1);
+        final int encodingType = rs.getInt(2);
+        final byte[] data = rs.getBytes(3);
+        final String host = rs.getString(4);
+        final int port = rs.getInt(5);
+        final int executorId = rs.getInt(6);
+        final boolean executorStatus = rs.getBoolean(7);
 
         if (data == null) {
           logger.error("Found a flow with empty data blob exec_id: " + id);
         } else {
-          EncodingType encType = EncodingType.fromInteger(encodingType);
-          Object flowObj;
+          final EncodingType encType = EncodingType.fromInteger(encodingType);
+          final Object flowObj;
           try {
             if (encType == EncodingType.GZIP) {
-              String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+              final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
               flowObj = JSONUtils.parseJSONFromString(jsonString);
             } else {
-              String jsonString = new String(data, "UTF-8");
+              final String jsonString = new String(data, "UTF-8");
               flowObj = JSONUtils.parseJSONFromString(jsonString);
             }
 
-            ExecutableFlow exFlow =
+            final ExecutableFlow exFlow =
                 ExecutableFlow.createExecutableFlowFromObject(flowObj);
-            Executor executor = new Executor(executorId, host, port, executorStatus);
-            ExecutionReference ref = new ExecutionReference(id, executor);
+            final Executor executor = new Executor(executorId, host, port, executorStatus);
+            final ExecutionReference ref = new ExecutionReference(id, executor);
             execFlows.add(new Pair<>(ref, exFlow));
-          } catch (IOException e) {
+          } catch (final IOException e) {
             throw new SQLException("Error retrieving flow data " + id, e);
           }
         }
@@ -1555,9 +1592,9 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
 
   private static class FetchExecutableFlows implements
       ResultSetHandler<List<ExecutableFlow>> {
-    private static String FETCH_BASE_EXECUTABLE_FLOW_QUERY =
+    private static final String FETCH_BASE_EXECUTABLE_FLOW_QUERY =
         "SELECT exec_id, enc_type, flow_data FROM execution_flows ";
-    private static String FETCH_EXECUTABLE_FLOW =
+    private static final String FETCH_EXECUTABLE_FLOW =
         "SELECT exec_id, enc_type, flow_data FROM execution_flows "
             + "WHERE exec_id=?";
     // private static String FETCH_ACTIVE_EXECUTABLE_FLOW =
@@ -1565,49 +1602,49 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     // +
     // "FROM execution_flows ex " +
     // "INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";
-    private static String FETCH_ALL_EXECUTABLE_FLOW_HISTORY =
+    private static final String FETCH_ALL_EXECUTABLE_FLOW_HISTORY =
         "SELECT exec_id, enc_type, flow_data FROM execution_flows "
             + "ORDER BY exec_id DESC LIMIT ?, ?";
-    private static String FETCH_EXECUTABLE_FLOW_HISTORY =
+    private static final String FETCH_EXECUTABLE_FLOW_HISTORY =
         "SELECT exec_id, enc_type, flow_data FROM execution_flows "
             + "WHERE project_id=? AND flow_id=? "
             + "ORDER BY exec_id DESC LIMIT ?, ?";
-    private static String FETCH_EXECUTABLE_FLOW_BY_STATUS =
+    private static final String FETCH_EXECUTABLE_FLOW_BY_STATUS =
         "SELECT exec_id, enc_type, flow_data FROM execution_flows "
             + "WHERE project_id=? AND flow_id=? AND status=? "
             + "ORDER BY exec_id DESC LIMIT ?, ?";
 
     @Override
-    public List<ExecutableFlow> handle(ResultSet rs) throws SQLException {
+    public List<ExecutableFlow> handle(final ResultSet rs) throws SQLException {
       if (!rs.next()) {
         return Collections.<ExecutableFlow> emptyList();
       }
 
-      List<ExecutableFlow> execFlows = new ArrayList<ExecutableFlow>();
+      final List<ExecutableFlow> execFlows = new ArrayList<>();
       do {
-        int id = rs.getInt(1);
-        int encodingType = rs.getInt(2);
-        byte[] data = rs.getBytes(3);
+        final int id = rs.getInt(1);
+        final int encodingType = rs.getInt(2);
+        final byte[] data = rs.getBytes(3);
 
         if (data != null) {
-          EncodingType encType = EncodingType.fromInteger(encodingType);
-          Object flowObj;
+          final EncodingType encType = EncodingType.fromInteger(encodingType);
+          final Object flowObj;
           try {
             // Convoluted way to inflate strings. Should find common package
             // or helper function.
             if (encType == EncodingType.GZIP) {
               // Decompress the sucker.
-              String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+              final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
               flowObj = JSONUtils.parseJSONFromString(jsonString);
             } else {
-              String jsonString = new String(data, "UTF-8");
+              final String jsonString = new String(data, "UTF-8");
               flowObj = JSONUtils.parseJSONFromString(jsonString);
             }
 
-            ExecutableFlow exFlow =
+            final ExecutableFlow exFlow =
                 ExecutableFlow.createExecutableFlowFromObject(flowObj);
             execFlows.add(exFlow);
-          } catch (IOException e) {
+          } catch (final IOException e) {
             throw new SQLException("Error retrieving flow data " + id, e);
           }
         }
@@ -1618,17 +1655,17 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   }
 
   private static class IntHandler implements ResultSetHandler<Integer> {
-    private static String NUM_EXECUTIONS =
+    private static final String NUM_EXECUTIONS =
         "SELECT COUNT(1) FROM execution_flows";
-    private static String NUM_FLOW_EXECUTIONS =
+    private static final String NUM_FLOW_EXECUTIONS =
         "SELECT COUNT(1) FROM execution_flows WHERE project_id=? AND flow_id=?";
-    private static String NUM_JOB_EXECUTIONS =
+    private static final String NUM_JOB_EXECUTIONS =
         "SELECT COUNT(1) FROM execution_jobs WHERE project_id=? AND job_id=?";
-    private static String FETCH_EXECUTOR_ID =
+    private static final String FETCH_EXECUTOR_ID =
         "SELECT executor_id FROM execution_flows WHERE exec_id=?";
 
     @Override
-    public Integer handle(ResultSet rs) throws SQLException {
+    public Integer handle(final ResultSet rs) throws SQLException {
       if (!rs.next()) {
         return 0;
       }
@@ -1636,56 +1673,37 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     }
   }
 
-  @Override
-  public int removeExecutionLogsByTime(long millis)
-      throws ExecutorManagerException {
-    final String DELETE_BY_TIME =
-        "DELETE FROM execution_logs WHERE upload_time < ?";
-
-    QueryRunner runner = createQueryRunner();
-    int updateNum = 0;
-    try {
-      updateNum = runner.update(DELETE_BY_TIME, millis);
-    } catch (SQLException e) {
-      e.printStackTrace();
-      throw new ExecutorManagerException(
-          "Error deleting old execution_logs before " + millis, e);
-    }
-
-    return updateNum;
-  }
-
   /**
    * JDBC ResultSetHandler to fetch records from executors table
    */
   private static class FetchExecutorHandler implements
     ResultSetHandler<List<Executor>> {
-    private static String FETCH_ALL_EXECUTORS =
+    private static final String FETCH_ALL_EXECUTORS =
       "SELECT id, host, port, active FROM executors";
-    private static String FETCH_ACTIVE_EXECUTORS =
+    private static final String FETCH_ACTIVE_EXECUTORS =
       "SELECT id, host, port, active FROM executors where active=true";
-    private static String FETCH_EXECUTOR_BY_ID =
+    private static final String FETCH_EXECUTOR_BY_ID =
       "SELECT id, host, port, active FROM executors where id=?";
-    private static String FETCH_EXECUTOR_BY_HOST_PORT =
+    private static final String FETCH_EXECUTOR_BY_HOST_PORT =
       "SELECT id, host, port, active FROM executors where host=? AND port=?";
-    private static String FETCH_EXECUTION_EXECUTOR =
+    private static final String FETCH_EXECUTION_EXECUTOR =
       "SELECT ex.id, ex.host, ex.port, ex.active FROM "
         + " executors ex INNER JOIN execution_flows ef "
         + "on ex.id = ef.executor_id  where exec_id=?";
 
     @Override
-    public List<Executor> handle(ResultSet rs) throws SQLException {
+    public List<Executor> handle(final ResultSet rs) throws SQLException {
       if (!rs.next()) {
         return Collections.<Executor> emptyList();
       }
 
-      List<Executor> executors = new ArrayList<Executor>();
+      final List<Executor> executors = new ArrayList<>();
       do {
-        int id = rs.getInt(1);
-        String host = rs.getString(2);
-        int port = rs.getInt(3);
-        boolean active = rs.getBoolean(4);
-        Executor executor = new Executor(id, host, port, active);
+        final int id = rs.getInt(1);
+        final String host = rs.getString(2);
+        final int port = rs.getInt(3);
+        final boolean active = rs.getBoolean(4);
+        final Executor executor = new Executor(id, host, port, active);
         executors.add(executor);
       } while (rs.next());
 
@@ -1698,25 +1716,25 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
    */
   private static class ExecutorLogsResultHandler implements
     ResultSetHandler<List<ExecutorLogEvent>> {
-    private static String SELECT_EXECUTOR_EVENTS_ORDER =
+    private static final String SELECT_EXECUTOR_EVENTS_ORDER =
       "SELECT executor_id, event_type, event_time, username, message FROM executor_events "
         + " WHERE executor_id=? ORDER BY event_time LIMIT ? OFFSET ?";
 
     @Override
-    public List<ExecutorLogEvent> handle(ResultSet rs) throws SQLException {
+    public List<ExecutorLogEvent> handle(final ResultSet rs) throws SQLException {
       if (!rs.next()) {
         return Collections.<ExecutorLogEvent> emptyList();
       }
 
-      ArrayList<ExecutorLogEvent> events = new ArrayList<ExecutorLogEvent>();
+      final ArrayList<ExecutorLogEvent> events = new ArrayList<>();
       do {
-        int executorId = rs.getInt(1);
-        int eventType = rs.getInt(2);
-        Date eventTime = rs.getDate(3);
-        String username = rs.getString(4);
-        String message = rs.getString(5);
+        final int executorId = rs.getInt(1);
+        final int eventType = rs.getInt(2);
+        final Date eventTime = rs.getDate(3);
+        final String username = rs.getString(4);
+        final String message = rs.getString(5);
 
-        ExecutorLogEvent event =
+        final ExecutorLogEvent event =
           new ExecutorLogEvent(executorId, username, eventTime,
             EventType.fromInteger(eventType), message);
         events.add(event);
@@ -1725,27 +1743,4 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       return events;
     }
   }
-
-  /**
-   *
-   * {@inheritDoc}
-   * @see azkaban.executor.ExecutorLoader#unassignExecutor(int)
-   */
-  @Override
-  public void unassignExecutor(int executionId) throws ExecutorManagerException {
-    final String UPDATE =
-      "UPDATE execution_flows SET executor_id=NULL where exec_id=?";
-
-    QueryRunner runner = createQueryRunner();
-    try {
-      int rows = runner.update(UPDATE, executionId);
-      if (rows == 0) {
-        throw new ExecutorManagerException(String.format(
-          "Failed to unassign executor for execution : %d  ", executionId));
-      }
-    } catch (SQLException e) {
-      throw new ExecutorManagerException("Error updating execution id "
-        + executionId, e);
-    }
-  }
 }