Details
diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
index c7753e9..14d3b9c 100644
--- a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
@@ -42,7 +42,7 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
/**
* Data structure to keep track of snapshots
*/
- Map<String, LinkedList<InMemoryHistoryNode>> _historyListMapping;
+ protected Map<String, LinkedList<InMemoryHistoryNode>> _historyListMapping;
private static final String INMEMORY_METRIC_REPORTER_WINDOW = "azkaban.metric.inmemory.interval";
private static final String INMEMORY_METRIC_NUM_INSTANCES = "azkaban.metric.inmemory.maxinstances";
private static final String INMEMORY_METRIC_STANDARDDEVIATION_FACTOR =
diff --git a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
index b4a8590..b91004a 100644
--- a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
+++ b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
@@ -55,7 +55,7 @@ public class MetricReportManager {
private ExecutorService _executorService;
// Singleton variable
private static volatile MetricReportManager _instance = null;
- boolean _isManagerEnabled;
+ private static boolean _isManagerEnabled;
private MetricReportManager() {
_logger.debug("Instantiating Metric Manager");
@@ -68,8 +68,8 @@ public class MetricReportManager {
/**
* @return true, if we have enabled metric manager from Azkaban exec server
*/
- public static boolean isInstantiated() {
- return _instance != null;
+ public static boolean isAvailable() {
+ return _instance != null && _isManagerEnabled;
}
/**
@@ -89,7 +89,7 @@ public class MetricReportManager {
// each element of metrics List is responsible to call this method and report metrics
public void reportMetric(final IMetric<?> metric) {
- if (metric != null && _isManagerEnabled) {
+ if (metric != null && isAvailable()) {
// Report metric to all the emitters
synchronized (metric) {
diff --git a/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java b/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java
new file mode 100644
index 0000000..4a3cc90
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java
@@ -0,0 +1,23 @@
+package azkaban.metric;
+
+/**
+ * Dummy Metric to test Azkaban Metrics
+ */
+public class FakeMetric extends AbstractMetric<Integer>{
+
+ public FakeMetric(MetricReportManager manager) {
+ super("FakeMetric", "int", 4, manager);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null || obj.getClass() != this.getClass()) {
+ return false;
+ }
+ FakeMetric metric = (FakeMetric) obj;
+ return metric.getName() == getName() && metric.getValue() == getValue();
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java b/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java
new file mode 100644
index 0000000..043bf26
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java
@@ -0,0 +1,90 @@
+package azkaban.metric;
+
+import java.util.Date;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.metric.inmemoryemitter.InMemoryHistoryNode;
+import azkaban.metric.inmemoryemitter.InMemoryMetricEmitter;
+import azkaban.utils.Props;
+import static org.junit.Assert.*;
+
+/**
+ * Azkaban Metric Manager Tests
+ */
+public class MetricManagerTest {
+ MetricReportManager manager;
+ FakeMetric metric;
+ InMemoryMetricEmitter emitter;
+
+ @Before
+ public void setUp() throws Exception {
+ manager = MetricReportManager.getInstance();
+ metric = new FakeMetric(manager);
+ manager.addMetric(metric);
+ emitter = new InMemoryMetricEmitter(new Props());
+ manager.addMetricEmitter(emitter);
+ }
+
+ /**
+ * Test enable disable and status methods
+ */
+ @Test
+ public void managerStatusTest() {
+ assertNotNull("Singleton Failed to instantiate", manager);
+ assertTrue("Failed to enable metric manager", MetricReportManager.isAvailable());
+ manager.disableManager();
+ assertFalse("Failed to disable metric manager", MetricReportManager.isAvailable());
+ manager.enableManager();
+ assertTrue("Failed to enable metric manager", MetricReportManager.isAvailable());
+ }
+
+ /**
+ * Test adding and accessing metric methods
+ */
+ @Test
+ public void managerMetricMaintenanceTest() {
+ assertEquals("Failed to add metric", manager.getAllMetrics().size(), 1);
+ assertTrue("Failed to add metric", manager.getAllMetrics().contains(metric));
+ assertEquals("Failed to get metric by Name", manager.getMetricFromName("FakeMetric"), metric);
+ }
+
+ /**
+ * Test adding, removing and accessing metric emitter.
+ */
+ @Test
+ public void managerEmitterMaintenanceTest() {
+ assertTrue("Failed to add Emitter", manager.getMetricEmitters().contains(emitter));
+
+ int originalSize = manager.getMetricEmitters().size();
+ manager.removeMetricEmitter(emitter);
+ assertEquals("Failed to remove emitter", manager.getMetricEmitters().size(), originalSize - 1);
+ manager.addMetricEmitter(emitter);
+ }
+
+ /**
+ * Test metric reporting methods, including InMemoryMetricEmitter methods
+ * @throws Exception
+ */
+ @Test
+ public void managerEmitterHandlingTest() throws Exception {
+ emitter.purgeAllData();
+ metric.notifyManager();
+
+ Date from = new Date();
+ synchronized (this) {
+ try {
+ wait(2000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ Date to = new Date();
+ List<InMemoryHistoryNode> nodes = emitter.getDrawMetric("FakeMetric", from, to, false);
+
+ assertEquals("Failed to report metric", 1, nodes.size());
+ assertEquals("Failed to report metric", nodes.get(0).getValue(), 4);
+ }
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
index 226a68a..e98f5f2 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
@@ -792,7 +792,7 @@ public class FlowRunner extends EventHandler implements Runnable {
*/
private void configureJobLevelMetrics(JobRunner jobRunner) {
logger.info("Configuring Azkaban metrics tracking for jobrunner object");
- if (MetricReportManager.isInstantiated()) {
+ if (MetricReportManager.isAvailable()) {
MetricReportManager metricManager = MetricReportManager.getInstance();
//Adding NumRunningJobMetric listener
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 9aa5420..d052d99 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -520,7 +520,7 @@ public class FlowRunnerManager implements EventListener,
private void configureFlowLevelMetrics(FlowRunner flowRunner) {
logger.info("Configuring Azkaban metrics tracking for flow runner object");
- if (MetricReportManager.isInstantiated()) {
+ if (MetricReportManager.isAvailable()) {
MetricReportManager metricManager = MetricReportManager.getInstance();
//Adding NumFailedFlow Metric listener
flowRunner.addListener((NumFailedFlowMetric) metricManager
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
index aaa92a0..5962471 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
@@ -44,6 +44,7 @@ import azkaban.server.HttpRequestUtils;
import azkaban.server.ServerConstants;
import azkaban.utils.JSONUtils;
+
/**
* Servlet to communicate with Azkaban exec server
* This servlet get requests from stats servlet in Azkaban Web server
@@ -108,18 +109,21 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
* enable or disable metric Manager
* A disable will also purge all data from all metric emitters
*/
- private void handleChangeManagerStatusRequest(HttpServletRequest req, Map<String, Object> ret, boolean enableMetricManager) {
+ private void handleChangeManagerStatusRequest(HttpServletRequest req, Map<String, Object> ret,
+ boolean enableMetricManager) {
try {
_logger.info("Updating metric manager status");
- if (MetricReportManager.isInstantiated()) {
+ if (MetricReportManager.isAvailable()) {
MetricReportManager metricManager = MetricReportManager.getInstance();
if (enableMetricManager) {
metricManager.enableManager();
} else {
metricManager.disableManager();
}
+ ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
+ } else {
+ ret.put(RESPONSE_ERROR, "MetricManager is not available");
}
- ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
} catch (Exception e) {
_logger.error(e);
ret.put(RESPONSE_ERROR, e.getMessage());
@@ -132,12 +136,14 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
private void handleChangeEmitterPoints(HttpServletRequest req, Map<String, Object> ret) {
try {
long numInstance = getLongParam(req, STATS_MAP_EMITTERNUMINSTANCES);
- if (MetricReportManager.isInstantiated()) {
+ if (MetricReportManager.isAvailable()) {
MetricReportManager metricManager = MetricReportManager.getInstance();
InMemoryMetricEmitter memoryEmitter = extractInMemoryMetricEmitter(metricManager);
memoryEmitter.setReportingInstances(numInstance);
+ ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
+ } else {
+ ret.put(RESPONSE_ERROR, "MetricManager is not available");
}
- ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
} catch (Exception e) {
_logger.error(e);
ret.put(RESPONSE_ERROR, e.getMessage());
@@ -150,12 +156,14 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
private void handleChangeCleaningInterval(HttpServletRequest req, Map<String, Object> ret) {
try {
long newInterval = getLongParam(req, STATS_MAP_CLEANINGINTERVAL);
- if (MetricReportManager.isInstantiated()) {
+ if (MetricReportManager.isAvailable()) {
MetricReportManager metricManager = MetricReportManager.getInstance();
InMemoryMetricEmitter memoryEmitter = extractInMemoryMetricEmitter(metricManager);
memoryEmitter.setReportingInterval(newInterval);
+ ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
+ } else {
+ ret.put(RESPONSE_ERROR, "MetricManager is not available");
}
- ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
} catch (Exception e) {
_logger.error(e);
ret.put(RESPONSE_ERROR, e.getMessage());
@@ -167,7 +175,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
* @throws ServletException
*/
private void handleGetMetricHistory(HttpServletRequest req, Map<String, Object> ret) throws ServletException {
- if (MetricReportManager.isInstantiated()) {
+ if (MetricReportManager.isAvailable()) {
MetricReportManager metricManager = MetricReportManager.getInstance();
InMemoryMetricEmitter memoryEmitter = extractInMemoryMetricEmitter(metricManager);
@@ -192,7 +200,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
ret.put(RESPONSE_ERROR, "InMemoryMetricEmitter not instantiated");
}
} else {
- ret.put(RESPONSE_ERROR, "MetricReportManager not instantiated");
+ ret.put(RESPONSE_ERROR, "MetricReportManager is not available");
}
}
@@ -214,7 +222,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
* Get all the metrics tracked by metric manager
*/
private void handleGetAllMMetricsName(HttpServletRequest req, Map<String, Object> ret) {
- if (MetricReportManager.isInstantiated()) {
+ if (MetricReportManager.isAvailable()) {
MetricReportManager metricManager = MetricReportManager.getInstance();
List<IMetric<?>> result = metricManager.getAllMetrics();
if (result.size() == 0) {
@@ -223,7 +231,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
ret.put("data", result);
}
} else {
- ret.put(RESPONSE_ERROR, "MetricReportManager not instantiated");
+ ret.put(RESPONSE_ERROR, "MetricReportManager is not available");
}
}
@@ -235,12 +243,14 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
try {
String metricName = getParam(req, STATS_MAP_METRICNAMEPARAM);
long newInterval = getLongParam(req, STATS_MAP_REPORTINGINTERVAL);
- if (MetricReportManager.isInstantiated()) {
+ if (MetricReportManager.isAvailable()) {
MetricReportManager metricManager = MetricReportManager.getInstance();
TimeBasedReportingMetric<?> metric = (TimeBasedReportingMetric<?>) metricManager.getMetricFromName(metricName);
metric.updateInterval(newInterval);
+ ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
+ } else {
+ ret.put(RESPONSE_ERROR, "MetricManager is not available");
}
- ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
} catch (Exception e) {
_logger.error(e);
ret.put(RESPONSE_ERROR, e.getMessage());