azkaban-memoizeit

Adding unit tests for metricManager

12/23/2014 11:25:16 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
index c7753e9..14d3b9c 100644
--- a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
@@ -42,7 +42,7 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
   /**
    * Data structure to keep track of snapshots
    */
-  Map<String, LinkedList<InMemoryHistoryNode>> _historyListMapping;
+  protected Map<String, LinkedList<InMemoryHistoryNode>> _historyListMapping;
   private static final String INMEMORY_METRIC_REPORTER_WINDOW = "azkaban.metric.inmemory.interval";
   private static final String INMEMORY_METRIC_NUM_INSTANCES = "azkaban.metric.inmemory.maxinstances";
   private static final String INMEMORY_METRIC_STANDARDDEVIATION_FACTOR =
diff --git a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
index b4a8590..b91004a 100644
--- a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
+++ b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
@@ -55,7 +55,7 @@ public class MetricReportManager {
   private ExecutorService _executorService;
   // Singleton variable
   private static volatile MetricReportManager _instance = null;
-  boolean _isManagerEnabled;
+  private static boolean _isManagerEnabled;
 
   private MetricReportManager() {
     _logger.debug("Instantiating Metric Manager");
@@ -68,8 +68,8 @@ public class MetricReportManager {
   /**
    * @return true, if we have enabled metric manager from Azkaban exec server
    */
-  public static boolean isInstantiated() {
-    return _instance != null;
+  public static boolean isAvailable() {
+    return _instance != null && _isManagerEnabled;
   }
 
   /**
@@ -89,7 +89,7 @@ public class MetricReportManager {
 
   // each element of metrics List is responsible to call this method and report metrics
   public void reportMetric(final IMetric<?> metric) {
-    if (metric != null && _isManagerEnabled) {
+    if (metric != null && isAvailable()) {
 
       // Report metric to all the emitters
       synchronized (metric) {
diff --git a/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java b/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java
new file mode 100644
index 0000000..4a3cc90
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java
@@ -0,0 +1,23 @@
+package azkaban.metric;
+
+/**
+ * Dummy Metric to test Azkaban Metrics
+ */
+public class FakeMetric extends AbstractMetric<Integer>{
+
+  public FakeMetric(MetricReportManager manager) {
+    super("FakeMetric", "int", 4, manager);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+      if (obj == this) {
+          return true;
+      }
+      if (obj == null || obj.getClass() != this.getClass()) {
+          return false;
+      }
+      FakeMetric metric = (FakeMetric) obj;
+      return metric.getName() == getName() && metric.getValue() == getValue();
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java b/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java
new file mode 100644
index 0000000..043bf26
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java
@@ -0,0 +1,90 @@
+package azkaban.metric;
+
+import java.util.Date;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.metric.inmemoryemitter.InMemoryHistoryNode;
+import azkaban.metric.inmemoryemitter.InMemoryMetricEmitter;
+import azkaban.utils.Props;
+import static org.junit.Assert.*;
+
+/**
+ * Azkaban Metric Manager Tests
+ */
+public class MetricManagerTest {
+  MetricReportManager manager;
+  FakeMetric metric;
+  InMemoryMetricEmitter emitter;
+
+  @Before
+  public void setUp() throws Exception {
+    manager = MetricReportManager.getInstance();
+    metric = new FakeMetric(manager);
+    manager.addMetric(metric);
+    emitter = new InMemoryMetricEmitter(new Props());
+    manager.addMetricEmitter(emitter);
+  }
+
+  /**
+   * Test enable disable and status methods
+   */
+  @Test
+  public void managerStatusTest() {
+    assertNotNull("Singleton Failed to instantiate", manager);
+    assertTrue("Failed to enable metric manager", MetricReportManager.isAvailable());
+    manager.disableManager();
+    assertFalse("Failed to disable metric manager", MetricReportManager.isAvailable());
+    manager.enableManager();
+    assertTrue("Failed to enable metric manager", MetricReportManager.isAvailable());
+  }
+
+  /**
+   * Test adding and accessing metric methods
+   */
+  @Test
+  public void managerMetricMaintenanceTest() {
+    assertEquals("Failed to add metric", manager.getAllMetrics().size(), 1);
+    assertTrue("Failed to add metric", manager.getAllMetrics().contains(metric));
+    assertEquals("Failed to get metric by Name", manager.getMetricFromName("FakeMetric"), metric);
+  }
+
+  /**
+   * Test adding, removing and accessing metric emitter.
+   */
+  @Test
+  public void managerEmitterMaintenanceTest() {
+    assertTrue("Failed to add Emitter", manager.getMetricEmitters().contains(emitter));
+
+    int originalSize = manager.getMetricEmitters().size();
+    manager.removeMetricEmitter(emitter);
+    assertEquals("Failed to remove emitter", manager.getMetricEmitters().size(), originalSize - 1);
+    manager.addMetricEmitter(emitter);
+  }
+
+  /**
+   * Test metric reporting methods, including InMemoryMetricEmitter methods
+   * @throws Exception
+   */
+  @Test
+  public void managerEmitterHandlingTest() throws Exception {
+    emitter.purgeAllData();
+    metric.notifyManager();
+
+    Date from = new Date();
+    synchronized (this) {
+      try {
+        wait(2000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+    Date to = new Date();
+    List<InMemoryHistoryNode> nodes = emitter.getDrawMetric("FakeMetric", from, to, false);
+
+    assertEquals("Failed to report metric", 1, nodes.size());
+    assertEquals("Failed to report metric", nodes.get(0).getValue(), 4);
+  }
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
index 226a68a..e98f5f2 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
@@ -792,7 +792,7 @@ public class FlowRunner extends EventHandler implements Runnable {
    */
   private void configureJobLevelMetrics(JobRunner jobRunner) {
     logger.info("Configuring Azkaban metrics tracking for jobrunner object");
-    if (MetricReportManager.isInstantiated()) {
+    if (MetricReportManager.isAvailable()) {
       MetricReportManager metricManager = MetricReportManager.getInstance();
 
       //Adding NumRunningJobMetric listener
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 9aa5420..d052d99 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -520,7 +520,7 @@ public class FlowRunnerManager implements EventListener,
   private void configureFlowLevelMetrics(FlowRunner flowRunner) {
     logger.info("Configuring Azkaban metrics tracking for flow runner object");
 
-    if (MetricReportManager.isInstantiated()) {
+    if (MetricReportManager.isAvailable()) {
       MetricReportManager metricManager = MetricReportManager.getInstance();
       //Adding NumFailedFlow Metric listener
       flowRunner.addListener((NumFailedFlowMetric) metricManager
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
index aaa92a0..5962471 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
@@ -44,6 +44,7 @@ import azkaban.server.HttpRequestUtils;
 import azkaban.server.ServerConstants;
 import azkaban.utils.JSONUtils;
 
+
 /**
  * Servlet to communicate with Azkaban exec server
  * This servlet get requests from stats servlet in Azkaban Web server
@@ -108,18 +109,21 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
    * enable or disable metric Manager
    * A disable will also purge all data from all metric emitters
    */
-  private void handleChangeManagerStatusRequest(HttpServletRequest req, Map<String, Object> ret, boolean enableMetricManager) {
+  private void handleChangeManagerStatusRequest(HttpServletRequest req, Map<String, Object> ret,
+      boolean enableMetricManager) {
     try {
       _logger.info("Updating metric manager status");
-      if (MetricReportManager.isInstantiated()) {
+      if (MetricReportManager.isAvailable()) {
         MetricReportManager metricManager = MetricReportManager.getInstance();
         if (enableMetricManager) {
           metricManager.enableManager();
         } else {
           metricManager.disableManager();
         }
+        ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
+      } else {
+        ret.put(RESPONSE_ERROR, "MetricManager is not available");
       }
-      ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
     } catch (Exception e) {
       _logger.error(e);
       ret.put(RESPONSE_ERROR, e.getMessage());
@@ -132,12 +136,14 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
   private void handleChangeEmitterPoints(HttpServletRequest req, Map<String, Object> ret) {
     try {
       long numInstance = getLongParam(req, STATS_MAP_EMITTERNUMINSTANCES);
-      if (MetricReportManager.isInstantiated()) {
+      if (MetricReportManager.isAvailable()) {
         MetricReportManager metricManager = MetricReportManager.getInstance();
         InMemoryMetricEmitter memoryEmitter = extractInMemoryMetricEmitter(metricManager);
         memoryEmitter.setReportingInstances(numInstance);
+        ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
+      } else {
+        ret.put(RESPONSE_ERROR, "MetricManager is not available");
       }
-      ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
     } catch (Exception e) {
       _logger.error(e);
       ret.put(RESPONSE_ERROR, e.getMessage());
@@ -150,12 +156,14 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
   private void handleChangeCleaningInterval(HttpServletRequest req, Map<String, Object> ret) {
     try {
       long newInterval = getLongParam(req, STATS_MAP_CLEANINGINTERVAL);
-      if (MetricReportManager.isInstantiated()) {
+      if (MetricReportManager.isAvailable()) {
         MetricReportManager metricManager = MetricReportManager.getInstance();
         InMemoryMetricEmitter memoryEmitter = extractInMemoryMetricEmitter(metricManager);
         memoryEmitter.setReportingInterval(newInterval);
+        ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
+      } else {
+        ret.put(RESPONSE_ERROR, "MetricManager is not available");
       }
-      ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
     } catch (Exception e) {
       _logger.error(e);
       ret.put(RESPONSE_ERROR, e.getMessage());
@@ -167,7 +175,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
    * @throws ServletException
    */
   private void handleGetMetricHistory(HttpServletRequest req, Map<String, Object> ret) throws ServletException {
-    if (MetricReportManager.isInstantiated()) {
+    if (MetricReportManager.isAvailable()) {
       MetricReportManager metricManager = MetricReportManager.getInstance();
       InMemoryMetricEmitter memoryEmitter = extractInMemoryMetricEmitter(metricManager);
 
@@ -192,7 +200,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
         ret.put(RESPONSE_ERROR, "InMemoryMetricEmitter not instantiated");
       }
     } else {
-      ret.put(RESPONSE_ERROR, "MetricReportManager not instantiated");
+      ret.put(RESPONSE_ERROR, "MetricReportManager is not available");
     }
   }
 
@@ -214,7 +222,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
    * Get all the metrics tracked by metric manager
    */
   private void handleGetAllMMetricsName(HttpServletRequest req, Map<String, Object> ret) {
-    if (MetricReportManager.isInstantiated()) {
+    if (MetricReportManager.isAvailable()) {
       MetricReportManager metricManager = MetricReportManager.getInstance();
       List<IMetric<?>> result = metricManager.getAllMetrics();
       if (result.size() == 0) {
@@ -223,7 +231,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
         ret.put("data", result);
       }
     } else {
-      ret.put(RESPONSE_ERROR, "MetricReportManager not instantiated");
+      ret.put(RESPONSE_ERROR, "MetricReportManager is not available");
     }
   }
 
@@ -235,12 +243,14 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
     try {
       String metricName = getParam(req, STATS_MAP_METRICNAMEPARAM);
       long newInterval = getLongParam(req, STATS_MAP_REPORTINGINTERVAL);
-      if (MetricReportManager.isInstantiated()) {
+      if (MetricReportManager.isAvailable()) {
         MetricReportManager metricManager = MetricReportManager.getInstance();
         TimeBasedReportingMetric<?> metric = (TimeBasedReportingMetric<?>) metricManager.getMetricFromName(metricName);
         metric.updateInterval(newInterval);
+        ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
+      } else {
+        ret.put(RESPONSE_ERROR, "MetricManager is not available");
       }
-      ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
     } catch (Exception e) {
       _logger.error(e);
       ret.put(RESPONSE_ERROR, e.getMessage());