Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
index 8e933ef..8bc725f 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
@@ -87,7 +87,7 @@ public interface ConnectorParams {
public static final String JMX_HOSTPORT = "hostPort";
public static final String STATS_GET_ALLMETRICSNAME = "getAllMetricNames";
- public static final String STATS_GET_METRICHISTORY = "metricHistory";
+ public static final String STATS_GET_METRICHISTORY = "getMetricHistory";
public static final String STATS_SET_REPORTINGINTERVAL = "changeMetricInterval";
public static final String STATS_SET_CLEANINGINTERVAL = "changeCleaningInterval";
public static final String STATS_SET_MAXREPORTERPOINTS = "changeEmitterPoints";
diff --git a/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
index dabcc49..3eb4491 100644
--- a/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
@@ -23,11 +23,11 @@ import org.apache.log4j.Logger;
* @param <T> Type of Value of a given metric
*/
public abstract class AbstractMetric<T> implements IMetric<T>, Cloneable{
- protected static final Logger _logger = Logger.getLogger(MetricReportManager.class);
- protected String _name;
- protected T _value;
- protected String _type;
- protected MetricReportManager _metricManager;
+ protected static final Logger logger = Logger.getLogger(MetricReportManager.class);
+ protected String name;
+ protected T value;
+ protected String type;
+ protected MetricReportManager metricManager;
/**
* @param metricName Name of metric
@@ -36,10 +36,10 @@ public abstract class AbstractMetric<T> implements IMetric<T>, Cloneable{
* @param manager Metric Manager whom a metric will report to
*/
protected AbstractMetric(String metricName, String metricType, T initialValue, MetricReportManager manager) {
- _name = metricName;
- _type = metricType;
- _value = initialValue;
- _metricManager = manager;
+ name = metricName;
+ type = metricType;
+ value = initialValue;
+ metricManager = manager;
}
/**
@@ -47,7 +47,7 @@ public abstract class AbstractMetric<T> implements IMetric<T>, Cloneable{
* @see azkaban.metric.IMetric#getName()
*/
public String getName() {
- return _name;
+ return name;
}
/**
@@ -55,7 +55,7 @@ public abstract class AbstractMetric<T> implements IMetric<T>, Cloneable{
* @see azkaban.metric.IMetric#getValueType()
*/
public String getValueType() {
- return _type;
+ return type;
}
/**
@@ -63,7 +63,7 @@ public abstract class AbstractMetric<T> implements IMetric<T>, Cloneable{
* @see azkaban.metric.IMetric#updateMetricManager(azkaban.metric.MetricReportManager)
*/
public void updateMetricManager(final MetricReportManager manager) {
- _metricManager = manager;
+ metricManager = manager;
}
/**
@@ -80,7 +80,7 @@ public abstract class AbstractMetric<T> implements IMetric<T>, Cloneable{
* @see azkaban.metric.IMetric#getValue()
*/
public T getValue() {
- return _value;
+ return value;
}
/**
@@ -91,11 +91,11 @@ public abstract class AbstractMetric<T> implements IMetric<T>, Cloneable{
* @see azkaban.metric.IMetric#notifyManager()
*/
public void notifyManager() {
- _logger.debug(String.format("Notifying Manager for %s", this.getClass().getName()));
+ logger.debug(String.format("Notifying Manager for %s", this.getClass().getName()));
try {
- _metricManager.reportMetric(this);
- } catch (NullPointerException ex) {
- _logger.error(String.format("Metric Manager is not set for %s metric %s", this.getClass().getName(), ex.toString()));
+ metricManager.reportMetric(this);
+ } catch (Throwable ex) {
+ logger.error(String.format("Metric Manager is not set for %s metric", this.getClass().getName()), ex);
}
}
}
diff --git a/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
index 6aaf007..5e954b7 100644
--- a/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
@@ -26,13 +26,13 @@ import azkaban.utils.Props;
*/
public class GangliaMetricEmitter implements IMetricEmitter {
private static final String GANGLIA_METRIC_REPORTER_PATH = "azkaban.metric.ganglia.path";
- private String _gmetricPath;
+ private String gmetricPath;
/**
* @param azkProps Azkaban Properties
*/
public GangliaMetricEmitter(Props azkProps) {
- _gmetricPath = azkProps.get(GANGLIA_METRIC_REPORTER_PATH);
+ gmetricPath = azkProps.get(GANGLIA_METRIC_REPORTER_PATH);
}
private String buildCommand(IMetric<?> metric) {
@@ -40,7 +40,7 @@ public class GangliaMetricEmitter implements IMetricEmitter {
synchronized (metric) {
cmd =
- String.format("%s -t %s -n %s -v %s", _gmetricPath, metric.getValueType(), metric.getName(), metric.getValue()
+ String.format("%s -t %s -n %s -v %s", gmetricPath, metric.getValueType(), metric.getName(), metric.getValue()
.toString());
}
@@ -53,23 +53,28 @@ public class GangliaMetricEmitter implements IMetricEmitter {
* @see azkaban.metric.IMetricEmitter#reportMetric(azkaban.metric.IMetric)
*/
@Override
- public void reportMetric(final IMetric<?> metric) throws Exception {
+ public void reportMetric(final IMetric<?> metric) throws MetricException {
String gangliaCommand = buildCommand(metric);
if (gangliaCommand != null) {
// executes shell command to report metric to ganglia dashboard
- Process emission = Runtime.getRuntime().exec(gangliaCommand);
- int exitCode = emission.waitFor();
- if (exitCode != 0) {
- throw new RuntimeException("Failed to report metric using gmetric");
+ try {
+ Process emission = Runtime.getRuntime().exec(gangliaCommand);
+ int exitCode;
+ exitCode = emission.waitFor();
+ if (exitCode != 0) {
+ throw new MetricException("Failed to report metric using gmetric");
+ }
+ } catch (Exception e) {
+ throw new MetricException("Failed to report metric using gmetric");
}
} else {
- throw new Exception("Failed to build ganglia Command");
+ throw new MetricException("Failed to build ganglia Command");
}
}
@Override
- public void purgeAllData() throws Exception {
+ public void purgeAllData() throws MetricException {
}
}
diff --git a/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java
index 2436625..bdc874b 100644
--- a/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java
@@ -20,6 +20,6 @@ package azkaban.metric;
* Interface for metric emitters
*/
public interface IMetricEmitter {
- void reportMetric(final IMetric<?> metric) throws Exception;
- void purgeAllData() throws Exception;
+ void reportMetric(final IMetric<?> metric) throws MetricException;
+ void purgeAllData() throws MetricException;
}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryNode.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryNode.java
index 8e0cce5..b8f1f6e 100644
--- a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryNode.java
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryNode.java
@@ -22,23 +22,23 @@ import java.util.Date;
* A snapshot of metric's value
*/
public class InMemoryHistoryNode {
- private Object _value;
- private Date _date;
+ private Object value;
+ private Date date;
/**
* Takes snapshot of the metric with a given value
* @param val
*/
public InMemoryHistoryNode(final Object val) {
- _value = val;
- _date = new Date();
+ value = val;
+ date = new Date();
}
public Object getValue() {
- return _value;
+ return value;
}
public Date getTimestamp() {
- return _date;
+ return date;
}
}
diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
index 5ada55a..e93d11b 100644
--- a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
@@ -28,7 +28,9 @@ import org.apache.log4j.Logger;
import azkaban.metric.IMetric;
import azkaban.metric.IMetricEmitter;
+import azkaban.metric.MetricException;
import azkaban.utils.Props;
+
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
@@ -37,35 +39,35 @@ import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
* This is also the default metric emitter and used by /stats servlet
*/
public class InMemoryMetricEmitter implements IMetricEmitter {
- protected static final Logger _logger = Logger.getLogger(InMemoryMetricEmitter.class);
+ protected static final Logger logger = Logger.getLogger(InMemoryMetricEmitter.class);
/**
* Data structure to keep track of snapshots
*/
- protected Map<String, LinkedList<InMemoryHistoryNode>> _historyListMapping;
+ protected Map<String, LinkedList<InMemoryHistoryNode>> historyListMapping;
private static final String INMEMORY_METRIC_REPORTER_WINDOW = "azkaban.metric.inmemory.interval";
private static final String INMEMORY_METRIC_NUM_INSTANCES = "azkaban.metric.inmemory.maxinstances";
private static final String INMEMORY_METRIC_STANDARDDEVIATION_FACTOR =
"azkaban.metric.inmemory.standardDeviationFactor";
- private double _standardDeviationFactor;
+ private double standardDeviationFactor;
/**
* Interval (in millisecond) from today for which we should maintain the in memory snapshots
*/
- private long _timeWindow;
+ private long timeWindow;
/**
* Maximum number of snapshots that should be displayed on /stats servlet
*/
- private long _numInstances;
+ private long numInstances;
/**
* @param azkProps Azkaban Properties
*/
public InMemoryMetricEmitter(Props azkProps) {
- _historyListMapping = new HashMap<String, LinkedList<InMemoryHistoryNode>>();
- _timeWindow = azkProps.getLong(INMEMORY_METRIC_REPORTER_WINDOW, 60 * 60 * 24 * 7 * 1000);
- _numInstances = azkProps.getLong(INMEMORY_METRIC_NUM_INSTANCES, 50);
- _standardDeviationFactor = azkProps.getDouble(INMEMORY_METRIC_STANDARDDEVIATION_FACTOR, 2);
+ historyListMapping = new HashMap<String, LinkedList<InMemoryHistoryNode>>();
+ timeWindow = azkProps.getLong(INMEMORY_METRIC_REPORTER_WINDOW, 60 * 60 * 24 * 7 * 1000);
+ numInstances = azkProps.getLong(INMEMORY_METRIC_NUM_INSTANCES, 50);
+ standardDeviationFactor = azkProps.getDouble(INMEMORY_METRIC_STANDARDDEVIATION_FACTOR, 2);
}
/**
@@ -73,7 +75,7 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
* @param val interval in milli seconds
*/
public synchronized void setReportingInterval(long val) {
- _timeWindow = val;
+ timeWindow = val;
}
/**
@@ -81,7 +83,7 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
* @param num
*/
public void setReportingInstances(long num) {
- _numInstances = num;
+ numInstances = num;
}
/**
@@ -90,16 +92,16 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
* @see azkaban.metric.IMetricEmitter#reportMetric(azkaban.metric.IMetric)
*/
@Override
- public void reportMetric(final IMetric<?> metric) throws Exception {
+ public void reportMetric(final IMetric<?> metric) throws MetricException {
String metricName = metric.getName();
- if (!_historyListMapping.containsKey(metricName)) {
- _logger.info("First time capturing metric: " + metricName);
- _historyListMapping.put(metricName, new LinkedList<InMemoryHistoryNode>());
+ if (!historyListMapping.containsKey(metricName)) {
+ logger.info("First time capturing metric: " + metricName);
+ historyListMapping.put(metricName, new LinkedList<InMemoryHistoryNode>());
}
- synchronized (_historyListMapping.get(metricName)) {
- _logger.debug("Ingesting metric: " + metricName);
- _historyListMapping.get(metricName).add(new InMemoryHistoryNode(metric.getValue()));
- cleanUsingTime(metricName, _historyListMapping.get(metricName).peekLast().getTimestamp());
+ synchronized (historyListMapping.get(metricName)) {
+ logger.debug("Ingesting metric: " + metricName);
+ historyListMapping.get(metricName).add(new InMemoryHistoryNode(metric.getValue()));
+ cleanUsingTime(metricName, historyListMapping.get(metricName).peekLast().getTimestamp());
}
}
@@ -111,14 +113,14 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
* @param useStats get statistically significant points only
* @return List of snapshots
*/
- public List<InMemoryHistoryNode> getDrawMetric(final String metricName, final Date from, final Date to,
+ public List<InMemoryHistoryNode> getMetrics(final String metricName, final Date from, final Date to,
final Boolean useStats) throws ClassCastException {
LinkedList<InMemoryHistoryNode> selectedLists = new LinkedList<InMemoryHistoryNode>();
- if (_historyListMapping.containsKey(metricName)) {
+ if (historyListMapping.containsKey(metricName)) {
- _logger.debug("selecting snapshots within time frame");
- synchronized (_historyListMapping.get(metricName)) {
- for (InMemoryHistoryNode node : _historyListMapping.get(metricName)) {
+ logger.debug("selecting snapshots within time frame");
+ synchronized (historyListMapping.get(metricName)) {
+ for (InMemoryHistoryNode node : historyListMapping.get(metricName)) {
if (node.getTimestamp().after(from) && node.getTimestamp().before(to)) {
selectedLists.add(node);
}
@@ -145,7 +147,7 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
*/
private void statBasedSelectMetricHistory(final LinkedList<InMemoryHistoryNode> selectedLists)
throws ClassCastException {
- _logger.debug("selecting snapshots which are far away from mean value");
+ logger.debug("selecting snapshots which are far away from mean value");
DescriptiveStatistics descStats = getDescriptiveStatistics(selectedLists);
Double mean = descStats.getMean();
Double std = descStats.getStandardDeviation();
@@ -155,7 +157,7 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
InMemoryHistoryNode currentNode = ite.next();
double value = ((Number) currentNode.getValue()).doubleValue();
// remove all elements which lies in 95% value band
- if (value < mean + _standardDeviationFactor * std && value > mean - _standardDeviationFactor * std) {
+ if (value < mean + standardDeviationFactor * std && value > mean - standardDeviationFactor * std) {
ite.remove();
}
}
@@ -175,9 +177,9 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
* @param selectedLists list of snapshots
*/
private void generalSelectMetricHistory(final LinkedList<InMemoryHistoryNode> selectedLists) {
- _logger.debug("selecting snapshots evenly from across the time interval");
- if (selectedLists.size() > _numInstances) {
- double step = (double) selectedLists.size() / _numInstances;
+ logger.debug("selecting snapshots evenly from across the time interval");
+ if (selectedLists.size() > numInstances) {
+ double step = (double) selectedLists.size() / numInstances;
long nextIndex = 0, currentIndex = 0, numSelectedInstances = 1;
Iterator<InMemoryHistoryNode> ite = selectedLists.iterator();
while (ite.hasNext()) {
@@ -199,23 +201,23 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
* @param firstAllowedDate End date of the interval
*/
private void cleanUsingTime(final String metricName, final Date firstAllowedDate) {
- if (_historyListMapping.containsKey(metricName) && _historyListMapping.get(metricName) != null) {
- synchronized (_historyListMapping.get(metricName)) {
+ if (historyListMapping.containsKey(metricName) && historyListMapping.get(metricName) != null) {
+ synchronized (historyListMapping.get(metricName)) {
- InMemoryHistoryNode firstNode = _historyListMapping.get(metricName).peekFirst();
+ InMemoryHistoryNode firstNode = historyListMapping.get(metricName).peekFirst();
long localCopyOfTimeWindow = 0;
// go ahead for clean up using latest possible value of interval
// any interval change will not affect on going clean up
synchronized (this) {
- localCopyOfTimeWindow = _timeWindow;
+ localCopyOfTimeWindow = timeWindow;
}
// removing objects older than Interval time from firstAllowedDate
while (firstNode != null
&& TimeUnit.MILLISECONDS.toMillis(firstAllowedDate.getTime() - firstNode.getTimestamp().getTime()) > localCopyOfTimeWindow) {
- _historyListMapping.get(metricName).removeFirst();
- firstNode = _historyListMapping.get(metricName).peekFirst();
+ historyListMapping.get(metricName).removeFirst();
+ firstNode = historyListMapping.get(metricName).peekFirst();
}
}
}
@@ -227,7 +229,7 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
* @see azkaban.metric.IMetricEmitter#purgeAllData()
*/
@Override
- public void purgeAllData() throws Exception {
- _historyListMapping.clear();
+ public void purgeAllData() throws MetricException {
+ historyListMapping.clear();
}
}
diff --git a/azkaban-common/src/main/java/azkaban/metric/MetricException.java b/azkaban-common/src/main/java/azkaban/metric/MetricException.java
new file mode 100644
index 0000000..6decb4a
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/MetricException.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+/**
+ * Exception for Azkaban's Metric Component
+ */
+public class MetricException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public MetricException(String message) {
+ super(message);
+ }
+
+ public MetricException(Throwable cause) {
+ super(cause);
+ }
+
+ public MetricException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
index 31d12c1..9ae5c03 100644
--- a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
+++ b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
@@ -39,30 +39,30 @@ public class MetricReportManager {
* Maximum number of metrics reporting threads
*/
private static final int MAX_EMITTER_THREADS = 4;
- private static final Logger _logger = Logger.getLogger(MetricReportManager.class);
+ private static final Logger logger = Logger.getLogger(MetricReportManager.class);
/**
* List of all the metrics that Azkaban is tracking
* Manager is not concerned with type of metric as long as it honors IMetric contracts
*/
- private List<IMetric<?>> _metrics;
+ private List<IMetric<?>> metrics;
/**
* List of all the emitter listening all the metrics
* Manager is not concerned with how emitter is reporting value.
* Manager is only responsible to notify all emitters whenever an IMetric wants to be notified
*/
- private List<IMetricEmitter> _metricEmitters;
- private ExecutorService _executorService;
+ private List<IMetricEmitter> metricEmitters;
+ private ExecutorService executorService;
// Singleton variable
- private static volatile MetricReportManager _instance = null;
- private static boolean _isManagerEnabled;
+ private static volatile MetricReportManager instance = null;
+ private static volatile boolean isManagerEnabled;
private MetricReportManager() {
- _logger.debug("Instantiating Metric Manager");
- _executorService = Executors.newFixedThreadPool(MAX_EMITTER_THREADS);
- _metrics = new ArrayList<IMetric<?>>();
- _metricEmitters = new LinkedList<IMetricEmitter>();
+ logger.debug("Instantiating Metric Manager");
+ executorService = Executors.newFixedThreadPool(MAX_EMITTER_THREADS);
+ metrics = new ArrayList<IMetric<?>>();
+ metricEmitters = new LinkedList<IMetricEmitter>();
enableManager();
}
@@ -70,22 +70,22 @@ public class MetricReportManager {
* @return true, if we have enabled metric manager from Azkaban exec server
*/
public static boolean isAvailable() {
- return _instance != null && _isManagerEnabled;
+ return instance != null && isManagerEnabled;
}
/**
* Get a singleton object for Metric Manager
*/
public static MetricReportManager getInstance() {
- if (_instance == null) {
+ if (instance == null) {
synchronized (MetricReportManager.class) {
- if (_instance == null) {
- _logger.info("Instantiating MetricReportManager");
- _instance = new MetricReportManager();
+ if (instance == null) {
+ logger.info("Instantiating MetricReportManager");
+ instance = new MetricReportManager();
}
}
}
- return _instance;
+ return instance;
}
/***
@@ -100,24 +100,22 @@ public class MetricReportManager {
synchronized (metric) {
metricSnapshot = metric.getSnapshot();
}
- _logger.debug(String.format("Submitting %s metric for metric emission pool", metricSnapshot.getName()));
+ logger.debug(String.format("Submitting %s metric for metric emission pool", metricSnapshot.getName()));
// report to all emitters
- for (final IMetricEmitter metricEmitter : _metricEmitters) {
- _executorService.submit(new Runnable() {
+ for (final IMetricEmitter metricEmitter : metricEmitters) {
+ executorService.submit(new Runnable() {
@Override
public void run() {
try {
metricEmitter.reportMetric(metricSnapshot);
} catch (Exception ex) {
- _logger.error(String.format("Failed to report %s metric due to %s", metricSnapshot.getName(),
- ex.toString()));
+ logger.error(String.format("Failed to report %s metric due to ", metricSnapshot.getName()), ex);
}
}
});
}
} catch (CloneNotSupportedException ex) {
- _logger.error(String.format("Failed to take snapshot for %s metric %s", metric.getClass().getName(),
- ex.toString()));
+ logger.error(String.format("Failed to take snapshot for %s metric", metric.getClass().getName()), ex);
}
}
}
@@ -127,7 +125,7 @@ public class MetricReportManager {
* @param emitter
*/
public void addMetricEmitter(final IMetricEmitter emitter) {
- _metricEmitters.add(emitter);
+ metricEmitters.add(emitter);
}
/**
@@ -135,7 +133,7 @@ public class MetricReportManager {
* @param emitter
*/
public void removeMetricEmitter(final IMetricEmitter emitter) {
- _metricEmitters.remove(emitter);
+ metricEmitters.remove(emitter);
}
/**
@@ -143,7 +141,7 @@ public class MetricReportManager {
* @return
*/
public List<IMetricEmitter> getMetricEmitters() {
- return _metricEmitters;
+ return metricEmitters;
}
/**
@@ -152,12 +150,15 @@ public class MetricReportManager {
*/
public void addMetric(final IMetric<?> metric) {
// metric null or already present
- if (metric != null && getMetricFromName(metric.getName()) == null) {
- _logger.debug(String.format("Adding %s metric in Metric Manager", metric.getName()));
- _metrics.add(metric);
+ if(metric == null)
+ throw new IllegalArgumentException("Cannot add a null metric");
+
+ if (getMetricFromName(metric.getName()) == null) {
+ logger.debug(String.format("Adding %s metric in Metric Manager", metric.getName()));
+ metrics.add(metric);
metric.updateMetricManager(this);
} else {
- _logger.error("Failed to add metric");
+ logger.error("Failed to add metric");
}
}
@@ -169,7 +170,7 @@ public class MetricReportManager {
public IMetric<?> getMetricFromName(final String name) {
IMetric<?> metric = null;
if (name != null) {
- for (IMetric<?> currentMetric : _metrics) {
+ for (IMetric<?> currentMetric : metrics) {
if (currentMetric.getName().equals(name)) {
metric = currentMetric;
break;
@@ -184,26 +185,26 @@ public class MetricReportManager {
* @return
*/
public List<IMetric<?>> getAllMetrics() {
- return _metrics;
+ return metrics;
}
public void enableManager() {
- _logger.info("Enabling Metric Manager");
- _isManagerEnabled = true;
+ logger.info("Enabling Metric Manager");
+ isManagerEnabled = true;
}
/**
* Disable Metric Manager and ask all emitters to purge all available data.
*/
public void disableManager() {
- _logger.info("Disabling Metric Manager");
- if (_isManagerEnabled) {
- _isManagerEnabled = false;
- for (IMetricEmitter emitter : _metricEmitters) {
+ logger.info("Disabling Metric Manager");
+ if (isManagerEnabled) {
+ isManagerEnabled = false;
+ for (IMetricEmitter emitter : metricEmitters) {
try {
emitter.purgeAllData();
- } catch (Exception ex) {
- _logger.error("Failed to purge data " + ex.toString());
+ } catch (MetricException ex) {
+ logger.error("Failed to purge data ", ex);
}
}
}
@@ -215,6 +216,6 @@ public class MetricReportManager {
* @see java.lang.Object#finalize()
*/
protected void finalize() {
- _executorService.shutdown();
+ executorService.shutdown();
}
}
diff --git a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
index ece9c97..69c23ec 100644
--- a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
@@ -24,7 +24,7 @@ import java.util.TimerTask;
* @param <T> Type of Value of a given metric
*/
public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
- private Timer _timer;
+ private Timer timer;
/**
* @param metricName Name of metric
@@ -36,8 +36,8 @@ public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
public TimeBasedReportingMetric(String metricName, String metricType, T initialValue, MetricReportManager manager,
long interval) {
super(metricName, metricType, initialValue, manager);
- _timer = new Timer();
- _timer.schedule(getTimerTask(), interval, interval);
+ timer = new Timer();
+ timer.schedule(getTimerTask(), interval, interval);
}
/**
@@ -64,10 +64,10 @@ public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
* @param interval
*/
public void updateInterval(final long interval) {
- _logger.debug(String.format("Updating tracking interval to %d milisecond for %s metric", interval, getName()));
- _timer.cancel();
- _timer = new Timer();
- _timer.schedule(getTimerTask(), interval, interval);
+ logger.debug(String.format("Updating tracking interval to %d milisecond for %s metric", interval, getName()));
+ timer.cancel();
+ timer = new Timer();
+ timer.schedule(getTimerTask(), interval, interval);
}
/**
diff --git a/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java b/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java
index 3918038..4043cb7 100644
--- a/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java
+++ b/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java
@@ -13,10 +13,10 @@ public class FakeMetric extends AbstractMetric<Integer>{
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + ((_metricManager == null) ? 0 : _metricManager.hashCode());
- result = prime * result + ((_name == null) ? 0 : _name.hashCode());
- result = prime * result + ((_type == null) ? 0 : _type.hashCode());
- result = prime * result + ((_value == null) ? 0 : _value.hashCode());
+ result = prime * result + ((metricManager == null) ? 0 : metricManager.hashCode());
+ result = prime * result + ((name == null) ? 0 : name.hashCode());
+ result = prime * result + ((type == null) ? 0 : type.hashCode());
+ result = prime * result + ((value == null) ? 0 : value.hashCode());
return result;
}
@@ -29,25 +29,25 @@ public class FakeMetric extends AbstractMetric<Integer>{
if (!(obj instanceof FakeMetric))
return false;
FakeMetric other = (FakeMetric) obj;
- if (_metricManager == null) {
- if (other._metricManager != null)
+ if (metricManager == null) {
+ if (other.metricManager != null)
return false;
- } else if (!_metricManager.equals(other._metricManager))
+ } else if (!metricManager.equals(other.metricManager))
return false;
- if (_name == null) {
- if (other._name != null)
+ if (name == null) {
+ if (other.name != null)
return false;
- } else if (!_name.equals(other._name))
+ } else if (!name.equals(other.name))
return false;
- if (_type == null) {
- if (other._type != null)
+ if (type == null) {
+ if (other.type != null)
return false;
- } else if (!_type.equals(other._type))
+ } else if (!type.equals(other.type))
return false;
- if (_value == null) {
- if (other._value != null)
+ if (value == null) {
+ if (other.value != null)
return false;
- } else if (!_value.equals(other._value))
+ } else if (!value.equals(other.value))
return false;
return true;
}
diff --git a/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java b/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java
index 7fd7da2..45bfc5f 100644
--- a/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java
@@ -82,7 +82,7 @@ public class MetricManagerTest {
}
}
Date to = new Date();
- List<InMemoryHistoryNode> nodes = emitter.getDrawMetric("FakeMetric", from, to, false);
+ List<InMemoryHistoryNode> nodes = emitter.getMetrics("FakeMetric", from, to, false);
assertEquals("Failed to report metric", 1, nodes.size());
assertEquals("Failed to report metric", nodes.get(0).getValue(), 4);
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
index a570a10..f0e9b6b 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
@@ -24,14 +24,16 @@ import azkaban.executor.Status;
import azkaban.metric.MetricReportManager;
import azkaban.metric.TimeBasedReportingMetric;
-
+/**
+ * Metric to keep track of number of failed flows in between the tracking events
+ */
public class NumFailedFlowMetric extends TimeBasedReportingMetric<Integer> implements EventListener {
public static final String NUM_FAILED_FLOW_METRIC_NAME = "NumFailedFlowMetric";
private static final String NUM_FAILED_FLOW_METRIC_TYPE = "uint16";
public NumFailedFlowMetric(MetricReportManager manager, long interval) {
super(NUM_FAILED_FLOW_METRIC_NAME, NUM_FAILED_FLOW_METRIC_TYPE, 0, manager, interval);
- _logger.debug("Instantiated NumFailedJobMetric");
+ logger.debug("Instantiated NumFailedJobMetric");
}
/**
@@ -44,7 +46,7 @@ public class NumFailedFlowMetric extends TimeBasedReportingMetric<Integer> imple
if (event.getType() == Type.FLOW_FINISHED) {
FlowRunner runner = (FlowRunner) event.getRunner();
if (runner != null && runner.getExecutableFlow().getStatus().equals(Status.FAILED)) {
- _value = _value + 1;
+ value = value + 1;
}
}
}
@@ -56,7 +58,7 @@ public class NumFailedFlowMetric extends TimeBasedReportingMetric<Integer> imple
@Override
protected synchronized void postTrackingEventMethod() {
- _value = 0;
+ value = 0;
}
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
index 9578c50..6e16899 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
@@ -33,7 +33,7 @@ public class NumFailedJobMetric extends TimeBasedReportingMetric<Integer> implem
public NumFailedJobMetric(MetricReportManager manager, long interval) {
super(NUM_FAILED_JOB_METRIC_NAME, NUM_FAILED_JOB_METRIC_TYPE, 0, manager, interval);
- _logger.debug("Instantiated NumFailedJobMetric");
+ logger.debug("Instantiated NumFailedJobMetric");
}
/**
@@ -45,7 +45,7 @@ public class NumFailedJobMetric extends TimeBasedReportingMetric<Integer> implem
public synchronized void handleEvent(Event event) {
JobRunner runner = (JobRunner) event.getRunner();
if (event.getType() == Type.JOB_FINISHED && runner.getStatus().equals(Status.FAILED)) {
- _value = _value + 1;
+ value = value + 1;
}
}
@@ -56,7 +56,7 @@ public class NumFailedJobMetric extends TimeBasedReportingMetric<Integer> implem
@Override
protected synchronized void postTrackingEventMethod() {
- _value = 0;
+ value = 0;
}
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java
index a7326c8..192a4ce 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java
@@ -20,11 +20,14 @@ import azkaban.execapp.FlowRunnerManager;
import azkaban.metric.MetricReportManager;
import azkaban.metric.TimeBasedReportingMetric;
+/**
+ * Metric to keep track of number of queued flows in Azkaban exec server
+ */
public class NumQueuedFlowMetric extends TimeBasedReportingMetric<Integer> {
public static final String NUM_QUEUED_FLOW_METRIC_NAME = "NumQueuedFlowMetric";
private static final String NUM_QUEUED_FLOW_METRIC_TYPE = "uint16";
- private FlowRunnerManager _flowManager;
+ private FlowRunnerManager flowManager;
/**
* @param flowRunnerManager Flow runner manager
@@ -33,8 +36,8 @@ public class NumQueuedFlowMetric extends TimeBasedReportingMetric<Integer> {
*/
public NumQueuedFlowMetric(FlowRunnerManager flowRunnerManager, MetricReportManager manager, long interval) {
super(NUM_QUEUED_FLOW_METRIC_NAME, NUM_QUEUED_FLOW_METRIC_TYPE, 0, manager, interval);
- _logger.debug("Instantiated NumQueuedFlowMetric");
- _flowManager = flowRunnerManager;
+ logger.debug("Instantiated NumQueuedFlowMetric");
+ flowManager = flowRunnerManager;
}
/**
@@ -44,7 +47,7 @@ public class NumQueuedFlowMetric extends TimeBasedReportingMetric<Integer> {
*/
@Override
protected synchronized void preTrackingEventMethod() {
- _value = _flowManager.getNumQueuedFlows();
+ value = flowManager.getNumQueuedFlows();
}
@Override
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
index d17471c..b611151 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
@@ -27,7 +27,7 @@ public class NumRunningFlowMetric extends TimeBasedReportingMetric<Integer> {
public static final String NUM_RUNNING_FLOW_METRIC_NAME = "NumRunningFlowMetric";
private static final String NUM_RUNNING_FLOW_METRIC_TYPE = "uint16";
- private FlowRunnerManager _flowManager;
+ private FlowRunnerManager flowManager;
/**
* @param flowRunnerManager Flow runner manager
@@ -36,8 +36,8 @@ public class NumRunningFlowMetric extends TimeBasedReportingMetric<Integer> {
*/
public NumRunningFlowMetric(FlowRunnerManager flowRunnerManager, MetricReportManager manager, long interval) {
super(NUM_RUNNING_FLOW_METRIC_NAME, NUM_RUNNING_FLOW_METRIC_TYPE, 0, manager, interval);
- _logger.debug("Instantiated NumRunningFlowMetric");
- _flowManager = flowRunnerManager;
+ logger.debug("Instantiated NumRunningFlowMetric");
+ flowManager = flowRunnerManager;
}
/**
@@ -47,7 +47,7 @@ public class NumRunningFlowMetric extends TimeBasedReportingMetric<Integer> {
*/
@Override
protected synchronized void preTrackingEventMethod() {
- _value = _flowManager.getNumRunningFlows();
+ value = flowManager.getNumRunningFlows();
}
@Override
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
index 6ef1f38..e9a07f8 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
@@ -35,7 +35,7 @@ public class NumRunningJobMetric extends TimeBasedReportingMetric<Integer> imple
*/
public NumRunningJobMetric(MetricReportManager manager, long interval) {
super(NUM_RUNNING_JOB_METRIC_NAME, NUM_RUNNING_JOB_METRIC_TYPE, 0, manager, interval);
- _logger.debug("Instantiated NumRunningJobMetric");
+ logger.debug("Instantiated NumRunningJobMetric");
}
/**
@@ -46,9 +46,9 @@ public class NumRunningJobMetric extends TimeBasedReportingMetric<Integer> imple
@Override
public synchronized void handleEvent(Event event) {
if (event.getType() == Type.JOB_STARTED) {
- _value = _value + 1;
+ value = value + 1;
} else if (event.getType() == Type.JOB_FINISHED) {
- _value = _value - 1;
+ value = value - 1;
}
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
index 59d78f9..177d26b 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
@@ -51,7 +51,7 @@ import azkaban.utils.JSONUtils;
*/
public class StatsServlet extends HttpServlet implements ConnectorParams {
private static final long serialVersionUID = 2L;
- private static final Logger _logger = Logger.getLogger(StatsServlet.class);
+ private static final Logger logger = Logger.getLogger(StatsServlet.class);
public boolean hasParam(HttpServletRequest request, String param) {
return HttpRequestUtils.hasParam(request, param);
@@ -108,7 +108,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
private void handleChangeManagerStatusRequest(HttpServletRequest req, Map<String, Object> ret,
boolean enableMetricManager) {
try {
- _logger.info("Updating metric manager status");
+ logger.info("Updating metric manager status");
if (MetricReportManager.isAvailable()) {
MetricReportManager metricManager = MetricReportManager.getInstance();
if (enableMetricManager) {
@@ -121,7 +121,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
ret.put(RESPONSE_ERROR, "MetricManager is not available");
}
} catch (Exception e) {
- _logger.error(e);
+ logger.error(e);
ret.put(RESPONSE_ERROR, e.getMessage());
}
}
@@ -141,7 +141,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
ret.put(RESPONSE_ERROR, "MetricManager is not available");
}
} catch (Exception e) {
- _logger.error(e);
+ logger.error(e);
ret.put(RESPONSE_ERROR, e.getMessage());
}
}
@@ -161,7 +161,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
ret.put(RESPONSE_ERROR, "MetricManager is not available");
}
} catch (Exception e) {
- _logger.error(e);
+ logger.error(e);
ret.put(RESPONSE_ERROR, e.getMessage());
}
}
@@ -179,7 +179,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
if (memoryEmitter != null) {
try {
List<InMemoryHistoryNode> result =
- memoryEmitter.getDrawMetric(getParam(req, STATS_MAP_METRICNAMEPARAM),
+ memoryEmitter.getMetrics(getParam(req, STATS_MAP_METRICNAMEPARAM),
parseDate(getParam(req, STATS_MAP_STARTDATE)), parseDate(getParam(req, STATS_MAP_ENDDATE)),
getBooleanParam(req, STATS_MAP_METRICRETRIEVALMODE));
@@ -248,7 +248,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
ret.put(RESPONSE_ERROR, "MetricManager is not available");
}
} catch (Exception e) {
- _logger.error(e);
+ logger.error(e);
ret.put(RESPONSE_ERROR, e.getMessage());
}
}
diff --git a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
index f853453..a54d0f2 100644
--- a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
+++ b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
@@ -32,7 +32,7 @@
function refreshMetricChart() {
var requestURL = '/stats';
var requestData = {
- 'action': 'metricHistory',
+ 'action': 'getMetricHistory',
'from': new Date($('#datetimebegin').val()).toUTCString(),
'to' : new Date($('#datetimeend').val()).toUTCString(),
'metricName': $('#metricName').val(),