azkaban-aplcache

Metrics code refactor. Removed MetricsUtility. (#1276) Changes: -

7/12/2017 6:23:04 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
index ee7857e..2d4b5c9 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
@@ -17,7 +17,6 @@
 package azkaban.metrics;
 
 import com.codahale.metrics.Meter;
-import com.codahale.metrics.MetricRegistry;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import java.util.concurrent.atomic.AtomicLong;
@@ -32,21 +31,21 @@ public class CommonMetrics {
 
   private final AtomicLong dbConnectionTime = new AtomicLong(0L);
   private final AtomicLong OOMWaitingJobCount = new AtomicLong(0L);
-  private final MetricRegistry registry;
+  private final MetricsManager metricsManager;
   private Meter dbConnectionMeter;
   private Meter flowFailMeter;
 
   @Inject
-  public CommonMetrics(final MetricRegistry metricsRegistry) {
-    this.registry = metricsRegistry;
+  public CommonMetrics(final MetricsManager metricsManager) {
+    this.metricsManager = metricsManager;
     setupAllMetrics();
   }
 
   private void setupAllMetrics() {
-    this.dbConnectionMeter = MetricsUtility.addMeter("DB-Connection-meter", this.registry);
-    this.flowFailMeter = MetricsUtility.addMeter("flow-fail-meter", this.registry);
-    MetricsUtility.addGauge("OOM-waiting-job-count", this.registry, this.OOMWaitingJobCount::get);
-    MetricsUtility.addGauge("dbConnectionTime", this.registry, this.dbConnectionTime::get);
+    this.dbConnectionMeter = this.metricsManager.addMeter("DB-Connection-meter");
+    this.flowFailMeter = this.metricsManager.addMeter("flow-fail-meter");
+    this.metricsManager.addGauge("OOM-waiting-job-count", this.OOMWaitingJobCount::get);
+    this.metricsManager.addGauge("dbConnectionTime", this.dbConnectionTime::get);
   }
 
   /**
diff --git a/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java b/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java
index 65f0bdf..de6981e 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java
@@ -20,7 +20,8 @@ import static azkaban.Constants.ConfigurationKeys.CUSTOM_METRICS_REPORTER_CLASS_
 import static azkaban.Constants.ConfigurationKeys.METRICS_SERVER_URL;
 
 import azkaban.utils.Props;
-import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
 import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
@@ -28,9 +29,9 @@ import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import java.lang.reflect.Constructor;
-import java.time.Duration;
-import java.util.concurrent.TimeUnit;
-import org.apache.log4j.Logger;
+import java.util.function.Supplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The singleton class, MetricsManager, is the place to have MetricRegistry and ConsoleReporter in
@@ -40,28 +41,41 @@ import org.apache.log4j.Logger;
 @Singleton
 public class MetricsManager {
 
-  private static final Logger logger = Logger.getLogger(MetricsManager.class);
+  private static final Logger log = LoggerFactory.getLogger(MetricsManager.class);
   private final MetricRegistry registry;
-  private ConsoleReporter consoleReporter = null;
 
-  /**
-   * Constructor is eaagerly called when this class is loaded.
-   */
   @Inject
-  public MetricsManager(MetricRegistry registry) {
+  public MetricsManager(final MetricRegistry registry) {
     this.registry = registry;
+    registerJvmMetrics();
+  }
+
+  private void registerJvmMetrics() {
     this.registry.register("MEMORY_Gauge", new MemoryUsageGaugeSet());
     this.registry.register("GC_Gauge", new GarbageCollectorMetricSet());
     this.registry.register("Thread_State_Gauge", new ThreadStatesGaugeSet());
   }
 
   /**
-   * Return the Metrics registry.
+   * A {@link Meter} measures the rate of events over time (e.g., “requests per second”).
+   * Here we track 1-minute moving averages.
+   */
+  public Meter addMeter(final String name) {
+    final Meter curr = this.registry.meter(name);
+    this.registry.register(name + "-gauge", (Gauge<Double>) curr::getFifteenMinuteRate);
+    return curr;
+  }
+
+  /**
+   * A {@link Gauge} is an instantaneous reading of a particular value. This method leverages
+   * Supplier, a Functional Interface, to get Generics metrics values. With this support, no matter
+   * what our interesting metrics is a Double or a Long, we could pass it to Metrics Parser.
    *
-   * @return the single {@code MetricRegistry} used for all of Az Metrics monitoring
+   * E.g., in {@link CommonMetrics#setupAllMetrics()}, we construct a supplier lambda by having a
+   * AtomicLong object and its get method, in order to collect dbConnection metric.
    */
-  public MetricRegistry getRegistry() {
-    return this.registry;
+  public <T> void addGauge(final String name, final Supplier<T> gaugeFunc) {
+    this.registry.register(name, (Gauge<T>) gaugeFunc::get);
   }
 
   /**
@@ -74,51 +88,21 @@ public class MetricsManager {
     final String metricsServerURL = props.get(METRICS_SERVER_URL);
     if (metricsReporterClassName != null && metricsServerURL != null) {
       try {
-        logger.info("metricsReporterClassName: " + metricsReporterClassName);
+        log.info("metricsReporterClassName: " + metricsReporterClassName);
         final Class metricsClass = Class.forName(metricsReporterClassName);
 
-        final Constructor[] constructors =
-            metricsClass.getConstructors();
+        final Constructor[] constructors = metricsClass.getConstructors();
         constructors[0].newInstance(reporterName, this.registry, metricsServerURL);
 
       } catch (final Exception e) {
-        logger.error("Encountered error while loading and instantiating "
+        log.error("Encountered error while loading and instantiating "
             + metricsReporterClassName, e);
-        throw new IllegalStateException(
-            "Encountered error while loading and instantiating "
+        throw new IllegalStateException("Encountered error while loading and instantiating "
                 + metricsReporterClassName, e);
       }
     } else {
-      logger.error("No value for property: "
-          + CUSTOM_METRICS_REPORTER_CLASS_NAME
-          + "or" + METRICS_SERVER_URL + " was found");
-    }
-
-  }
-
-  /**
-   * Create a ConsoleReporter to the AZ Metrics registry.
-   *
-   * @param reportInterval time to wait between dumping metrics to the console
-   */
-  public synchronized void addConsoleReporter(final Duration reportInterval) {
-    if (null != this.consoleReporter) {
-      return;
+      log.error(String.format("No value for property: %s or %s was found",
+          CUSTOM_METRICS_REPORTER_CLASS_NAME, METRICS_SERVER_URL));
     }
-
-    this.consoleReporter = ConsoleReporter.forRegistry(getRegistry()).build();
-    this.consoleReporter.start(reportInterval.toMillis(), TimeUnit.MILLISECONDS);
-  }
-
-  /**
-   * Stop ConsoldeReporter previously created by a call to
-   * {@link #addConsoleReporter(Duration)} and release it for GC.
-   */
-  public synchronized void removeConsoleReporter() {
-    if (null != this.consoleReporter) {
-      this.consoleReporter.stop();
-    }
-
-    this.consoleReporter = null;
   }
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 3014a6c..4bff2c8 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -16,37 +16,41 @@
 
 package azkaban.executor;
 
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import azkaban.metrics.CommonMetrics;
+import azkaban.metrics.MetricsManager;
+import azkaban.user.User;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
 import com.codahale.metrics.MetricRegistry;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
-
 import org.junit.Assert;
-import org.junit.Test;
 import org.junit.Ignore;
-
-import azkaban.user.User;
-import azkaban.utils.Pair;
-import azkaban.utils.Props;
-import azkaban.utils.TestUtils;
-import static org.mockito.Mockito.*;
+import org.junit.Test;
 
 /**
  * Test class for executor manager
  */
 public class ExecutorManagerTest {
+
+  private final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = new HashMap<>();
   private ExecutorManager manager;
   private ExecutorLoader loader;
   private Props props;
   private User user;
-  private Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = new HashMap<>();
   private ExecutableFlow flow1;
   private ExecutableFlow flow2;
 
@@ -61,15 +65,15 @@ public class ExecutorManagerTest {
    * ExecutorLoader
    */
   private ExecutorManager createMultiExecutorManagerInstance(
-    ExecutorLoader loader) throws ExecutorManagerException {
-    Props props = new Props();
+      final ExecutorLoader loader) throws ExecutorManagerException {
+    final Props props = new Props();
     props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
     props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "false");
 
     loader.addExecutor("localhost", 12345);
     loader.addExecutor("localhost", 12346);
     return new ExecutorManager(props, loader, new AlerterHolder(props),
-        new CommonMetrics(new MetricRegistry()));
+        new CommonMetrics(new MetricsManager(new MetricRegistry())));
   }
 
   /*
@@ -78,13 +82,12 @@ public class ExecutorManagerTest {
    */
   @Test(expected = ExecutorManagerException.class)
   public void testNoExecutorScenario() throws ExecutorManagerException {
-    Props props = new Props();
+    final Props props = new Props();
     props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
-    ExecutorLoader loader = new MockExecutorLoader();
-    @SuppressWarnings("unused")
-    ExecutorManager manager =
+    final ExecutorLoader loader = new MockExecutorLoader();
+    @SuppressWarnings("unused") final ExecutorManager manager =
       new ExecutorManager(props, loader, new AlerterHolder(props),
-          new CommonMetrics(new MetricRegistry()));
+          new CommonMetrics(new MetricsManager(new MetricRegistry())));
   }
 
   /*
@@ -92,18 +95,18 @@ public class ExecutorManagerTest {
    */
   @Test
   public void testLocalExecutorScenario() throws ExecutorManagerException {
-    Props props = new Props();
+    final Props props = new Props();
     props.put("executor.port", 12345);
 
-    ExecutorLoader loader = new MockExecutorLoader();
-    ExecutorManager manager =
+    final ExecutorLoader loader = new MockExecutorLoader();
+    final ExecutorManager manager =
       new ExecutorManager(props, loader, new AlerterHolder(props),
-          new CommonMetrics(new MetricRegistry()));
-    Set<Executor> activeExecutors =
+          new CommonMetrics(new MetricsManager(new MetricRegistry())));
+    final Set<Executor> activeExecutors =
       new HashSet(manager.getAllActiveExecutors());
 
     Assert.assertEquals(activeExecutors.size(), 1);
-    Executor executor = activeExecutors.iterator().next();
+    final Executor executor = activeExecutors.iterator().next();
     Assert.assertEquals(executor.getHost(), "localhost");
     Assert.assertEquals(executor.getPort(), 12345);
     Assert.assertArrayEquals(activeExecutors.toArray(), loader
@@ -115,16 +118,16 @@ public class ExecutorManagerTest {
    */
   @Test
   public void testMultipleExecutorScenario() throws ExecutorManagerException {
-    Props props = new Props();
+    final Props props = new Props();
     props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
-    ExecutorLoader loader = new MockExecutorLoader();
-    Executor executor1 = loader.addExecutor("localhost", 12345);
-    Executor executor2 = loader.addExecutor("localhost", 12346);
+    final ExecutorLoader loader = new MockExecutorLoader();
+    final Executor executor1 = loader.addExecutor("localhost", 12345);
+    final Executor executor2 = loader.addExecutor("localhost", 12346);
 
-    ExecutorManager manager =
+    final ExecutorManager manager =
       new ExecutorManager(props, loader, new AlerterHolder(props),
-          new CommonMetrics(new MetricRegistry()));
-    Set<Executor> activeExecutors =
+          new CommonMetrics(new MetricsManager(new MetricRegistry())));
+    final Set<Executor> activeExecutors =
       new HashSet(manager.getAllActiveExecutors());
     Assert.assertArrayEquals(activeExecutors.toArray(), new Executor[] {
       executor1, executor2 });
@@ -135,22 +138,22 @@ public class ExecutorManagerTest {
    */
   @Test
   public void testSetupExecutorsSucess() throws ExecutorManagerException {
-    Props props = new Props();
+    final Props props = new Props();
     props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
-    ExecutorLoader loader = new MockExecutorLoader();
-    Executor executor1 = loader.addExecutor("localhost", 12345);
+    final ExecutorLoader loader = new MockExecutorLoader();
+    final Executor executor1 = loader.addExecutor("localhost", 12345);
 
-    ExecutorManager manager =
+    final ExecutorManager manager =
       new ExecutorManager(props, loader, new AlerterHolder(props),
-          new CommonMetrics(new MetricRegistry()));
+          new CommonMetrics(new MetricsManager(new MetricRegistry())));
     Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
       new Executor[] { executor1 });
 
     // mark older executor as inactive
     executor1.setActive(false);
     loader.updateExecutor(executor1);
-    Executor executor2 = loader.addExecutor("localhost", 12346);
-    Executor executor3 = loader.addExecutor("localhost", 12347);
+    final Executor executor2 = loader.addExecutor("localhost", 12346);
+    final Executor executor3 = loader.addExecutor("localhost", 12347);
     manager.setupExecutors();
 
     Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
@@ -163,15 +166,15 @@ public class ExecutorManagerTest {
    */
   @Test(expected = ExecutorManagerException.class)
   public void testSetupExecutorsException() throws ExecutorManagerException {
-    Props props = new Props();
+    final Props props = new Props();
     props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
-    ExecutorLoader loader = new MockExecutorLoader();
-    Executor executor1 = loader.addExecutor("localhost", 12345);
+    final ExecutorLoader loader = new MockExecutorLoader();
+    final Executor executor1 = loader.addExecutor("localhost", 12345);
 
-    ExecutorManager manager =
+    final ExecutorManager manager =
       new ExecutorManager(props, loader, new AlerterHolder(props),
-          new CommonMetrics(new MetricRegistry()));
-    Set<Executor> activeExecutors =
+          new CommonMetrics(new MetricsManager(new MetricRegistry())));
+    final Set<Executor> activeExecutors =
       new HashSet(manager.getAllActiveExecutors());
     Assert.assertArrayEquals(activeExecutors.toArray(),
       new Executor[] { executor1 });
@@ -185,7 +188,7 @@ public class ExecutorManagerTest {
   /* Test disabling queue process thread to pause dispatching */
   @Test
   public void testDisablingQueueProcessThread() throws ExecutorManagerException {
-    ExecutorManager manager = createMultiExecutorManagerInstance();
+    final ExecutorManager manager = createMultiExecutorManagerInstance();
     manager.enableQueueProcessorThread();
     Assert.assertEquals(manager.isQueueProcessorThreadActive(), true);
     manager.disableQueueProcessorThread();
@@ -195,7 +198,7 @@ public class ExecutorManagerTest {
   /* Test renabling queue process thread to pause restart dispatching */
   @Test
   public void testEnablingQueueProcessThread() throws ExecutorManagerException {
-    ExecutorManager manager = createMultiExecutorManagerInstance();
+    final ExecutorManager manager = createMultiExecutorManagerInstance();
     Assert.assertEquals(manager.isQueueProcessorThreadActive(), false);
     manager.enableQueueProcessorThread();
     Assert.assertEquals(manager.isQueueProcessorThreadActive(), true);
@@ -204,30 +207,30 @@ public class ExecutorManagerTest {
   /* Test submit a non-dispatched flow */
   @Test
   public void testQueuedFlows() throws ExecutorManagerException, IOException {
-    ExecutorLoader loader = new MockExecutorLoader();
-    ExecutorManager manager = createMultiExecutorManagerInstance(loader);
-    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutorLoader loader = new MockExecutorLoader();
+    final ExecutorManager manager = createMultiExecutorManagerInstance(loader);
+    final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
     flow1.setExecutionId(1);
-    ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+    final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
     flow2.setExecutionId(2);
 
-    User testUser = TestUtils.getTestUser();
+    final User testUser = TestUtils.getTestUser();
     manager.submitExecutableFlow(flow1, testUser.getUserId());
     manager.submitExecutableFlow(flow2, testUser.getUserId());
 
-    List<Integer> testFlows = Arrays.asList(flow1.getExecutionId(), flow2.getExecutionId());
+    final List<Integer> testFlows = Arrays.asList(flow1.getExecutionId(), flow2.getExecutionId());
 
-    List<Pair<ExecutionReference, ExecutableFlow>> queuedFlowsDB =
+    final List<Pair<ExecutionReference, ExecutableFlow>> queuedFlowsDB =
       loader.fetchQueuedFlows();
     Assert.assertEquals(queuedFlowsDB.size(), testFlows.size());
     // Verify things are correctly setup in db
-    for (Pair<ExecutionReference, ExecutableFlow> pair : queuedFlowsDB) {
+    for (final Pair<ExecutionReference, ExecutableFlow> pair : queuedFlowsDB) {
       Assert.assertTrue(testFlows.contains(pair.getSecond().getExecutionId()));
     }
 
     // Verify running flows using old definition of "running" flows i.e. a
     // non-dispatched flow is also considered running
-    List<Integer> managerActiveFlows = manager.getRunningFlows()
+    final List<Integer> managerActiveFlows = manager.getRunningFlows()
         .stream().map(ExecutableFlow::getExecutionId).collect(Collectors.toList());
     Assert.assertTrue(managerActiveFlows.containsAll(testFlows)
       && testFlows.containsAll(managerActiveFlows));
@@ -240,12 +243,12 @@ public class ExecutorManagerTest {
   @Test(expected = ExecutorManagerException.class)
   public void testDuplicateQueuedFlows() throws ExecutorManagerException,
     IOException {
-    ExecutorManager manager = createMultiExecutorManagerInstance();
-    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutorManager manager = createMultiExecutorManagerInstance();
+    final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
     flow1.getExecutionOptions().setConcurrentOption(
       ExecutionOptions.CONCURRENT_OPTION_SKIP);
 
-    User testUser = TestUtils.getTestUser();
+    final User testUser = TestUtils.getTestUser();
     manager.submitExecutableFlow(flow1, testUser.getUserId());
     manager.submitExecutableFlow(flow1, testUser.getUserId());
   }
@@ -256,14 +259,14 @@ public class ExecutorManagerTest {
    */
   @Test
   public void testKillQueuedFlow() throws ExecutorManagerException, IOException {
-    ExecutorLoader loader = new MockExecutorLoader();
-    ExecutorManager manager = createMultiExecutorManagerInstance(loader);
-    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
-    User testUser = TestUtils.getTestUser();
+    final ExecutorLoader loader = new MockExecutorLoader();
+    final ExecutorManager manager = createMultiExecutorManagerInstance(loader);
+    final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final User testUser = TestUtils.getTestUser();
     manager.submitExecutableFlow(flow1, testUser.getUserId());
 
     manager.cancelFlow(flow1, testUser.getUserId());
-    ExecutableFlow fetchedFlow =
+    final ExecutableFlow fetchedFlow =
       loader.fetchExecutableFlow(flow1.getExecutionId());
     Assert.assertEquals(fetchedFlow.getStatus(), Status.FAILED);
 
@@ -277,17 +280,17 @@ public class ExecutorManagerTest {
   @Test
   public void testSubmitFlows() throws ExecutorManagerException, IOException {
     testSetUpForRunningFlows();
-    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
-    manager.submitExecutableFlow(flow1, user.getUserId());
-    verify(loader).uploadExecutableFlow(flow1);
-    verify(loader).addActiveExecutableReference(any());
+    final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    this.manager.submitExecutableFlow(flow1, this.user.getUserId());
+    verify(this.loader).uploadExecutableFlow(flow1);
+    verify(this.loader).addActiveExecutableReference(any());
   }
 
   @Ignore @Test
   public void testFetchAllActiveFlows() throws ExecutorManagerException, IOException {
     testSetUpForRunningFlows();
-    List<ExecutableFlow> flows = manager.getRunningFlows();
-    for(Pair<ExecutionReference, ExecutableFlow> pair : activeFlows.values()) {
+    final List<ExecutableFlow> flows = this.manager.getRunningFlows();
+    for (final Pair<ExecutionReference, ExecutableFlow> pair : this.activeFlows.values()) {
       Assert.assertTrue(flows.contains(pair.getSecond()));
     }
   }
@@ -295,28 +298,30 @@ public class ExecutorManagerTest {
   @Ignore @Test
   public void testFetchActiveFlowByProject() throws ExecutorManagerException, IOException {
     testSetUpForRunningFlows();
-    List<Integer> executions = manager.getRunningFlows(flow1.getProjectId(), flow1.getFlowId());
-    Assert.assertTrue(executions.contains(flow1.getExecutionId()));
-    Assert.assertTrue(manager.isFlowRunning(flow1.getProjectId(), flow1.getFlowId()));
+    final List<Integer> executions = this.manager.getRunningFlows(this.flow1.getProjectId(),
+        this.flow1.getFlowId());
+    Assert.assertTrue(executions.contains(this.flow1.getExecutionId()));
+    Assert
+        .assertTrue(this.manager.isFlowRunning(this.flow1.getProjectId(), this.flow1.getFlowId()));
   }
 
   @Ignore @Test
   public void testFetchActiveFlowWithExecutor() throws ExecutorManagerException, IOException {
     testSetUpForRunningFlows();
-    List<Pair<ExecutableFlow, Executor>> activeFlowsWithExecutor =
-        manager.getActiveFlowsWithExecutor();
-    Assert.assertTrue(activeFlowsWithExecutor.contains(new Pair<>(flow1,
-        manager.fetchExecutor(flow1.getExecutionId()))));
-    Assert.assertTrue(activeFlowsWithExecutor.contains(new Pair<>(flow2,
-        manager.fetchExecutor(flow2.getExecutionId()))));
+    final List<Pair<ExecutableFlow, Executor>> activeFlowsWithExecutor =
+        this.manager.getActiveFlowsWithExecutor();
+    Assert.assertTrue(activeFlowsWithExecutor.contains(new Pair<>(this.flow1,
+        this.manager.fetchExecutor(this.flow1.getExecutionId()))));
+    Assert.assertTrue(activeFlowsWithExecutor.contains(new Pair<>(this.flow2,
+        this.manager.fetchExecutor(this.flow2.getExecutionId()))));
   }
 
   @Test
   public void testFetchAllActiveExecutorServerHosts() throws ExecutorManagerException, IOException {
     testSetUpForRunningFlows();
-    Set<String> activeExecutorServerHosts = manager.getAllActiveExecutorServerHosts();
-    Executor executor1 = manager.fetchExecutor(flow1.getExecutionId());
-    Executor executor2 = manager.fetchExecutor(flow2.getExecutionId());
+    final Set<String> activeExecutorServerHosts = this.manager.getAllActiveExecutorServerHosts();
+    final Executor executor1 = this.manager.fetchExecutor(this.flow1.getExecutionId());
+    final Executor executor2 = this.manager.fetchExecutor(this.flow2.getExecutionId());
     Assert.assertTrue(activeExecutorServerHosts.contains(executor1.getHost() + ":" + executor1.getPort()));
     Assert.assertTrue(activeExecutorServerHosts.contains(executor2.getHost() + ":" + executor2.getPort()));
   }
@@ -326,34 +331,34 @@ public class ExecutorManagerTest {
    */
   private void testSetUpForRunningFlows()
       throws ExecutorManagerException, IOException {
-    loader = mock(ExecutorLoader.class);
-    user = TestUtils.getTestUser();
-    props = new Props();
-    props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    this.loader = mock(ExecutorLoader.class);
+    this.user = TestUtils.getTestUser();
+    this.props = new Props();
+    this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
     //To test runningFlows, AZKABAN_QUEUEPROCESSING_ENABLED should be set to true
     //so that flows will be dispatched to executors.
-    props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "true");
+    this.props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "true");
 
-    List<Executor> executors = new ArrayList<>();
-    Executor executor1 = new Executor(1, "localhost", 12345, true);
-    Executor executor2 = new Executor(2, "localhost", 12346, true);
+    final List<Executor> executors = new ArrayList<>();
+    final Executor executor1 = new Executor(1, "localhost", 12345, true);
+    final Executor executor2 = new Executor(2, "localhost", 12346, true);
     executors.add(executor1);
     executors.add(executor2);
 
-    when(loader.fetchActiveExecutors()).thenReturn(executors);
-    manager = new ExecutorManager(props, loader, new AlerterHolder(props),
-        new CommonMetrics(new MetricRegistry()));
-
-    flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
-    flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
-    flow1.setExecutionId(1);
-    flow2.setExecutionId(2);
-    ExecutionReference ref1 =
-        new ExecutionReference(flow1.getExecutionId(), executor1);
-    ExecutionReference ref2 =
-        new ExecutionReference(flow2.getExecutionId(), executor2);
-    activeFlows.put(flow1.getExecutionId(), new Pair<>(ref1, flow1));
-    activeFlows.put(flow2.getExecutionId(), new Pair<>(ref2, flow2));
-    when(loader.fetchActiveFlows()).thenReturn(activeFlows);
+    when(this.loader.fetchActiveExecutors()).thenReturn(executors);
+    this.manager = new ExecutorManager(this.props, this.loader, new AlerterHolder(this.props),
+        new CommonMetrics(new MetricsManager(new MetricRegistry())));
+
+    this.flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    this.flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+    this.flow1.setExecutionId(1);
+    this.flow2.setExecutionId(2);
+    final ExecutionReference ref1 =
+        new ExecutionReference(this.flow1.getExecutionId(), executor1);
+    final ExecutionReference ref2 =
+        new ExecutionReference(this.flow2.getExecutionId(), executor2);
+    this.activeFlows.put(this.flow1.getExecutionId(), new Pair<>(ref1, this.flow1));
+    this.activeFlows.put(this.flow2.getExecutionId(), new Pair<>(ref2, this.flow2));
+    when(this.loader.fetchActiveFlows()).thenReturn(this.activeFlows);
   }
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index e9874a1..3b3f1ca 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -16,7 +16,15 @@
 
 package azkaban.executor;
 
+import azkaban.database.DataSourceUtils;
+import azkaban.executor.ExecutorLogEvent.EventType;
 import azkaban.metrics.CommonMetrics;
+import azkaban.metrics.MetricsManager;
+import azkaban.user.User;
+import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
 import com.codahale.metrics.MetricRegistry;
 import java.io.File;
 import java.io.IOException;
@@ -26,12 +34,9 @@ import java.sql.SQLException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-
 import javax.sql.DataSource;
-
 import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
@@ -43,16 +48,7 @@ import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import azkaban.database.DataSourceUtils;
-import azkaban.executor.ExecutorLogEvent.EventType;
-import azkaban.user.User;
-import azkaban.utils.FileIOUtils.LogData;
-import azkaban.utils.Pair;
-import azkaban.utils.Props;
-import azkaban.utils.TestUtils;
-
 public class JdbcExecutorLoaderTest {
-  private static boolean testDBExists;
   /* Directory with serialized description of test flows */
   private static final String UNIT_BASE_DIR =
     "../test/src/test/resources/azkaban/test/executions";
@@ -65,10 +61,11 @@ public class JdbcExecutorLoaderTest {
   private static final int numConnections = 10;
   private static final Duration RECENTLY_FINISHED_LIFETIME = Duration.ofMinutes(1);
   private static final Duration FLOW_FINISHED_TIME = Duration.ofMinutes(2);
+  private static boolean testDBExists;
 
   @BeforeClass
   public static void setupDB() {
-    DataSource dataSource =
+    final DataSource dataSource =
         DataSourceUtils.getMySQLDataSource(host, port, database, user,
             password, numConnections);
     testDBExists = true;
@@ -76,19 +73,19 @@ public class JdbcExecutorLoaderTest {
     Connection connection = null;
     try {
       connection = dataSource.getConnection();
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       e.printStackTrace();
       testDBExists = false;
       DbUtils.closeQuietly(connection);
       return;
     }
 
-    CountHandler countHandler = new CountHandler();
-    QueryRunner runner = new QueryRunner();
+    final CountHandler countHandler = new CountHandler();
+    final QueryRunner runner = new QueryRunner();
     try {
       runner.query(connection, "SELECT COUNT(1) FROM active_executing_flows",
           countHandler);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       e.printStackTrace();
       testDBExists = false;
       DbUtils.closeQuietly(connection);
@@ -98,7 +95,7 @@ public class JdbcExecutorLoaderTest {
     try {
       runner.query(connection, "SELECT COUNT(1) FROM execution_flows",
           countHandler);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       e.printStackTrace();
       testDBExists = false;
       DbUtils.closeQuietly(connection);
@@ -108,7 +105,7 @@ public class JdbcExecutorLoaderTest {
     try {
       runner.query(connection, "SELECT COUNT(1) FROM execution_jobs",
           countHandler);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       e.printStackTrace();
       testDBExists = false;
       DbUtils.closeQuietly(connection);
@@ -118,7 +115,7 @@ public class JdbcExecutorLoaderTest {
     try {
       runner.query(connection, "SELECT COUNT(1) FROM execution_logs",
           countHandler);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       e.printStackTrace();
       testDBExists = false;
       DbUtils.closeQuietly(connection);
@@ -128,7 +125,7 @@ public class JdbcExecutorLoaderTest {
     try {
       runner.query(connection, "SELECT COUNT(1) FROM executors",
           countHandler);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       e.printStackTrace();
       testDBExists = false;
       DbUtils.closeQuietly(connection);
@@ -138,7 +135,7 @@ public class JdbcExecutorLoaderTest {
     try {
       runner.query(connection, "SELECT COUNT(1) FROM executor_events",
           countHandler);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       e.printStackTrace();
       testDBExists = false;
       DbUtils.closeQuietly(connection);
@@ -154,24 +151,24 @@ public class JdbcExecutorLoaderTest {
       return;
     }
 
-    DataSource dataSource =
+    final DataSource dataSource =
         DataSourceUtils.getMySQLDataSource(host, port, database, user,
             password, numConnections);
     Connection connection = null;
     try {
       connection = dataSource.getConnection();
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       e.printStackTrace();
       testDBExists = false;
       DbUtils.closeQuietly(connection);
       return;
     }
 
-    QueryRunner runner = new QueryRunner();
+    final QueryRunner runner = new QueryRunner();
     try {
       runner.update(connection, "DELETE FROM active_executing_flows");
 
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       e.printStackTrace();
       testDBExists = false;
       DbUtils.closeQuietly(connection);
@@ -180,7 +177,7 @@ public class JdbcExecutorLoaderTest {
 
     try {
       runner.update(connection, "DELETE FROM execution_flows");
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       e.printStackTrace();
       testDBExists = false;
       DbUtils.closeQuietly(connection);
@@ -189,7 +186,7 @@ public class JdbcExecutorLoaderTest {
 
     try {
       runner.update(connection, "DELETE FROM execution_jobs");
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       e.printStackTrace();
       testDBExists = false;
       DbUtils.closeQuietly(connection);
@@ -198,7 +195,7 @@ public class JdbcExecutorLoaderTest {
 
     try {
       runner.update(connection, "DELETE FROM execution_logs");
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       e.printStackTrace();
       testDBExists = false;
       DbUtils.closeQuietly(connection);
@@ -207,7 +204,7 @@ public class JdbcExecutorLoaderTest {
 
     try {
       runner.update(connection, "DELETE FROM executors");
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       e.printStackTrace();
       testDBExists = false;
       DbUtils.closeQuietly(connection);
@@ -216,7 +213,7 @@ public class JdbcExecutorLoaderTest {
 
     try {
       runner.update(connection, "DELETE FROM executor_events");
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       e.printStackTrace();
       testDBExists = false;
       DbUtils.closeQuietly(connection);
@@ -231,12 +228,12 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
-    ExecutorLoader loader = createLoader();
-    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutorLoader loader = createLoader();
+    final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
 
     loader.uploadExecutableFlow(flow);
 
-    ExecutableFlow fetchFlow =
+    final ExecutableFlow fetchFlow =
         loader.fetchExecutableFlow(flow.getExecutionId());
 
     // Shouldn't be the same object.
@@ -250,8 +247,8 @@ public class JdbcExecutorLoaderTest {
     Assert.assertEquals(flow.getVersion(), fetchFlow.getVersion());
     Assert.assertEquals(flow.getExecutionOptions().getFailureAction(),
         fetchFlow.getExecutionOptions().getFailureAction());
-    Assert.assertEquals(new HashSet<String>(flow.getEndNodes()),
-        new HashSet<String>(fetchFlow.getEndNodes()));
+    Assert.assertEquals(new HashSet<>(flow.getEndNodes()),
+        new HashSet<>(fetchFlow.getEndNodes()));
   }
 
   @Test
@@ -260,18 +257,18 @@ public class JdbcExecutorLoaderTest {
       return;
     }
 
-    ExecutorLoader loader = createLoader();
-    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutorLoader loader = createLoader();
+    final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
 
     loader.uploadExecutableFlow(flow);
 
-    ExecutableFlow fetchFlow2 =
+    final ExecutableFlow fetchFlow2 =
         loader.fetchExecutableFlow(flow.getExecutionId());
 
     fetchFlow2.setEndTime(System.currentTimeMillis());
     fetchFlow2.setStatus(Status.SUCCEEDED);
     loader.updateExecutableFlow(fetchFlow2);
-    ExecutableFlow fetchFlow =
+    final ExecutableFlow fetchFlow =
         loader.fetchExecutableFlow(flow.getExecutionId());
 
     // Shouldn't be the same object.
@@ -286,8 +283,8 @@ public class JdbcExecutorLoaderTest {
     Assert.assertEquals(flow.getVersion(), fetchFlow.getVersion());
     Assert.assertEquals(flow.getExecutionOptions().getFailureAction(),
         fetchFlow.getExecutionOptions().getFailureAction());
-    Assert.assertEquals(new HashSet<String>(flow.getEndNodes()),
-        new HashSet<String>(fetchFlow.getEndNodes()));
+    Assert.assertEquals(new HashSet<>(flow.getEndNodes()),
+        new HashSet<>(fetchFlow.getEndNodes()));
   }
 
   @Test
@@ -296,18 +293,18 @@ public class JdbcExecutorLoaderTest {
       return;
     }
 
-    ExecutorLoader loader = createLoader();
-    ExecutableFlow flow = createExecutableFlow(10, "exec1");
+    final ExecutorLoader loader = createLoader();
+    final ExecutableFlow flow = createExecutableFlow(10, "exec1");
     flow.setExecutionId(10);
 
-    File jobFile = new File(UNIT_BASE_DIR + "/exectest1", "job10.job");
-    Props props = new Props(null, jobFile);
+    final File jobFile = new File(UNIT_BASE_DIR + "/exectest1", "job10.job");
+    final Props props = new Props(null, jobFile);
     props.put("test", "test2");
-    ExecutableNode oldNode = flow.getExecutableNode("job10");
+    final ExecutableNode oldNode = flow.getExecutableNode("job10");
     oldNode.setStartTime(System.currentTimeMillis());
     loader.uploadExecutableNode(oldNode, props);
 
-    ExecutableJobInfo info = loader.fetchJobInfo(10, "job10", 0);
+    final ExecutableJobInfo info = loader.fetchJobInfo(10, "job10", 0);
     Assert.assertEquals(flow.getExecutionId(), info.getExecId());
     Assert.assertEquals(flow.getProjectId(), info.getProjectId());
     Assert.assertEquals(flow.getVersion(), info.getVersion());
@@ -320,15 +317,15 @@ public class JdbcExecutorLoaderTest {
         info.getEndTime());
 
     // Fetch props
-    Props outputProps = new Props();
+    final Props outputProps = new Props();
     outputProps.put("hello", "output");
     oldNode.setOutputProps(outputProps);
     oldNode.setEndTime(System.currentTimeMillis());
     loader.updateExecutableNode(oldNode);
 
-    Props fInputProps = loader.fetchExecutionJobInputProps(10, "job10");
-    Props fOutputProps = loader.fetchExecutionJobOutputProps(10, "job10");
-    Pair<Props, Props> inOutProps = loader.fetchExecutionJobProps(10, "job10");
+    final Props fInputProps = loader.fetchExecutionJobInputProps(10, "job10");
+    final Props fOutputProps = loader.fetchExecutionJobOutputProps(10, "job10");
+    final Pair<Props, Props> inOutProps = loader.fetchExecutionJobProps(10, "job10");
 
     Assert.assertEquals(fInputProps.get("test"), "test2");
     Assert.assertEquals(fOutputProps.get("hello"), "output");
@@ -344,11 +341,11 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
-    ExecutorLoader loader = createLoader();
+    final ExecutorLoader loader = createLoader();
     try {
       loader.unassignExecutor(2);
       Assert.fail("Expecting exception, but didn't get one");
-    } catch (ExecutorManagerException ex) {
+    } catch (final ExecutorManagerException ex) {
       System.out.println("Test true");
     }
   }
@@ -360,11 +357,11 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
-    ExecutorLoader loader = createLoader();
-    String host = "localhost";
-    int port = 12345;
-    Executor executor = loader.addExecutor(host, port);
-    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutorLoader loader = createLoader();
+    final String host = "localhost";
+    final int port = 12345;
+    final Executor executor = loader.addExecutor(host, port);
+    final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow);
     loader.assignExecutor(executor.getId(), flow.getExecutionId());
     Assert.assertEquals(
@@ -381,13 +378,13 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
-    ExecutorLoader loader = createLoader();
-    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutorLoader loader = createLoader();
+    final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow);
     try {
       loader.assignExecutor(flow.getExecutionId(), 1);
       Assert.fail("Expecting exception, but didn't get one");
-    } catch (ExecutorManagerException ex) {
+    } catch (final ExecutorManagerException ex) {
       System.out.println("Test true");
     }
   }
@@ -399,14 +396,14 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
-    ExecutorLoader loader = createLoader();
-    String host = "localhost";
-    int port = 12345;
-    Executor executor = loader.addExecutor(host, port);
+    final ExecutorLoader loader = createLoader();
+    final String host = "localhost";
+    final int port = 12345;
+    final Executor executor = loader.addExecutor(host, port);
     try {
       loader.assignExecutor(2, executor.getId());
       Assert.fail("Expecting exception, but didn't get one");
-    } catch (ExecutorManagerException ex) {
+    } catch (final ExecutorManagerException ex) {
       System.out.println("Test true");
     }
   }
@@ -418,7 +415,7 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
-    ExecutorLoader loader = createLoader();
+    final ExecutorLoader loader = createLoader();
     Assert.assertEquals(loader.fetchExecutorByExecutionId(1), null);
   }
 
@@ -429,8 +426,8 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
-    ExecutorLoader loader = createLoader();
-    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutorLoader loader = createLoader();
+    final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow);
     Assert.assertEquals(loader.fetchExecutorByExecutionId(flow.getExecutionId()),
       null);
@@ -443,11 +440,11 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
-    ExecutorLoader loader = createLoader();
-    String host = "localhost";
-    int port = 12345;
-    Executor executor = loader.addExecutor(host, port);
-    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutorLoader loader = createLoader();
+    final String host = "localhost";
+    final int port = 12345;
+    final Executor executor = loader.addExecutor(host, port);
+    final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow);
     loader.assignExecutor(executor.getId(), flow.getExecutionId());
     Assert.assertEquals(loader.fetchExecutorByExecutionId(flow.getExecutionId()),
@@ -462,25 +459,25 @@ public class JdbcExecutorLoaderTest {
       return;
     }
 
-    ExecutorLoader loader = createLoader();
-    List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+    final ExecutorLoader loader = createLoader();
+    final List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
       loader.fetchQueuedFlows();
 
     // no execution flows at all i.e. no running, completed or queued flows
     Assert.assertTrue(queuedFlows.isEmpty());
 
-    String host = "lcoalhost";
-    int port = 12345;
-    Executor executor = loader.addExecutor(host, port);
+    final String host = "lcoalhost";
+    final int port = 12345;
+    final Executor executor = loader.addExecutor(host, port);
 
     // When a flow is assigned an executor, it is no longer in queued state
-    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow);
     loader.assignExecutor(executor.getId(), flow.getExecutionId());
     Assert.assertTrue(queuedFlows.isEmpty());
 
     // When flow status is finished, it is no longer in queued state
-    ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+    final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
     loader.uploadExecutableFlow(flow2);
     flow2.setStatus(Status.SUCCEEDED);
     loader.updateExecutableFlow(flow2);
@@ -495,17 +492,18 @@ public class JdbcExecutorLoaderTest {
       return;
     }
 
-    ExecutorLoader loader = createLoader();
+    final ExecutorLoader loader = createLoader();
 
-    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow);
-    ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+    final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
     loader.uploadExecutableFlow(flow2);
 
-    List<Pair<ExecutionReference, ExecutableFlow>> fetchedQueuedFlows = loader.fetchQueuedFlows();
+    final List<Pair<ExecutionReference, ExecutableFlow>> fetchedQueuedFlows = loader
+        .fetchQueuedFlows();
     Assert.assertEquals(2, fetchedQueuedFlows.size());
-    Pair<ExecutionReference, ExecutableFlow> fetchedFlow1 = fetchedQueuedFlows.get(0);
-    Pair<ExecutionReference, ExecutableFlow> fetchedFlow2 = fetchedQueuedFlows.get(1);
+    final Pair<ExecutionReference, ExecutableFlow> fetchedFlow1 = fetchedQueuedFlows.get(0);
+    final Pair<ExecutionReference, ExecutableFlow> fetchedFlow2 = fetchedQueuedFlows.get(1);
 
     Assert.assertEquals(flow.getExecutionId(), fetchedFlow1.getSecond().getExecutionId());
     Assert.assertEquals(flow.getFlowId(), fetchedFlow1.getSecond().getFlowId());
@@ -521,8 +519,8 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
-    ExecutorLoader loader = createLoader();
-    List<Executor> executors = loader.fetchAllExecutors();
+    final ExecutorLoader loader = createLoader();
+    final List<Executor> executors = loader.fetchAllExecutors();
     Assert.assertEquals(executors.size(), 0);
   }
 
@@ -532,8 +530,8 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
-    ExecutorLoader loader = createLoader();
-    List<Executor> executors = loader.fetchActiveExecutors();
+    final ExecutorLoader loader = createLoader();
+    final List<Executor> executors = loader.fetchActiveExecutors();
     Assert.assertEquals(executors.size(), 0);
   }
 
@@ -543,8 +541,8 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
-    ExecutorLoader loader = createLoader();
-    Executor executor = loader.fetchExecutor(0);
+    final ExecutorLoader loader = createLoader();
+    final Executor executor = loader.fetchExecutor(0);
     Assert.assertEquals(executor, null);
   }
 
@@ -554,8 +552,8 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
-    ExecutorLoader loader = createLoader();
-    Executor executor = loader.fetchExecutor("localhost", 12345);
+    final ExecutorLoader loader = createLoader();
+    final Executor executor = loader.fetchExecutor("localhost", 12345);
     Assert.assertEquals(executor, null);
   }
 
@@ -565,9 +563,9 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
-    ExecutorLoader loader = createLoader();
-    Executor executor = new Executor(1, "localhost", 12345, true);
-    List<ExecutorLogEvent> executorEvents =
+    final ExecutorLoader loader = createLoader();
+    final Executor executor = new Executor(1, "localhost", 12345, true);
+    final List<ExecutorLogEvent> executorEvents =
       loader.getExecutorEvents(executor, 5, 0);
     Assert.assertEquals(executorEvents.size(), 0);
   }
@@ -578,25 +576,25 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
-    ExecutorLoader loader = createLoader();
-    int skip = 1;
-    User user = new User("testUser");
-    Executor executor = new Executor(1, "localhost", 12345, true);
-    String message = "My message ";
-    EventType[] events =
+    final ExecutorLoader loader = createLoader();
+    final int skip = 1;
+    final User user = new User("testUser");
+    final Executor executor = new Executor(1, "localhost", 12345, true);
+    final String message = "My message ";
+    final EventType[] events =
       { EventType.CREATED, EventType.HOST_UPDATE, EventType.INACTIVATION };
 
-    for (EventType event : events) {
+    for (final EventType event : events) {
       loader.postExecutorEvent(executor, event, user.getUserId(),
         message + event.getNumVal());
     }
 
-    List<ExecutorLogEvent> eventLogs =
+    final List<ExecutorLogEvent> eventLogs =
       loader.getExecutorEvents(executor, 10, skip);
     Assert.assertTrue(eventLogs.size() == 2);
 
     for (int index = 0; index < eventLogs.size(); ++index) {
-      ExecutorLogEvent eventLog = eventLogs.get(index);
+      final ExecutorLogEvent eventLog = eventLogs.get(index);
       Assert.assertEquals(eventLog.getExecutorId(), executor.getId());
       Assert.assertEquals(eventLog.getUser(), user.getUserId());
       Assert.assertEquals(eventLog.getType(), events[index + skip]);
@@ -611,14 +609,14 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
-    ExecutorLoader loader = createLoader();
+    final ExecutorLoader loader = createLoader();
     try {
-      String host = "localhost";
-      int port = 12345;
+      final String host = "localhost";
+      final int port = 12345;
       loader.addExecutor(host, port);
       loader.addExecutor(host, port);
       Assert.fail("Expecting exception, but didn't get one");
-    } catch (ExecutorManagerException ex) {
+    } catch (final ExecutorManagerException ex) {
       System.out.println("Test true");
     }
   }
@@ -629,12 +627,12 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
-    ExecutorLoader loader = createLoader();
+    final ExecutorLoader loader = createLoader();
     try {
-      Executor executor = new Executor(1, "localhost", 1234, true);
+      final Executor executor = new Executor(1, "localhost", 1234, true);
       loader.updateExecutor(executor);
       Assert.fail("Expecting exception, but didn't get one");
-    } catch (ExecutorManagerException ex) {
+    } catch (final ExecutorManagerException ex) {
       System.out.println("Test true");
     }
     clearDB();
@@ -646,10 +644,10 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
-    ExecutorLoader loader = createLoader();
-    List<Executor> executors = addTestExecutors(loader);
-    for (Executor executor : executors) {
-      Executor fetchedExecutor = loader.fetchExecutor(executor.getId());
+    final ExecutorLoader loader = createLoader();
+    final List<Executor> executors = addTestExecutors(loader);
+    for (final Executor executor : executors) {
+      final Executor fetchedExecutor = loader.fetchExecutor(executor.getId());
       Assert.assertEquals(executor, fetchedExecutor);
     }
   }
@@ -660,13 +658,13 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
-    ExecutorLoader loader = createLoader();
-    List<Executor> executors = addTestExecutors(loader);
+    final ExecutorLoader loader = createLoader();
+    final List<Executor> executors = addTestExecutors(loader);
 
     executors.get(0).setActive(false);
     loader.updateExecutor(executors.get(0));
 
-    List<Executor> fetchedExecutors = loader.fetchAllExecutors();
+    final List<Executor> fetchedExecutors = loader.fetchAllExecutors();
     Assert.assertEquals(executors.size(), fetchedExecutors.size());
 
     Assert.assertArrayEquals(executors.toArray(), fetchedExecutors.toArray());
@@ -678,13 +676,13 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
-    ExecutorLoader loader = createLoader();
-    List<Executor> executors = addTestExecutors(loader);
+    final ExecutorLoader loader = createLoader();
+    final List<Executor> executors = addTestExecutors(loader);
 
     executors.get(0).setActive(false);
     loader.updateExecutor(executors.get(0));
 
-    List<Executor> fetchedExecutors = loader.fetchActiveExecutors();
+    final List<Executor> fetchedExecutors = loader.fetchActiveExecutors();
     Assert.assertEquals(executors.size(), fetchedExecutors.size() + 1);
     executors.remove(0);
 
@@ -697,19 +695,19 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
-    ExecutorLoader loader = createLoader();
-    List<Executor> executors = addTestExecutors(loader);
-    for (Executor executor : executors) {
-      Executor fetchedExecutor =
+    final ExecutorLoader loader = createLoader();
+    final List<Executor> executors = addTestExecutors(loader);
+    for (final Executor executor : executors) {
+      final Executor fetchedExecutor =
         loader.fetchExecutor(executor.getHost(), executor.getPort());
       Assert.assertEquals(executor, fetchedExecutor);
     }
   }
 
   /* Helper method used in methods testing jdbc interface for executors table */
-  private List<Executor> addTestExecutors(ExecutorLoader loader)
+  private List<Executor> addTestExecutors(final ExecutorLoader loader)
     throws ExecutorManagerException {
-    List<Executor> executors = new ArrayList<Executor>();
+    final List<Executor> executors = new ArrayList<>();
     executors.add(loader.addExecutor("localhost1", 12345));
     executors.add(loader.addExecutor("localhost2", 12346));
     executors.add(loader.addExecutor("localhost1", 12347));
@@ -723,14 +721,14 @@ public class JdbcExecutorLoaderTest {
       return;
     }
 
-    ExecutorLoader loader = createLoader();
-    Executor executor = loader.addExecutor("localhost1", 12345);
+    final ExecutorLoader loader = createLoader();
+    final Executor executor = loader.addExecutor("localhost1", 12345);
     Assert.assertTrue(executor.isActive());
 
     executor.setActive(false);
     loader.updateExecutor(executor);
 
-    Executor fetchedExecutor = loader.fetchExecutor(executor.getId());
+    final Executor fetchedExecutor = loader.fetchExecutor(executor.getId());
 
     Assert.assertEquals(executor.getHost(), fetchedExecutor.getHost());
     Assert.assertEquals(executor.getId(), fetchedExecutor.getId());
@@ -745,11 +743,11 @@ public class JdbcExecutorLoaderTest {
       return;
     }
 
-    ExecutorLoader loader = createLoader();
-    Executor executor = loader.addExecutor("localhost1", 12345);
+    final ExecutorLoader loader = createLoader();
+    final Executor executor = loader.addExecutor("localhost1", 12345);
     Assert.assertNotNull(executor);
     loader.removeExecutor("localhost1", 12345);
-    Executor fetchedExecutor = loader.fetchExecutor("localhost1", 12345);
+    final Executor fetchedExecutor = loader.fetchExecutor("localhost1", 12345);
     Assert.assertNull(fetchedExecutor);
   }
 
@@ -760,8 +758,8 @@ public class JdbcExecutorLoaderTest {
       return;
     }
 
-    ExecutorLoader loader = createLoader();
-    Executor executor = loader.addExecutor("localhost1", 12345);
+    final ExecutorLoader loader = createLoader();
+    final Executor executor = loader.addExecutor("localhost1", 12345);
     Assert.assertTrue(executor.isActive());
 
     executor.setActive(false);
@@ -783,23 +781,23 @@ public class JdbcExecutorLoaderTest {
     }
 
     // Upload flow1, executor assigned
-    ExecutorLoader loader = createLoader();
-    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutorLoader loader = createLoader();
+    final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow1);
-    Executor executor = loader.addExecutor("test", 1);
+    final Executor executor = loader.addExecutor("test", 1);
     loader.assignExecutor(executor.getId(), flow1.getExecutionId());
 
     // Upload flow2, executor not assigned
-    ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+    final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
     loader.uploadExecutableFlow(flow2);
 
-    Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
+    final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
         loader.fetchActiveFlows();
 
     Assert.assertTrue(activeFlows1.containsKey(flow1.getExecutionId()));
     Assert.assertFalse(activeFlows1.containsKey(flow2.getExecutionId()));
 
-    ExecutableFlow flow1Result =
+    final ExecutableFlow flow1Result =
         activeFlows1.get(flow1.getExecutionId()).getSecond();
     Assert.assertNotNull(flow1Result);
     Assert.assertTrue(flow1 != flow1Result);
@@ -820,11 +818,11 @@ public class JdbcExecutorLoaderTest {
       return;
     }
 
-    ExecutorLoader loader = createLoader();
-    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutorLoader loader = createLoader();
+    final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
     // Flow status is PREPARING when uploaded, should be in active flows
     loader.uploadExecutableFlow(flow1);
-    Executor executor = loader.addExecutor("test", 1);
+    final Executor executor = loader.addExecutor("test", 1);
     loader.assignExecutor(executor.getId(), flow1.getExecutionId());
 
     Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows =
@@ -854,16 +852,16 @@ public class JdbcExecutorLoaderTest {
       return;
     }
 
-    ExecutorLoader loader = createLoader();
-    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutorLoader loader = createLoader();
+    final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow1);
-    Executor executor = loader.addExecutor("test", 1);
+    final Executor executor = loader.addExecutor("test", 1);
     loader.assignExecutor(executor.getId(), flow1.getExecutionId());
-    ExecutionReference ref1 =
+    final ExecutionReference ref1 =
         new ExecutionReference(flow1.getExecutionId(), executor);
     loader.addActiveExecutableReference(ref1);
 
-    Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
+    final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
         loader.fetchActiveFlows();
     Assert.assertTrue(activeFlows1.containsKey(flow1.getExecutionId()));
 
@@ -878,17 +876,17 @@ public class JdbcExecutorLoaderTest {
       return;
     }
 
-    ExecutorLoader loader = createLoader();
-    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutorLoader loader = createLoader();
+    final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow1);
-    Executor executor = loader.addExecutor("test", 1);
+    final Executor executor = loader.addExecutor("test", 1);
     loader.assignExecutor(executor.getId(), flow1.getExecutionId());
 
-    Pair<ExecutionReference, ExecutableFlow> activeFlow1 =
+    final Pair<ExecutionReference, ExecutableFlow> activeFlow1 =
         loader.fetchActiveFlowByExecId(flow1.getExecutionId());
 
-    ExecutionReference execRef1 = activeFlow1.getFirst();
-    ExecutableFlow execFlow1 = activeFlow1.getSecond();
+    final ExecutionReference execRef1 = activeFlow1.getFirst();
+    final ExecutableFlow execFlow1 = activeFlow1.getSecond();
     Assert.assertNotNull(execRef1);
     Assert.assertNotNull(execFlow1);
     Assert.assertEquals(flow1.getExecutionId(), execFlow1.getExecutionId());
@@ -903,14 +901,14 @@ public class JdbcExecutorLoaderTest {
       return;
     }
 
-    ExecutorLoader loader = createLoader();
-    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutorLoader loader = createLoader();
+    final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow1);
     flow1.setStatus(Status.SUCCEEDED);
     flow1.setEndTime(DateTimeUtils.currentTimeMillis());
     loader.updateExecutableFlow(flow1);
     //Flow just finished. Fetch recently finished flows immediately. Should get it.
-    List<ExecutableFlow> flows = loader.fetchRecentlyFinishedFlows(
+    final List<ExecutableFlow> flows = loader.fetchRecentlyFinishedFlows(
         RECENTLY_FINISHED_LIFETIME);
     Assert.assertEquals(1, flows.size());
     Assert.assertEquals(flow1.getExecutionId(), flows.get(0).getExecutionId());
@@ -925,8 +923,8 @@ public class JdbcExecutorLoaderTest {
       return;
     }
 
-    ExecutorLoader loader = createLoader();
-    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutorLoader loader = createLoader();
+    final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow1);
     flow1.setStatus(Status.SUCCEEDED);
     flow1.setEndTime(DateTimeUtils.currentTimeMillis());
@@ -937,29 +935,29 @@ public class JdbcExecutorLoaderTest {
     flow1.setEndTime(DateTimeUtils.currentTimeMillis());
     loader.updateExecutableFlow(flow1);
     //Fetch recently finished flows within 1 min. Should be empty.
-    List<ExecutableFlow> flows = loader
+    final List<ExecutableFlow> flows = loader
         .fetchRecentlyFinishedFlows(RECENTLY_FINISHED_LIFETIME);
     Assert.assertTrue(flows.isEmpty());
   }
 
   @Ignore @Test
   public void testSmallUploadLog() throws ExecutorManagerException {
-    File logDir = new File(UNIT_BASE_DIR + "logtest");
-    File[] smalllog =
+    final File logDir = new File(UNIT_BASE_DIR + "logtest");
+    final File[] smalllog =
         { new File(logDir, "log1.log"), new File(logDir, "log2.log"),
             new File(logDir, "log3.log") };
 
-    ExecutorLoader loader = createLoader();
+    final ExecutorLoader loader = createLoader();
     loader.uploadLogFile(1, "smallFiles", 0, smalllog);
 
-    LogData data = loader.fetchLogs(1, "smallFiles", 0, 0, 50000);
+    final LogData data = loader.fetchLogs(1, "smallFiles", 0, 0, 50000);
     Assert.assertNotNull(data);
     Assert.assertEquals("Logs length is " + data.getLength(), data.getLength(),
         53);
 
     System.out.println(data.toString());
 
-    LogData data2 = loader.fetchLogs(1, "smallFiles", 0, 10, 20);
+    final LogData data2 = loader.fetchLogs(1, "smallFiles", 0, 10, 20);
     System.out.println(data2.toString());
     Assert.assertNotNull(data2);
     Assert.assertEquals("Logs length is " + data2.getLength(),
@@ -969,40 +967,40 @@ public class JdbcExecutorLoaderTest {
 
   @Ignore @Test
   public void testLargeUploadLog() throws ExecutorManagerException {
-    File logDir = new File(UNIT_BASE_DIR + "logtest");
+    final File logDir = new File(UNIT_BASE_DIR + "logtest");
 
     // Multiple of 255 for Henry the Eigth
-    File[] largelog =
+    final File[] largelog =
         { new File(logDir, "largeLog1.log"), new File(logDir, "largeLog2.log"),
             new File(logDir, "largeLog3.log") };
 
-    ExecutorLoader loader = createLoader();
+    final ExecutorLoader loader = createLoader();
     loader.uploadLogFile(1, "largeFiles", 0, largelog);
 
-    LogData logsResult = loader.fetchLogs(1, "largeFiles", 0, 0, 64000);
+    final LogData logsResult = loader.fetchLogs(1, "largeFiles", 0, 0, 64000);
     Assert.assertNotNull(logsResult);
     Assert.assertEquals("Logs length is " + logsResult.getLength(),
         logsResult.getLength(), 64000);
 
-    LogData logsResult2 = loader.fetchLogs(1, "largeFiles", 0, 1000, 64000);
+    final LogData logsResult2 = loader.fetchLogs(1, "largeFiles", 0, 1000, 64000);
     Assert.assertNotNull(logsResult2);
     Assert.assertEquals("Logs length is " + logsResult2.getLength(),
         logsResult2.getLength(), 64000);
 
-    LogData logsResult3 = loader.fetchLogs(1, "largeFiles", 0, 330000, 400000);
+    final LogData logsResult3 = loader.fetchLogs(1, "largeFiles", 0, 330000, 400000);
     Assert.assertNotNull(logsResult3);
     Assert.assertEquals("Logs length is " + logsResult3.getLength(),
         logsResult3.getLength(), 5493);
 
-    LogData logsResult4 = loader.fetchLogs(1, "largeFiles", 0, 340000, 400000);
+    final LogData logsResult4 = loader.fetchLogs(1, "largeFiles", 0, 340000, 400000);
     Assert.assertNull(logsResult4);
 
-    LogData logsResult5 = loader.fetchLogs(1, "largeFiles", 0, 153600, 204800);
+    final LogData logsResult5 = loader.fetchLogs(1, "largeFiles", 0, 153600, 204800);
     Assert.assertNotNull(logsResult5);
     Assert.assertEquals("Logs length is " + logsResult5.getLength(),
         logsResult5.getLength(), 181893);
 
-    LogData logsResult6 = loader.fetchLogs(1, "largeFiles", 0, 150000, 250000);
+    final LogData logsResult6 = loader.fetchLogs(1, "largeFiles", 0, 150000, 250000);
     Assert.assertNotNull(logsResult6);
     Assert.assertEquals("Logs length is " + logsResult6.getLength(),
         logsResult6.getLength(), 185493);
@@ -1013,24 +1011,24 @@ public class JdbcExecutorLoaderTest {
   public void testRemoveExecutionLogsByTime() throws ExecutorManagerException,
       IOException, InterruptedException {
 
-    ExecutorLoader loader = createLoader();
+    final ExecutorLoader loader = createLoader();
 
-    File logDir = new File(UNIT_BASE_DIR + "logtest");
+    final File logDir = new File(UNIT_BASE_DIR + "logtest");
 
     // Multiple of 255 for Henry the Eigth
-    File[] largelog =
+    final File[] largelog =
         { new File(logDir, "largeLog1.log"), new File(logDir, "largeLog2.log"),
             new File(logDir, "largeLog3.log") };
 
-    DateTime time1 = DateTime.now();
+    final DateTime time1 = DateTime.now();
     loader.uploadLogFile(1, "oldlog", 0, largelog);
     // sleep for 5 seconds
     Thread.currentThread().sleep(5000);
     loader.uploadLogFile(2, "newlog", 0, largelog);
 
-    DateTime time2 = time1.plusMillis(2500);
+    final DateTime time2 = time1.plusMillis(2500);
 
-    int count = loader.removeExecutionLogsByTime(time2.getMillis());
+    final int count = loader.removeExecutionLogsByTime(time2.getMillis());
     System.out.print("Removed " + count + " records");
     LogData logs = loader.fetchLogs(1, "oldlog", 0, 0, 22222);
     Assert.assertTrue(logs == null);
@@ -1038,16 +1036,16 @@ public class JdbcExecutorLoaderTest {
     Assert.assertFalse(logs == null);
   }
 
-  private ExecutableFlow createExecutableFlow(int executionId, String flowName)
+  private ExecutableFlow createExecutableFlow(final int executionId, final String flowName)
     throws IOException {
-    ExecutableFlow execFlow =
+    final ExecutableFlow execFlow =
       TestUtils.createExecutableFlow("exectest1", flowName);
     execFlow.setExecutionId(executionId);
     return execFlow;
   }
 
   private ExecutorLoader createLoader() {
-    Props props = new Props();
+    final Props props = new Props();
     props.put("database.type", "mysql");
 
     props.put("mysql.host", host);
@@ -1057,7 +1055,8 @@ public class JdbcExecutorLoaderTest {
     props.put("mysql.password", password);
     props.put("mysql.numconnections", numConnections);
 
-    return new JdbcExecutorLoader(props, new CommonMetrics(new MetricRegistry()));
+    return new JdbcExecutorLoader(props,
+        new CommonMetrics(new MetricsManager(new MetricRegistry())));
   }
 
   private boolean isTestSetup() {
@@ -1072,7 +1071,7 @@ public class JdbcExecutorLoaderTest {
 
   public static class CountHandler implements ResultSetHandler<Integer> {
     @Override
-    public Integer handle(ResultSet rs) throws SQLException {
+    public Integer handle(final ResultSet rs) throws SQLException {
       int val = 0;
       while (rs.next()) {
         val++;
diff --git a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
index 910b1e0..37efc7c 100644
--- a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
+++ b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
@@ -32,7 +32,7 @@ public class CommonMetricsTest {
   public void setUp() {
     final MetricRegistry metricRegistry = new MetricRegistry();
     this.testUtil = new MetricsTestUtility(metricRegistry);
-    this.metrics = new CommonMetrics(metricRegistry);
+    this.metrics = new CommonMetrics(new MetricsManager(metricRegistry));
   }
 
   @Test
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
index 9117c94..e812048 100644
--- a/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
@@ -21,6 +21,7 @@ import azkaban.flow.Edge;
 import azkaban.flow.Flow;
 import azkaban.flow.Node;
 import azkaban.metrics.CommonMetrics;
+import azkaban.metrics.MetricsManager;
 import azkaban.project.ProjectLogEvent.EventType;
 import azkaban.user.Permission;
 import azkaban.user.User;
@@ -655,7 +656,8 @@ public class JdbcProjectLoaderTest {
     props.put("mysql.password", password);
     props.put("mysql.numconnections", numConnections);
 
-    return new JdbcProjectLoader(props, new CommonMetrics(new MetricRegistry()));
+    return new JdbcProjectLoader(props,
+        new CommonMetrics(new MetricsManager(new MetricRegistry())));
   }
 
   private boolean isTestSetup() {
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
index 8667451..d44a739 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
@@ -22,6 +22,7 @@ import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.MockExecutorLoader;
 import azkaban.metrics.CommonMetrics;
+import azkaban.metrics.MetricsManager;
 import azkaban.trigger.builtin.CreateTriggerAction;
 import azkaban.utils.Props;
 import com.codahale.metrics.MetricRegistry;
@@ -48,7 +49,7 @@ public class TriggerManagerDeadlockTest {
     props.put("executor.port", 12321);
     this.execLoader = new MockExecutorLoader();
     final ExecutorManager executorManager = new ExecutorManager(props, this.execLoader,
-        new AlerterHolder(props), new CommonMetrics(new MetricRegistry()));
+        new AlerterHolder(props), new CommonMetrics(new MetricsManager(new MetricRegistry())));
     this.triggerManager = new TriggerManager(props, this.loader, executorManager);
   }
 
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java
index d6b0ec0..39209c1 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java
@@ -16,8 +16,7 @@
 
 package azkaban.execapp;
 
-import azkaban.metrics.MetricsUtility;
-import com.codahale.metrics.MetricRegistry;
+import azkaban.metrics.MetricsManager;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
@@ -27,11 +26,11 @@ import com.google.inject.Singleton;
 @Singleton
 public class ExecMetrics {
 
-  private final MetricRegistry registry;
+  private final MetricsManager metricsManager;
 
   @Inject
-  ExecMetrics(final MetricRegistry registry) {
-    this.registry = registry;
+  ExecMetrics(final MetricsManager metricsManager) {
+    this.metricsManager = metricsManager;
     setupStaticMetrics();
   }
 
@@ -40,9 +39,9 @@ public class ExecMetrics {
   }
 
   public void addFlowRunnerManagerMetrics(final FlowRunnerManager flowRunnerManager) {
-    MetricsUtility
-        .addGauge("EXEC-NumRunningFlows", this.registry, flowRunnerManager::getNumRunningFlows);
-    MetricsUtility
-        .addGauge("EXEC-NumQueuedFlows", this.registry, flowRunnerManager::getNumQueuedFlows);
+    this.metricsManager
+        .addGauge("EXEC-NumRunningFlows", flowRunnerManager::getNumRunningFlows);
+    this.metricsManager
+        .addGauge("EXEC-NumQueuedFlows", flowRunnerManager::getNumQueuedFlows);
   }
 }
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 745e46d..a0eae64 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -27,7 +27,6 @@ import azkaban.jmx.JmxExecutorManager;
 import azkaban.jmx.JmxJettyServer;
 import azkaban.jmx.JmxTriggerManager;
 import azkaban.metrics.MetricsManager;
-import azkaban.metrics.MetricsUtility;
 import azkaban.project.ProjectManager;
 import azkaban.scheduler.ScheduleManager;
 import azkaban.server.AzkabanServer;
@@ -60,7 +59,6 @@ import azkaban.webapp.servlet.ProjectServlet;
 import azkaban.webapp.servlet.ScheduleServlet;
 import azkaban.webapp.servlet.StatsServlet;
 import azkaban.webapp.servlet.TriggerManagerServlet;
-import com.codahale.metrics.MetricRegistry;
 import com.google.inject.Guice;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
@@ -134,7 +132,6 @@ public class AzkabanWebServer extends AzkabanServer {
   private final ExecutorManager executorManager;
   private final ScheduleManager scheduleManager;
   private final TriggerManager triggerManager;
-  private final MetricRegistry registry;
   private final MetricsManager metricsManager;
   private final Props props;
   private final SessionCache sessionCache;
@@ -152,7 +149,6 @@ public class AzkabanWebServer extends AzkabanServer {
       final ProjectManager projectManager,
       final TriggerManager triggerManager,
       final MetricsManager metricsManager,
-      final MetricRegistry metricRegistry,
       final SessionCache sessionCache,
       final UserManager userManager,
       final ScheduleManager scheduleManager,
@@ -163,7 +159,6 @@ public class AzkabanWebServer extends AzkabanServer {
     this.projectManager = requireNonNull(projectManager, "projectManager is null.");
     this.triggerManager = requireNonNull(triggerManager, "triggerManager is null.");
     this.metricsManager = requireNonNull(metricsManager, "metricsManager is null.");
-    this.registry = requireNonNull(metricRegistry, "metricRegistry is null.");
     this.sessionCache = requireNonNull(sessionCache, "sessionCache is null.");
     this.userManager = requireNonNull(userManager, "userManager is null.");
     this.scheduleManager = requireNonNull(scheduleManager, "scheduleManager is null.");
@@ -699,30 +694,25 @@ public class AzkabanWebServer extends AzkabanServer {
   private void startWebMetrics() throws Exception {
 
     // The number of idle threads in Jetty thread pool
-    MetricsUtility
-        .addGauge("JETTY-NumIdleThreads", this.registry, this.queuedThreadPool::getIdleThreads);
+    this.metricsManager.addGauge("JETTY-NumIdleThreads", this.queuedThreadPool::getIdleThreads);
 
     // The number of threads in Jetty thread pool. The formula is:
     // threads = idleThreads + busyThreads
-    MetricsUtility
-        .addGauge("JETTY-NumTotalThreads", this.registry, this.queuedThreadPool::getThreads);
+    this.metricsManager.addGauge("JETTY-NumTotalThreads", this.queuedThreadPool::getThreads);
 
     // The number of requests queued in the Jetty thread pool.
-    MetricsUtility
-        .addGauge("JETTY-NumQueueSize", this.registry, this.queuedThreadPool::getQueueSize);
+    this.metricsManager.addGauge("JETTY-NumQueueSize", this.queuedThreadPool::getQueueSize);
 
-    MetricsUtility
-        .addGauge("WEB-NumQueuedFlows", this.registry, this.executorManager::getQueuedFlowSize);
-    /**
+    this.metricsManager.addGauge("WEB-NumQueuedFlows", this.executorManager::getQueuedFlowSize);
+    /*
      * TODO: Currently {@link ExecutorManager#getRunningFlows()} includes both running and non-dispatched flows.
      * Originally we would like to do a subtraction between getRunningFlows and {@link ExecutorManager#getQueuedFlowSize()},
      * in order to have the correct runnable flows.
      * However, both getRunningFlows and getQueuedFlowSize are not synchronized, such that we can not make
      * a thread safe subtraction. We need to fix this in the future.
      */
-    MetricsUtility
-        .addGauge("WEB-NumRunningFlows", this.registry,
-            () -> this.executorManager.getRunningFlows().size());
+    this.metricsManager
+        .addGauge("WEB-NumRunningFlows", () -> this.executorManager.getRunningFlows().size());
 
     logger.info("starting reporting Web Server Metrics");
     this.metricsManager.startReporting("AZ-WEB", this.props);
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/WebMetrics.java b/azkaban-web-server/src/main/java/azkaban/webapp/WebMetrics.java
index 9b11bce..b22e428 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/WebMetrics.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/WebMetrics.java
@@ -16,9 +16,8 @@
 
 package azkaban.webapp;
 
-import azkaban.metrics.MetricsUtility;
+import azkaban.metrics.MetricsManager;
 import com.codahale.metrics.Meter;
-import com.codahale.metrics.MetricRegistry;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import java.util.concurrent.atomic.AtomicLong;
@@ -31,8 +30,6 @@ import java.util.concurrent.atomic.AtomicLong;
 @Singleton
 public class WebMetrics {
 
-  private final MetricRegistry registry;
-
   private final Meter webGetCall;
   private final Meter webPostCall;
 
@@ -40,26 +37,26 @@ public class WebMetrics {
   private final AtomicLong logFetchLatency = new AtomicLong(0L);
 
   @Inject
-  WebMetrics(final MetricRegistry registry) {
-    this.registry = registry;
-    this.webGetCall = MetricsUtility.addMeter("Web-Get-Call-Meter", this.registry);
-    this.webPostCall = MetricsUtility.addMeter("Web-Post-Call-Meter", this.registry);
-    MetricsUtility.addGauge("fetchLogLatency", this.registry, this.logFetchLatency::get);
+  WebMetrics(final MetricsManager metricsManager) {
+    this.webGetCall = metricsManager.addMeter("Web-Get-Call-Meter");
+    this.webPostCall = metricsManager.addMeter("Web-Post-Call-Meter");
+
+    metricsManager.addGauge("fetchLogLatency", this.logFetchLatency::get);
   }
 
+  /**
+   * Mark the occurrence of a GET call
+   *
+   * This method should be Thread Safe.
+   * Two reasons that we don't make this function call synchronized:
+   * 1). drop wizard metrics deals with concurrency internally;
+   * 2). mark is basically a math addition operation, which should not cause race condition issue.
+   */
   public void markWebGetCall() {
-
-    /*
-     * This method should be Thread Safe.
-     * Two reasons that we don't make this function call synchronized:
-     * 1). drop wizard metrics deals with concurrency internally;
-     * 2). mark is basically a math addition operation, which should not cause race condition issue.
-     */
     this.webGetCall.mark();
   }
 
   public void markWebPostCall() {
-
     this.webPostCall.mark();
   }
 
diff --git a/azkaban-web-server/src/test/java/azkaban/webapp/WebMetricsTest.java b/azkaban-web-server/src/test/java/azkaban/webapp/WebMetricsTest.java
index 3e502fa..eb34af3 100644
--- a/azkaban-web-server/src/test/java/azkaban/webapp/WebMetricsTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/webapp/WebMetricsTest.java
@@ -18,6 +18,7 @@ package azkaban.webapp;
 
 import static org.junit.Assert.assertEquals;
 
+import azkaban.metrics.MetricsManager;
 import azkaban.metrics.MetricsTestUtility;
 import com.codahale.metrics.MetricRegistry;
 import org.junit.Before;
@@ -33,7 +34,7 @@ public class WebMetricsTest {
   public void setUp() {
     final MetricRegistry metricRegistry = new MetricRegistry();
     this.testUtil = new MetricsTestUtility(metricRegistry);
-    this.metrics = new WebMetrics(metricRegistry);
+    this.metrics = new WebMetrics(new MetricsManager(metricRegistry));
   }
 
   @Test