azkaban-memoizeit
Changes
azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryStatistics.java 71(+0 -71)
build.gradle 1(+1 -0)
Details
diff --git a/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
index 6114d36..3226de3 100644
--- a/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
@@ -24,10 +24,10 @@ import org.apache.log4j.Logger;
*/
public abstract class AbstractMetric<T> implements IMetric<T>, Cloneable{
protected static final Logger _logger = Logger.getLogger(MetricReportManager.class);
- protected String name;
- protected T value;
- protected String type;
- protected MetricReportManager metricManager;
+ protected String _name;
+ protected T _value;
+ protected String _type;
+ protected MetricReportManager _metricManager;
/**
* @param metricName Name of metric
@@ -36,10 +36,10 @@ public abstract class AbstractMetric<T> implements IMetric<T>, Cloneable{
* @param manager Metric Manager whom a metric will report to
*/
protected AbstractMetric(String metricName, String metricType, T initialValue, MetricReportManager manager) {
- name = metricName;
- type = metricType;
- value = initialValue;
- metricManager = manager;
+ _name = metricName;
+ _type = metricType;
+ _value = initialValue;
+ _metricManager = manager;
}
/**
@@ -47,7 +47,7 @@ public abstract class AbstractMetric<T> implements IMetric<T>, Cloneable{
* @see azkaban.metric.IMetric#getName()
*/
public String getName() {
- return name;
+ return _name;
}
/**
@@ -55,7 +55,7 @@ public abstract class AbstractMetric<T> implements IMetric<T>, Cloneable{
* @see azkaban.metric.IMetric#getValueType()
*/
public String getValueType() {
- return type;
+ return _type;
}
/**
@@ -63,7 +63,7 @@ public abstract class AbstractMetric<T> implements IMetric<T>, Cloneable{
* @see azkaban.metric.IMetric#updateMetricManager(azkaban.metric.MetricReportManager)
*/
public void updateMetricManager(final MetricReportManager manager) {
- metricManager = manager;
+ _metricManager = manager;
}
/**
@@ -71,7 +71,7 @@ public abstract class AbstractMetric<T> implements IMetric<T>, Cloneable{
* @see azkaban.metric.IMetric#getValue()
*/
public T getValue() {
- return value;
+ return _value;
}
/**
@@ -84,7 +84,7 @@ public abstract class AbstractMetric<T> implements IMetric<T>, Cloneable{
public synchronized void notifyManager() {
_logger.debug(String.format("Notifying Manager for %s", this.getClass().getName()));
try {
- metricManager.reportMetric( (IMetric<?>) this.clone());
+ _metricManager.reportMetric( (IMetric<?>) this.clone());
} catch (NullPointerException ex) {
_logger.error(String.format("Metric Manager is not set for %s metric %s", this.getClass().getName(), ex.toString()));
} catch (CloneNotSupportedException ex) {
diff --git a/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
index a5f2ead..6aaf007 100644
--- a/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
@@ -26,14 +26,13 @@ import azkaban.utils.Props;
*/
public class GangliaMetricEmitter implements IMetricEmitter {
private static final String GANGLIA_METRIC_REPORTER_PATH = "azkaban.metric.ganglia.path";
-
- private String gmetricPath;
+ private String _gmetricPath;
/**
* @param azkProps Azkaban Properties
*/
public GangliaMetricEmitter(Props azkProps) {
- gmetricPath = azkProps.get(GANGLIA_METRIC_REPORTER_PATH);
+ _gmetricPath = azkProps.get(GANGLIA_METRIC_REPORTER_PATH);
}
private String buildCommand(IMetric<?> metric) {
@@ -41,7 +40,7 @@ public class GangliaMetricEmitter implements IMetricEmitter {
synchronized (metric) {
cmd =
- String.format("%s -t %s -n %s -v %s", gmetricPath, metric.getValueType(), metric.getName(), metric.getValue()
+ String.format("%s -t %s -n %s -v %s", _gmetricPath, metric.getValueType(), metric.getName(), metric.getValue()
.toString());
}
diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryNode.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryNode.java
index b8f1f6e..8e0cce5 100644
--- a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryNode.java
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryNode.java
@@ -22,23 +22,23 @@ import java.util.Date;
* A snapshot of metric's value
*/
public class InMemoryHistoryNode {
- private Object value;
- private Date date;
+ private Object _value;
+ private Date _date;
/**
* Takes snapshot of the metric with a given value
* @param val
*/
public InMemoryHistoryNode(final Object val) {
- value = val;
- date = new Date();
+ _value = val;
+ _date = new Date();
}
public Object getValue() {
- return value;
+ return _value;
}
public Date getTimestamp() {
- return date;
+ return _date;
}
}
diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
index 3798689..c7753e9 100644
--- a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
@@ -29,6 +29,7 @@ import org.apache.log4j.Logger;
import azkaban.metric.IMetric;
import azkaban.metric.IMetricEmitter;
import azkaban.utils.Props;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
/**
@@ -36,34 +37,35 @@ import azkaban.utils.Props;
* This is also the default metric emitter and used by /stats servlet
*/
public class InMemoryMetricEmitter implements IMetricEmitter {
- protected static final Logger logger = Logger.getLogger(InMemoryMetricEmitter.class);
+ protected static final Logger _logger = Logger.getLogger(InMemoryMetricEmitter.class);
/**
* Data structure to keep track of snapshots
*/
- Map<String, LinkedList<InMemoryHistoryNode>> historyListMapping;
+ Map<String, LinkedList<InMemoryHistoryNode>> _historyListMapping;
private static final String INMEMORY_METRIC_REPORTER_WINDOW = "azkaban.metric.inmemory.interval";
private static final String INMEMORY_METRIC_NUM_INSTANCES = "azkaban.metric.inmemory.maxinstances";
- private static final String INMEMORY_METRIC_STANDARDDEVIATION_FACTOR = "azkaban.metric.inmemory.standardDeviationFactor";
+ private static final String INMEMORY_METRIC_STANDARDDEVIATION_FACTOR =
+ "azkaban.metric.inmemory.standardDeviationFactor";
- private double standardDeviationFactor;
+ private double _standardDeviationFactor;
/**
* Interval (in millisecond) from today for which we should maintain the in memory snapshots
*/
- private long timeWindow;
+ private long _timeWindow;
/**
* Maximum number of snapshots that should be displayed on /stats servlet
*/
- private long numInstances;
+ private long _numInstances;
/**
* @param azkProps Azkaban Properties
*/
public InMemoryMetricEmitter(Props azkProps) {
- historyListMapping = new HashMap<String, LinkedList<InMemoryHistoryNode>>();
- timeWindow = azkProps.getLong(INMEMORY_METRIC_REPORTER_WINDOW, 60 * 60 * 24 * 7 * 1000);
- numInstances = azkProps.getLong(INMEMORY_METRIC_NUM_INSTANCES, 50);
- standardDeviationFactor = azkProps.getDouble(INMEMORY_METRIC_STANDARDDEVIATION_FACTOR, 2);
+ _historyListMapping = new HashMap<String, LinkedList<InMemoryHistoryNode>>();
+ _timeWindow = azkProps.getLong(INMEMORY_METRIC_REPORTER_WINDOW, 60 * 60 * 24 * 7 * 1000);
+ _numInstances = azkProps.getLong(INMEMORY_METRIC_NUM_INSTANCES, 50);
+ _standardDeviationFactor = azkProps.getDouble(INMEMORY_METRIC_STANDARDDEVIATION_FACTOR, 2);
}
/**
@@ -71,7 +73,7 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
* @param val interval in milli seconds
*/
public synchronized void setReportingInterval(long val) {
- timeWindow = val;
+ _timeWindow = val;
}
/**
@@ -79,7 +81,7 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
* @param num
*/
public void setReportingInstances(long num) {
- numInstances = num;
+ _numInstances = num;
}
/**
@@ -90,14 +92,14 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
@Override
public void reportMetric(final IMetric<?> metric) throws Exception {
String metricName = metric.getName();
- if (!historyListMapping.containsKey(metricName)) {
- logger.info("First time capturing metric: " + metricName);
- historyListMapping.put(metricName, new LinkedList<InMemoryHistoryNode>());
+ if (!_historyListMapping.containsKey(metricName)) {
+ _logger.info("First time capturing metric: " + metricName);
+ _historyListMapping.put(metricName, new LinkedList<InMemoryHistoryNode>());
}
- synchronized (historyListMapping.get(metricName)) {
- logger.debug("Ingesting metric: " + metricName);
- historyListMapping.get(metricName).add(new InMemoryHistoryNode(metric.getValue()));
- cleanUsingTime(metricName, historyListMapping.get(metricName).peekLast().getTimestamp());
+ synchronized (_historyListMapping.get(metricName)) {
+ _logger.debug("Ingesting metric: " + metricName);
+ _historyListMapping.get(metricName).add(new InMemoryHistoryNode(metric.getValue()));
+ cleanUsingTime(metricName, _historyListMapping.get(metricName).peekLast().getTimestamp());
}
}
@@ -110,13 +112,13 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
* @return List of snapshots
*/
public List<InMemoryHistoryNode> getDrawMetric(final String metricName, final Date from, final Date to,
- final Boolean useStats) {
+ final Boolean useStats) throws ClassCastException {
LinkedList<InMemoryHistoryNode> selectedLists = new LinkedList<InMemoryHistoryNode>();
- if (historyListMapping.containsKey(metricName)) {
+ if (_historyListMapping.containsKey(metricName)) {
- logger.debug("selecting snapshots within time frame");
- synchronized (historyListMapping.get(metricName)) {
- for (InMemoryHistoryNode node : historyListMapping.get(metricName)) {
+ _logger.debug("selecting snapshots within time frame");
+ synchronized (_historyListMapping.get(metricName)) {
+ for (InMemoryHistoryNode node : _historyListMapping.get(metricName)) {
if (node.getTimestamp().after(from) && node.getTimestamp().before(to)) {
selectedLists.add(node);
}
@@ -141,30 +143,41 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
* filter snapshots using statistically significant points only
* @param selectedLists list of snapshots
*/
- private void statBasedSelectMetricHistory(final LinkedList<InMemoryHistoryNode> selectedLists) {
- logger.debug("selecting snapshots which are far away from mean value");
- Iterator<InMemoryHistoryNode> ite = selectedLists.iterator();
- Double mean = InMemoryHistoryStatistics.mean(selectedLists);
- Double std = InMemoryHistoryStatistics.sdev(selectedLists);
+ private void statBasedSelectMetricHistory(final LinkedList<InMemoryHistoryNode> selectedLists)
+ throws ClassCastException {
+ _logger.debug("selecting snapshots which are far away from mean value");
+ DescriptiveStatistics descStats = getDescriptiveStatistics(selectedLists);
+ Double mean = descStats.getMean();
+ Double std = descStats.getStandardDeviation();
+ Iterator<InMemoryHistoryNode> ite = selectedLists.iterator();
while (ite.hasNext()) {
InMemoryHistoryNode currentNode = ite.next();
double value = ((Number) currentNode.getValue()).doubleValue();
// remove all elements which lies in 95% value band
- if (value < mean + standardDeviationFactor * std && value > mean - standardDeviationFactor * std) {
+ if (value < mean + _standardDeviationFactor * std && value > mean - _standardDeviationFactor * std) {
ite.remove();
}
}
}
+ private DescriptiveStatistics getDescriptiveStatistics(final LinkedList<InMemoryHistoryNode> selectedLists)
+ throws ClassCastException {
+ DescriptiveStatistics descStats = new DescriptiveStatistics();
+ for (InMemoryHistoryNode node : selectedLists) {
+ descStats.addValue(((Number) node.getValue()).doubleValue());
+ }
+ return descStats;
+ }
+
/**
* filter snapshots by evenly selecting points across the interval
* @param selectedLists list of snapshots
*/
private void generalSelectMetricHistory(final LinkedList<InMemoryHistoryNode> selectedLists) {
- logger.debug("selecting snapshots evenly from across the time interval");
- if (selectedLists.size() > numInstances) {
- double step = (double) selectedLists.size() / numInstances;
+ _logger.debug("selecting snapshots evenly from across the time interval");
+ if (selectedLists.size() > _numInstances) {
+ double step = (double) selectedLists.size() / _numInstances;
long nextIndex = 0, currentIndex = 0, numSelectedInstances = 1;
Iterator<InMemoryHistoryNode> ite = selectedLists.iterator();
while (ite.hasNext()) {
@@ -186,23 +199,23 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
* @param firstAllowedDate End date of the interval
*/
private void cleanUsingTime(final String metricName, final Date firstAllowedDate) {
- if (historyListMapping.containsKey(metricName) && historyListMapping.get(metricName) != null) {
- synchronized (historyListMapping.get(metricName)) {
+ if (_historyListMapping.containsKey(metricName) && _historyListMapping.get(metricName) != null) {
+ synchronized (_historyListMapping.get(metricName)) {
- InMemoryHistoryNode firstNode = historyListMapping.get(metricName).peekFirst();
+ InMemoryHistoryNode firstNode = _historyListMapping.get(metricName).peekFirst();
long localCopyOfInterval = 0;
// go ahead for clean up using latest possible value of interval
// any interval change will not affect on going clean up
synchronized (this) {
- localCopyOfInterval = timeWindow;
+ localCopyOfInterval = _timeWindow;
}
// removing objects older than Interval time from firstAllowedDate
while (firstNode != null
&& TimeUnit.MILLISECONDS.toMillis(firstAllowedDate.getTime() - firstNode.getTimestamp().getTime()) > localCopyOfInterval) {
- historyListMapping.get(metricName).removeFirst();
- firstNode = historyListMapping.get(metricName).peekFirst();
+ _historyListMapping.get(metricName).removeFirst();
+ firstNode = _historyListMapping.get(metricName).peekFirst();
}
}
}
@@ -215,6 +228,6 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
*/
@Override
public void purgeAllData() throws Exception {
- historyListMapping.clear();
+ _historyListMapping.clear();
}
}
diff --git a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
index b69f6e5..b4a8590 100644
--- a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
+++ b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
@@ -38,30 +38,30 @@ public class MetricReportManager {
* Maximum number of metrics reporting threads
*/
private static final int MAX_EMITTER_THREADS = 4;
- private static final Logger logger = Logger.getLogger(MetricReportManager.class);
+ private static final Logger _logger = Logger.getLogger(MetricReportManager.class);
/**
* List of all the metrics that Azkaban is tracking
* Manager is not concerned with type of metric as long as it honors IMetric contracts
*/
- private List<IMetric<?>> metrics;
+ private List<IMetric<?>> _metrics;
/**
* List of all the emitter listening all the metrics
* Manager is not concerned with how emitter is reporting value.
* Manager is only responsible to notify all emitters whenever an IMetric wants to be notified
*/
- private List<IMetricEmitter> metricEmitters;
- private ExecutorService executorService;
+ private List<IMetricEmitter> _metricEmitters;
+ private ExecutorService _executorService;
// Singleton variable
- private static volatile MetricReportManager instance = null;
- boolean isManagerEnabled;
+ private static volatile MetricReportManager _instance = null;
+ boolean _isManagerEnabled;
private MetricReportManager() {
- logger.debug("Instantiating Metric Manager");
- executorService = Executors.newFixedThreadPool(MAX_EMITTER_THREADS);
- metrics = new ArrayList<IMetric<?>>();
- metricEmitters = new LinkedList<IMetricEmitter>();
+ _logger.debug("Instantiating Metric Manager");
+ _executorService = Executors.newFixedThreadPool(MAX_EMITTER_THREADS);
+ _metrics = new ArrayList<IMetric<?>>();
+ _metricEmitters = new LinkedList<IMetricEmitter>();
enableManager();
}
@@ -69,39 +69,39 @@ public class MetricReportManager {
* @return true, if we have enabled metric manager from Azkaban exec server
*/
public static boolean isInstantiated() {
- return instance != null;
+ return _instance != null;
}
/**
* Get a singleton object for Metric Manager
*/
public static MetricReportManager getInstance() {
- if (instance == null) {
+ if (_instance == null) {
synchronized (MetricReportManager.class) {
- if (instance == null) {
- logger.info("Instantiating MetricReportManager");
- instance = new MetricReportManager();
+ if (_instance == null) {
+ _logger.info("Instantiating MetricReportManager");
+ _instance = new MetricReportManager();
}
}
}
- return instance;
+ return _instance;
}
// each element of metrics List is responsible to call this method and report metrics
public void reportMetric(final IMetric<?> metric) {
- if (metric != null && isManagerEnabled) {
+ if (metric != null && _isManagerEnabled) {
// Report metric to all the emitters
synchronized (metric) {
- logger.debug(String.format("Submitting %s metric for metric emission pool", metric.getName()));
- for (final IMetricEmitter metricEmitter : metricEmitters) {
- executorService.submit(new Runnable() {
+ _logger.debug(String.format("Submitting %s metric for metric emission pool", metric.getName()));
+ for (final IMetricEmitter metricEmitter : _metricEmitters) {
+ _executorService.submit(new Runnable() {
@Override
public void run() {
try {
metricEmitter.reportMetric(metric);
} catch (Exception ex) {
- logger.error(String.format("Failed to report %s metric due to %s", metric.getName(), ex.toString()));
+ _logger.error(String.format("Failed to report %s metric due to %s", metric.getName(), ex.toString()));
}
}
});
@@ -115,7 +115,7 @@ public class MetricReportManager {
* @param emitter
*/
public void addMetricEmitter(final IMetricEmitter emitter) {
- metricEmitters.add(emitter);
+ _metricEmitters.add(emitter);
}
/**
@@ -123,7 +123,7 @@ public class MetricReportManager {
* @param emitter
*/
public void removeMetricEmitter(final IMetricEmitter emitter) {
- metricEmitters.remove(emitter);
+ _metricEmitters.remove(emitter);
}
/**
@@ -131,7 +131,7 @@ public class MetricReportManager {
* @return
*/
public List<IMetricEmitter> getMetricEmitters() {
- return metricEmitters;
+ return _metricEmitters;
}
/**
@@ -141,11 +141,11 @@ public class MetricReportManager {
public void addMetric(final IMetric<?> metric) {
// metric null or already present
if (metric != null && getMetricFromName(metric.getName()) == null) {
- logger.debug(String.format("Adding %s metric in Metric Manager", metric.getName()));
- metrics.add(metric);
+ _logger.debug(String.format("Adding %s metric in Metric Manager", metric.getName()));
+ _metrics.add(metric);
metric.updateMetricManager(this);
} else {
- logger.error("Failed to add metric");
+ _logger.error("Failed to add metric");
}
}
@@ -157,7 +157,7 @@ public class MetricReportManager {
public IMetric<?> getMetricFromName(final String name) {
IMetric<?> metric = null;
if (name != null) {
- for (IMetric<?> currentMetric : metrics) {
+ for (IMetric<?> currentMetric : _metrics) {
if (currentMetric.getName().equals(name)) {
metric = currentMetric;
break;
@@ -172,26 +172,26 @@ public class MetricReportManager {
* @return
*/
public List<IMetric<?>> getAllMetrics() {
- return metrics;
+ return _metrics;
}
public void enableManager() {
- logger.info("Enabling Metric Manager");
- isManagerEnabled = true;
+ _logger.info("Enabling Metric Manager");
+ _isManagerEnabled = true;
}
/**
* Disable Metric Manager and ask all emitters to purge all available data.
*/
public void disableManager() {
- logger.info("Disabling Metric Manager");
- if(isManagerEnabled) {
- isManagerEnabled = false;
- for(IMetricEmitter emitter: metricEmitters) {
+ _logger.info("Disabling Metric Manager");
+ if(_isManagerEnabled) {
+ _isManagerEnabled = false;
+ for(IMetricEmitter emitter: _metricEmitters) {
try {
emitter.purgeAllData();
} catch (Exception ex) {
- logger.error("Failed to purge data " + ex.toString());
+ _logger.error("Failed to purge data " + ex.toString());
}
}
}
@@ -203,6 +203,6 @@ public class MetricReportManager {
* @see java.lang.Object#finalize()
*/
protected void finalize() {
- executorService.shutdown();
+ _executorService.shutdown();
}
}
diff --git a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
index 62a2e10..ece9c97 100644
--- a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
@@ -24,7 +24,7 @@ import java.util.TimerTask;
* @param <T> Type of Value of a given metric
*/
public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
- private Timer timer;
+ private Timer _timer;
/**
* @param metricName Name of metric
@@ -36,8 +36,8 @@ public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
public TimeBasedReportingMetric(String metricName, String metricType, T initialValue, MetricReportManager manager,
long interval) {
super(metricName, metricType, initialValue, manager);
- timer = new Timer();
- timer.schedule(getTimerTask(), interval, interval);
+ _timer = new Timer();
+ _timer.schedule(getTimerTask(), interval, interval);
}
/**
@@ -65,9 +65,9 @@ public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
*/
public void updateInterval(final long interval) {
_logger.debug(String.format("Updating tracking interval to %d milisecond for %s metric", interval, getName()));
- timer.cancel();
- timer = new Timer();
- timer.schedule(getTimerTask(), interval, interval);
+ _timer.cancel();
+ _timer = new Timer();
+ _timer.schedule(getTimerTask(), interval, interval);
}
/**
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
index 4945ecc..a570a10 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
@@ -44,7 +44,7 @@ public class NumFailedFlowMetric extends TimeBasedReportingMetric<Integer> imple
if (event.getType() == Type.FLOW_FINISHED) {
FlowRunner runner = (FlowRunner) event.getRunner();
if (runner != null && runner.getExecutableFlow().getStatus().equals(Status.FAILED)) {
- value = value + 1;
+ _value = _value + 1;
}
}
}
@@ -56,7 +56,7 @@ public class NumFailedFlowMetric extends TimeBasedReportingMetric<Integer> imple
@Override
protected synchronized void postTrackingEventMethod() {
- value = 0;
+ _value = 0;
}
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
index 94eff06..9578c50 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
@@ -45,7 +45,7 @@ public class NumFailedJobMetric extends TimeBasedReportingMetric<Integer> implem
public synchronized void handleEvent(Event event) {
JobRunner runner = (JobRunner) event.getRunner();
if (event.getType() == Type.JOB_FINISHED && runner.getStatus().equals(Status.FAILED)) {
- value = value + 1;
+ _value = _value + 1;
}
}
@@ -56,7 +56,7 @@ public class NumFailedJobMetric extends TimeBasedReportingMetric<Integer> implem
@Override
protected synchronized void postTrackingEventMethod() {
- value = 0;
+ _value = 0;
}
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java
index 98446b6..a7326c8 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java
@@ -24,7 +24,7 @@ public class NumQueuedFlowMetric extends TimeBasedReportingMetric<Integer> {
public static final String NUM_QUEUED_FLOW_METRIC_NAME = "NumQueuedFlowMetric";
private static final String NUM_QUEUED_FLOW_METRIC_TYPE = "uint16";
- private FlowRunnerManager flowManager;
+ private FlowRunnerManager _flowManager;
/**
* @param flowRunnerManager Flow runner manager
@@ -34,7 +34,7 @@ public class NumQueuedFlowMetric extends TimeBasedReportingMetric<Integer> {
public NumQueuedFlowMetric(FlowRunnerManager flowRunnerManager, MetricReportManager manager, long interval) {
super(NUM_QUEUED_FLOW_METRIC_NAME, NUM_QUEUED_FLOW_METRIC_TYPE, 0, manager, interval);
_logger.debug("Instantiated NumQueuedFlowMetric");
- flowManager = flowRunnerManager;
+ _flowManager = flowRunnerManager;
}
/**
@@ -44,7 +44,7 @@ public class NumQueuedFlowMetric extends TimeBasedReportingMetric<Integer> {
*/
@Override
protected synchronized void preTrackingEventMethod() {
- value = flowManager.getNumQueuedFlows();
+ _value = _flowManager.getNumQueuedFlows();
}
@Override
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
index 7c87a4e..d17471c 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
@@ -27,7 +27,7 @@ public class NumRunningFlowMetric extends TimeBasedReportingMetric<Integer> {
public static final String NUM_RUNNING_FLOW_METRIC_NAME = "NumRunningFlowMetric";
private static final String NUM_RUNNING_FLOW_METRIC_TYPE = "uint16";
- private FlowRunnerManager flowManager;
+ private FlowRunnerManager _flowManager;
/**
* @param flowRunnerManager Flow runner manager
@@ -37,7 +37,7 @@ public class NumRunningFlowMetric extends TimeBasedReportingMetric<Integer> {
public NumRunningFlowMetric(FlowRunnerManager flowRunnerManager, MetricReportManager manager, long interval) {
super(NUM_RUNNING_FLOW_METRIC_NAME, NUM_RUNNING_FLOW_METRIC_TYPE, 0, manager, interval);
_logger.debug("Instantiated NumRunningFlowMetric");
- flowManager = flowRunnerManager;
+ _flowManager = flowRunnerManager;
}
/**
@@ -47,7 +47,7 @@ public class NumRunningFlowMetric extends TimeBasedReportingMetric<Integer> {
*/
@Override
protected synchronized void preTrackingEventMethod() {
- value = flowManager.getNumRunningFlows();
+ _value = _flowManager.getNumRunningFlows();
}
@Override
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
index bc9ffc2..6ef1f38 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
@@ -46,9 +46,9 @@ public class NumRunningJobMetric extends TimeBasedReportingMetric<Integer> imple
@Override
public synchronized void handleEvent(Event event) {
if (event.getType() == Type.JOB_STARTED) {
- value = value + 1;
+ _value = _value + 1;
} else if (event.getType() == Type.JOB_FINISHED) {
- value = value - 1;
+ _value = _value - 1;
}
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
index 1cf6b07..aaa92a0 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
@@ -50,7 +50,7 @@ import azkaban.utils.JSONUtils;
*/
public class StatsServlet extends HttpServlet implements ConnectorParams {
private static final long serialVersionUID = 2L;
- private static final Logger logger = Logger.getLogger(StatsServlet.class);
+ private static final Logger _logger = Logger.getLogger(StatsServlet.class);
public void init(ServletConfig config) throws ServletException {
// Nothing to initialize
@@ -110,7 +110,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
*/
private void handleChangeManagerStatusRequest(HttpServletRequest req, Map<String, Object> ret, boolean enableMetricManager) {
try {
- logger.info("Updating metric manager status");
+ _logger.info("Updating metric manager status");
if (MetricReportManager.isInstantiated()) {
MetricReportManager metricManager = MetricReportManager.getInstance();
if (enableMetricManager) {
@@ -121,7 +121,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
}
ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
} catch (Exception e) {
- logger.error(e);
+ _logger.error(e);
ret.put(RESPONSE_ERROR, e.getMessage());
}
}
@@ -139,7 +139,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
}
ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
} catch (Exception e) {
- logger.error(e);
+ _logger.error(e);
ret.put(RESPONSE_ERROR, e.getMessage());
}
}
@@ -157,7 +157,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
}
ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
} catch (Exception e) {
- logger.error(e);
+ _logger.error(e);
ret.put(RESPONSE_ERROR, e.getMessage());
}
}
@@ -242,7 +242,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
}
ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
} catch (Exception e) {
- logger.error(e);
+ _logger.error(e);
ret.put(RESPONSE_ERROR, e.getMessage());
}
}
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
index 88b76f3..19438fd 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
@@ -22,8 +22,6 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
@@ -45,15 +43,15 @@ import azkaban.webapp.AzkabanWebServer;
*/
public class StatsServlet extends LoginAbstractAzkabanServlet {
private static final long serialVersionUID = 1L;
- private UserManager userManager;
- private ExecutorManager execManager;
+ private UserManager _userManager;
+ private ExecutorManager _execManager;
@Override
public void init(ServletConfig config) throws ServletException {
super.init(config);
AzkabanWebServer server = (AzkabanWebServer) getApplication();
- userManager = server.getUserManager();
- execManager = server.getExecutorManager();
+ _userManager = server.getUserManager();
+ _execManager = server.getExecutorManager();
}
@Override
@@ -94,7 +92,7 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
*/
private void handleChangeConfigurationRequest(String actionName, HttpServletRequest req, HashMap<String, Object> ret)
throws ServletException, IOException {
- Map<String, Object> result = execManager.callExecutorStats(actionName, getAllParams(req));
+ Map<String, Object> result = _execManager.callExecutorStats(actionName, getAllParams(req));
if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
ret.put(ConnectorParams.RESPONSE_ERROR, result.get(ConnectorParams.RESPONSE_ERROR).toString());
} else {
@@ -109,7 +107,7 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
private void handleGetMetricHistory(HttpServletRequest req, HashMap<String, Object> ret, User user)
throws IOException, ServletException {
Map<String, Object> result =
- execManager.callExecutorStats(ConnectorParams.STATS_GET_METRICHISTORY, getAllParams(req));
+ _execManager.callExecutorStats(ConnectorParams.STATS_GET_METRICHISTORY, getAllParams(req));
if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
ret.put(ConnectorParams.RESPONSE_ERROR, result.get(ConnectorParams.RESPONSE_ERROR).toString());
} else {
@@ -131,7 +129,7 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
try {
Map<String, Object> result =
- execManager.callExecutorStats(ConnectorParams.STATS_GET_ALLMETRICSNAME, (Pair<String, String>[]) null);
+ _execManager.callExecutorStats(ConnectorParams.STATS_GET_ALLMETRICSNAME, (Pair<String, String>[]) null);
if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
page.add("errorMsg", result.get(ConnectorParams.RESPONSE_ERROR).toString());
} else {
@@ -151,7 +149,7 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
protected boolean hasPermission(User user, Permission.Type type) {
for (String roleName : user.getRoles()) {
- Role role = userManager.getRole(roleName);
+ Role role = _userManager.getRole(roleName);
if (role.getPermission().isPermissionSet(type) || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
return true;
}
build.gradle 1(+1 -0)
diff --git a/build.gradle b/build.gradle
index 6fd240d..54321c2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -109,6 +109,7 @@ project(':azkaban-common') {
compile('org.mortbay.jetty:jetty:6.1.26')
compile('org.mortbay.jetty:jetty-util:6.1.26')
compile('org.slf4j:slf4j-api:1.6.1')
+ compile('org.apache.commons:commons-math3:3.0')
testCompile('junit:junit:4.11')
testCompile('org.hamcrest:hamcrest-all:1.3')