Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 1c6a4f9..5f7bcd5 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -717,6 +717,11 @@ public class ExecutorManager extends EventHandler implements
return jsonResponse;
}
+ /**
+ * Manage servlet call for stats servlet in Azkaban execution server
+ * {@inheritDoc}
+ * @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(java.lang.String, azkaban.utils.Pair[])
+ */
@Override
public Map<String, Object> callExecutorStats(String action, Pair<String, String>... params) throws IOException {
diff --git a/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
index b9ec1e7..e36040e 100644
--- a/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
@@ -18,7 +18,10 @@ package azkaban.metric;
import org.apache.log4j.Logger;
-
+/**
+ * Abstract class for Metric
+ * @param <T> Type of Value of a given metric
+ */
public abstract class AbstractMetric<T> implements IMetric<T> {
protected static final Logger logger = Logger.getLogger(MetricReportManager.class);
protected String name;
@@ -26,6 +29,12 @@ public abstract class AbstractMetric<T> implements IMetric<T> {
protected String type;
protected MetricReportManager metricManager;
+ /**
+ * @param metricName Name of metric
+ * @param metricType Metric type. For display purposes.
+ * @param initialValue Initial Value of a metric
+ * @param manager Metric Manager whom a metric will report to
+ */
public AbstractMetric(String metricName, String metricType, T initialValue, MetricReportManager manager) {
name = metricName;
type = metricType;
@@ -33,22 +42,45 @@ public abstract class AbstractMetric<T> implements IMetric<T> {
metricManager = manager;
}
+ /**
+ * {@inheritDoc}
+ * @see azkaban.metric.IMetric#getName()
+ */
public String getName() {
return name;
}
+ /**
+ * {@inheritDoc}
+ * @see azkaban.metric.IMetric#getValueType()
+ */
public String getValueType() {
return type;
}
+ /**
+ * {@inheritDoc}
+ * @see azkaban.metric.IMetric#updateMetricManager(azkaban.metric.MetricReportManager)
+ */
public void updateMetricManager(final MetricReportManager manager) {
metricManager = manager;
}
+ /**
+ * {@inheritDoc}
+ * @see azkaban.metric.IMetric#getValue()
+ */
public T getValue() {
return value;
}
+ /**
+ * Method used to notify manager for a tracking event.
+ * Metric is free to call this method as per implementation.
+ * Timer based or Azkaban events can be the most common implementation
+ * {@inheritDoc}
+ * @see azkaban.metric.IMetric#notifyManager()
+ */
public synchronized void notifyManager() {
logger.debug(String.format("Notifying Manager for %s", this.getClass().getName()));
try {
diff --git a/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
index 5eb1890..76fb4d4 100644
--- a/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
@@ -18,12 +18,17 @@ package azkaban.metric;
import azkaban.utils.Props;
-
+/**
+ * MetricEmitter implementation to report metric to a ganglia gmetric process
+ */
public class GangliaMetricEmitter implements IMetricEmitter {
private static final String GANGLIA_METRIC_REPORTER_PATH = "azkaban.metric.ganglia.path";
private String gmetricPath;
+ /**
+ * @param azkProps Azkaban Properties
+ */
public GangliaMetricEmitter(Props azkProps) {
gmetricPath = azkProps.get(GANGLIA_METRIC_REPORTER_PATH);
}
@@ -32,6 +37,11 @@ public class GangliaMetricEmitter implements IMetricEmitter {
return String.format("%s -t %s -n %s -v %s", gmetricPath, metric.getValueType(), metric.getName(), metric.getValue().toString());
}
+ /**
+ * Report metric by executing command line interface of gmetrics
+ * {@inheritDoc}
+ * @see azkaban.metric.IMetricEmitter#reportMetric(azkaban.metric.IMetric)
+ */
@Override
public void reportMetric(final IMetric<?> metric) throws Exception {
String gangliaCommand = buildCommand(metric);
diff --git a/azkaban-common/src/main/java/azkaban/metric/IMetric.java b/azkaban-common/src/main/java/azkaban/metric/IMetric.java
index ff6d4cd..ad1bfe1 100644
--- a/azkaban-common/src/main/java/azkaban/metric/IMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/IMetric.java
@@ -16,6 +16,10 @@
package azkaban.metric;
+/**
+ * Interface of any Metric
+ * @param <T> Type of Value of a given metric
+ */
public interface IMetric<T> {
String getName();
String getValueType();
diff --git a/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java
index f485d23..2436625 100644
--- a/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java
@@ -16,6 +16,9 @@
package azkaban.metric;
+/**
+ * Interface for metric emitters
+ */
public interface IMetricEmitter {
void reportMetric(final IMetric<?> metric) throws Exception;
void purgeAllData() throws Exception;
diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryNode.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryNode.java
index 20e8d93..39d18a1 100644
--- a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryNode.java
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryNode.java
@@ -16,14 +16,19 @@
package azkaban.metric.inmemoryemitter;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
import java.util.Date;
+/**
+ * A snapshot of metric's value
+ */
public class InMemoryHistoryNode {
private Object value;
private Date date;
+ /**
+ * Takes snapshot of the metric with a given value
+ * @param val
+ */
public InMemoryHistoryNode(Object val) {
value = val;
date = new Date();
@@ -36,9 +41,4 @@ public class InMemoryHistoryNode {
public Date getTimestamp() {
return date;
}
-
- public String formattedTimestamp() {
- DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- return dateFormat.format(date);
- }
}
diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryStatistics.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryStatistics.java
index adc617e..72061b4 100644
--- a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryStatistics.java
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryStatistics.java
@@ -18,10 +18,15 @@ package azkaban.metric.inmemoryemitter;
import java.util.List;
+/**
+ * Utility class for mathematical function of metric's history objects
+ */
public final class InMemoryHistoryStatistics {
/**
* Returns the average
+ * @param data
+ * @return mean of data
*/
public static double mean(List<InMemoryHistoryNode> data) {
double total = 0.0;
@@ -33,13 +38,18 @@ public final class InMemoryHistoryStatistics {
/**
* Returns the sample standard deviation
+ * @param data
+ * @return standard deviation of data
*/
public static double sdev(List<InMemoryHistoryNode> data) {
return Math.sqrt(variance(data));
}
+
/**
* Returns the sample variance
+ * @param data
+ * @return variance of data
*/
public static double variance(List<InMemoryHistoryNode> data) {
double mu = mean(data);
@@ -50,6 +60,11 @@ public final class InMemoryHistoryStatistics {
return sumsq / data.size();
}
+ /**
+ * Square of a number
+ * @param x
+ * @return x*x
+ */
public static double sqr(double x) {
return x * x;
}
diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
index 5250e69..e2a91fe 100644
--- a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
@@ -24,50 +24,92 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.log4j.Logger;
+
import azkaban.metric.IMetric;
import azkaban.metric.IMetricEmitter;
import azkaban.utils.Props;
-
+/**
+ * Metric Emitter which maintains in memory snapshots of the metrics
+ * This is also the default metric emitter and used by /stats servlet
+ */
public class InMemoryMetricEmitter implements IMetricEmitter {
+ protected static final Logger logger = Logger.getLogger(InMemoryMetricEmitter.class);
+
+ /**
+ * Data structure to keep track of snapshots
+ */
Map<String, LinkedList<InMemoryHistoryNode>> historyListMapping;
private static final String INMEMORY_METRIC_REPORTER_WINDOW = "azkaban.metric.inmemory.interval";
private static final String INMEMORY_METRIC_NUM_INSTANCES = "azkaban.metric.inmemory.maxinstances";
+ /**
+ * Interval (in millisecond) from today for which we should maintain the in memory snapshots
+ */
long interval;
+ /**
+ * Maximum number of snapshots that should be displayed on /stats servlet
+ */
long numInstances;
+ /**
+ * @param azkProps Azkaban Properties
+ */
public InMemoryMetricEmitter(Props azkProps) {
historyListMapping = new HashMap<String, LinkedList<InMemoryHistoryNode>>();
interval = azkProps.getLong(INMEMORY_METRIC_REPORTER_WINDOW, 60 * 60 * 24 * 7 * 1000);
numInstances = azkProps.getLong(INMEMORY_METRIC_NUM_INSTANCES, 50);
}
+ /**
+ * Update reporting interval
+ * @param val interval in milli seconds
+ */
public synchronized void setReportingInterval(long val) {
interval = val;
}
+ /**
+ * Set number of /stats servlet display points
+ * @param num
+ */
public void setReportingInstances(long num) {
numInstances = num;
}
+ /**
+ * Ingest metric in snapshot data structure while maintaining interval
+ * {@inheritDoc}
+ * @see azkaban.metric.IMetricEmitter#reportMetric(azkaban.metric.IMetric)
+ */
@Override
public void reportMetric(IMetric<?> metric) throws Exception {
String metricName = metric.getName();
if (!historyListMapping.containsKey(metricName)) {
+ logger.info("First time capturing metric: " + metricName);
historyListMapping.put(metricName, new LinkedList<InMemoryHistoryNode>());
}
synchronized (historyListMapping.get(metricName)) {
+ logger.debug("Ingesting metric: " + metricName);
historyListMapping.get(metricName).add(new InMemoryHistoryNode(metric.getValue()));
cleanUsingTime(metricName, historyListMapping.get(metricName).peekLast().getTimestamp());
}
}
+ /**
+ * Get snapshots for a given metric at a given time
+ * @param metricName name of the metric
+ * @param from Start date
+ * @param to end date
+ * @param useStats get statistically significant points only
+ * @return List of snapshots
+ */
public List<InMemoryHistoryNode> getDrawMetric(String metricName, Date from, Date to, Boolean useStats) {
LinkedList<InMemoryHistoryNode> selectedLists = new LinkedList<InMemoryHistoryNode>();
if (historyListMapping.containsKey(metricName)) {
- // selecting nodes within time frame
+ logger.debug("selecting snapshots within time frame");
synchronized (historyListMapping.get(metricName)) {
for (InMemoryHistoryNode node : historyListMapping.get(metricName)) {
if (node.getTimestamp().after(from) && node.getTimestamp().before(to)) {
@@ -90,8 +132,13 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
return selectedLists;
}
+ /**
+ * filter snapshots using statistically significant points only
+ * @param selectedLists list of snapshots
+ */
private void statBasedSelectMetricHistory(LinkedList<InMemoryHistoryNode> selectedLists) {
- Iterator<InMemoryHistoryNode> ite = selectedLists.iterator();
+ logger.debug("selecting snapshots which are far away from mean value");
+ Iterator<InMemoryHistoryNode> ite = selectedLists.iterator();
Double mean = InMemoryHistoryStatistics.mean(selectedLists);
Double std = InMemoryHistoryStatistics.sdev(selectedLists);
@@ -105,7 +152,12 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
}
}
+ /**
+ * filter snapshots by evenly selecting points across the interval
+ * @param selectedLists list of snapshots
+ */
private void generalSelectMetricHistory(LinkedList<InMemoryHistoryNode> selectedLists) {
+ logger.debug("selecting snapshots evenly from across the time interval");
if (selectedLists.size() > numInstances) {
double step = (double) selectedLists.size() / numInstances;
long nextIndex = 0, currentIndex = 0, numSelectedInstances = 1;
@@ -123,6 +175,11 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
}
}
+ /**
+ * Remove snapshots to maintain reporting interval
+ * @param metricName Name of the metric
+ * @param firstAllowedDate End date of the interval
+ */
private void cleanUsingTime(String metricName, Date firstAllowedDate) {
if (historyListMapping.containsKey(metricName) && historyListMapping.get(metricName) != null) {
synchronized (historyListMapping.get(metricName)) {
@@ -146,6 +203,11 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
}
}
+ /**
+ * Clear snapshot data structure
+ * {@inheritDoc}
+ * @see azkaban.metric.IMetricEmitter#purgeAllData()
+ */
@Override
public void purgeAllData() throws Exception {
historyListMapping.clear();
diff --git a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
index 3444e7b..2a49878 100644
--- a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
+++ b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
@@ -24,29 +24,47 @@ import java.util.concurrent.Executors;
import org.apache.log4j.Logger;
-
+/**
+ * Manager for access or updating metric related functionality of Azkaban
+ */
public class MetricReportManager {
+ /**
+ * Maximum number of metrics reporting threads
+ */
private static final int MAX_EMITTER_THREADS = 4;
private static final Logger logger = Logger.getLogger(MetricReportManager.class);
+ /**
+ * List of all the metrics that Azkaban is tracking
+ */
private List<IMetric<?>> metrics;
+ /**
+ * List of all the emitter listening all the metrics
+ */
private List<IMetricEmitter> metricEmitters;
private ExecutorService executorService;
+ // Singleton variable
private static volatile MetricReportManager instance = null;
boolean isManagerEnabled;
- // For singleton
private MetricReportManager() {
+ logger.debug("Instantiating Metric Manager");
executorService = Executors.newFixedThreadPool(MAX_EMITTER_THREADS);
metrics = new ArrayList<IMetric<?>>();
metricEmitters = new LinkedList<IMetricEmitter>();
enableManager();
}
+ /**
+ * @return true, if we have enabled metric manager from Azkaban exec server
+ */
public static boolean isInstantiated() {
return instance != null;
}
+ /**
+ * Get a singleton object for Metric Manager
+ */
public static MetricReportManager getInstance() {
if (instance == null) {
synchronized (MetricReportManager.class) {
@@ -82,27 +100,50 @@ public class MetricReportManager {
}
}
+ /**
+ * Add a metric emitter to report metric
+ * @param emitter
+ */
public void addMetricEmitter(final IMetricEmitter emitter) {
metricEmitters.add(emitter);
}
+ /**
+ * remove a metric emitter
+ * @param emitter
+ */
public void removeMetricEmitter(final IMetricEmitter emitter) {
metricEmitters.remove(emitter);
}
+ /**
+ * Get all the metric emitters
+ * @return
+ */
public List<IMetricEmitter> getMetricEmitters() {
return metricEmitters;
}
+ /**
+ * Add a metric to be managed by Metric Manager
+ * @param metric
+ */
public void addMetric(final IMetric<?> metric) {
// metric null or already present
if (metric != null && getMetricFromName(metric.getName()) == null) {
logger.debug(String.format("Adding %s metric in Metric Manager", metric.getName()));
metrics.add(metric);
metric.updateMetricManager(this);
+ } else {
+ logger.error("Failed to add metric");
}
}
+ /**
+ * Get metric object for a given metric name
+ * @param name metricName
+ * @return metric Object, if found. Otherwise null.
+ */
public IMetric<?> getMetricFromName(final String name) {
IMetric<?> metric = null;
if (name != null) {
@@ -116,15 +157,24 @@ public class MetricReportManager {
return metric;
}
+ /**
+ * Get all the emitters
+ * @return
+ */
public List<IMetric<?>> getAllMetrics() {
return metrics;
}
public void enableManager() {
+ logger.info("Enabling Metric Manager");
isManagerEnabled = true;
}
+ /**
+ * Disable Metric Manager and ask all emitters to purge all available data.
+ */
public void disableManager() {
+ logger.info("Disabling Metric Manager");
if(isManagerEnabled) {
isManagerEnabled = false;
for(IMetricEmitter emitter: metricEmitters) {
@@ -137,6 +187,11 @@ public class MetricReportManager {
}
}
+ /**
+ * Shutdown execution service
+ * {@inheritDoc}
+ * @see java.lang.Object#finalize()
+ */
protected void finalize() {
executorService.shutdown();
}
diff --git a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
index b8c8a98..bebb36d 100644
--- a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
@@ -19,10 +19,20 @@ package azkaban.metric;
import java.util.Timer;
import java.util.TimerTask;
-
+/**
+ * Metrics tracked after every interval using timer
+ * @param <T> Type of Value of a given metric
+ */
public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
private Timer timer;
+ /**
+ * @param metricName Name of metric
+ * @param metricType Metric type. For display purposes.
+ * @param initialValue Initial Value of a metric
+ * @param manager Metric Manager whom a metric will report to
+ * @param interval Time interval for metric tracking
+ */
public TimeBasedReportingMetric(String metricName, String metricType, T initialValue, MetricReportManager manager,
long interval) {
super(metricName, metricType, initialValue, manager);
@@ -30,6 +40,10 @@ public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
timer.schedule(getTimerTask(), interval, interval);
}
+ /**
+ * Get a TimerTask
+ * @return An anonymous TimerTask class
+ */
private TimerTask getTimerTask() {
TimerTask recurringReporting = new TimerTask() {
@Override
@@ -41,7 +55,12 @@ public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
return recurringReporting;
}
+ /**
+ * Method to change tracking interval
+ * @param interval
+ */
public void updateInterval(long interval) {
+ logger.debug(String.format("Updating tracking interval to %d milisecond for %s metric", interval, getName()));
timer.cancel();
timer = new Timer();
timer.schedule(getTimerTask(), interval, interval);
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 151f0b7..aa35498 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -129,7 +129,6 @@ public class AzkabanExecutorServer {
/**
* Configure Metric Reporting as per azkaban.properties settings
- *
*/
private void configureMetricReports() {
Props props = getAzkabanProps();
@@ -139,11 +138,11 @@ public class AzkabanExecutorServer {
IMetricEmitter metricEmitter = new InMemoryMetricEmitter(props);
metricManager.addMetricEmitter(metricEmitter);
- // Adding number of Jobs
+ logger.info("Adding number of Jobs metric");
metricManager.addMetric(new NumRunningJobMetric(metricManager, props.getInt("executor.metric.interval."
+ NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME, props.getInt("executor.metric.interval.default"))));
- // Adding number of flows
+ logger.info("Adding number of flows metric");
metricManager.addMetric(new NumRunningFlowMetric(runnerManager, metricManager, props.getInt(
"executor.metric.interval." + NumRunningFlowMetric.NUM_RUNNING_FLOW_METRIC_NAME,
props.getInt("executor.metric.interval.default"))));
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
index c051cfd..d3611b4 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
@@ -818,7 +818,12 @@ public class FlowRunner extends EventHandler implements Runnable {
return jobRunner;
}
+ /**
+ * Configure Azkaban metrics tracking for a new jobRunner instance
+ * @param jobRunner
+ */
private void configureJobLevelMetrics(JobRunner jobRunner) {
+ logger.info("Configuring Azkaban metrics tracking for jobrunner object");
if(MetricReportManager.isInstantiated()) {
MetricReportManager metricManager = MetricReportManager.getInstance();
NumRunningJobMetric metric = (NumRunningJobMetric) metricManager.getMetricFromName(NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME);
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
index f0b45ed..2ac8654 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
@@ -20,19 +20,31 @@ import azkaban.execapp.FlowRunnerManager;
import azkaban.metric.MetricReportManager;
import azkaban.metric.TimeBasedReportingMetric;
-
+/**
+ * Metric to keep track of number of running flows in Azkaban exec server
+ */
public class NumRunningFlowMetric extends TimeBasedReportingMetric<Integer> {
public static final String NUM_RUNNING_FLOW_METRIC_NAME = "NumRunningFlowMetric";
private static final String NUM_RUNNING_FLOW_METRIC_TYPE = "uint16";
private FlowRunnerManager flowManager;
+ /**
+ * @param flowRunnerManager Flow runner manager
+ * @param manager metric report manager
+ * @param interval reporting interval
+ */
public NumRunningFlowMetric(FlowRunnerManager flowRunnerManager, MetricReportManager manager, long interval) {
super(NUM_RUNNING_FLOW_METRIC_NAME, NUM_RUNNING_FLOW_METRIC_TYPE, 0, manager, interval);
logger.debug("Instantiated NumRunningFlowMetric");
flowManager = flowRunnerManager;
}
+ /**
+ * Update value using flow manager
+ * {@inheritDoc}
+ * @see azkaban.metric.TimeBasedReportingMetric#finalizeValue()
+ */
@Override
protected synchronized void finalizeValue() {
value = flowManager.getNumRunningFlows();
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
index 66a4e38..58146bc 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
@@ -22,16 +22,27 @@ import azkaban.event.EventListener;
import azkaban.metric.MetricReportManager;
import azkaban.metric.TimeBasedReportingMetric;
-
+/**
+ * Metric to keep track of number of running jobs in Azkaban exec server
+ */
public class NumRunningJobMetric extends TimeBasedReportingMetric<Integer> implements EventListener {
public static final String NUM_RUNNING_JOB_METRIC_NAME = "NumRunningJobMetric";
private static final String NUM_RUNNING_JOB_METRIC_TYPE = "uint16";
+ /**
+ * @param manager metric manager
+ * @param interval reporting interval
+ */
public NumRunningJobMetric(MetricReportManager manager, long interval) {
super(NUM_RUNNING_JOB_METRIC_NAME, NUM_RUNNING_JOB_METRIC_TYPE, 0, manager, interval);
logger.debug("Instantiated NumRunningJobMetric");
}
+ /**
+ * Listen for events to maintain correct value of number of running jobs
+ * {@inheritDoc}
+ * @see azkaban.event.EventListener#handleEvent(azkaban.event.Event)
+ */
@Override
public synchronized void handleEvent(Event event) {
if (event.getType() == Type.JOB_STARTED) {
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
index d31605d..e204088 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
@@ -44,7 +44,10 @@ import azkaban.server.HttpRequestUtils;
import azkaban.server.ServerConstants;
import azkaban.utils.JSONUtils;
-
+/**
+ * Servlet to communicate with Azkaban exec server
+ * This servlet get requests from stats servlet in Azkaban Web server
+ */
public class StatsServlet extends HttpServlet implements ConnectorParams {
private static final long serialVersionUID = 2L;
private static final Logger logger = Logger.getLogger(StatsServlet.class);
@@ -71,6 +74,11 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
return HttpRequestUtils.getLongParam(request, name);
}
+ /**
+ * Handle all get request to Stats Servlet
+ * {@inheritDoc}
+ * @see javax.servlet.http.HttpServlet#doGet(javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)
+ */
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
Map<String, Object> ret = new HashMap<String, Object>();
@@ -96,11 +104,16 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
JSONUtils.toJSON(ret, resp.getOutputStream(), true);
}
- private void handleChangeManagerStatusRequest(HttpServletRequest req, Map<String, Object> ret, boolean enable) {
+ /**
+ * enable or disable metric Manager
+ * A disable will also purge all data from all metric emitters
+ */
+ private void handleChangeManagerStatusRequest(HttpServletRequest req, Map<String, Object> ret, boolean enableMetricManager) {
try {
+ logger.info("Updating metric manager status");
if (MetricReportManager.isInstantiated()) {
MetricReportManager metricManager = MetricReportManager.getInstance();
- if (enable) {
+ if (enableMetricManager) {
metricManager.enableManager();
} else {
metricManager.disableManager();
@@ -113,6 +126,9 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
}
}
+ /**
+ * Update number of display snapshots for /stats graphs
+ */
private void handleChangeEmitterPoints(HttpServletRequest req, Map<String, Object> ret) {
try {
long numInstance = getLongParam(req, STATS_MAP_EMITTERNUMINSTANCES);
@@ -128,6 +144,9 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
}
}
+ /**
+ * Update InMemoryMetricEmitter interval to maintain metric snapshots
+ */
private void handleChangeCleaningInterval(HttpServletRequest req, Map<String, Object> ret) {
try {
long newInterval = getLongParam(req, STATS_MAP_CLEANINGINTERVAL);
@@ -143,6 +162,10 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
}
}
+ /**
+ * Get metric snapshots for a metric and date specification
+ * @throws ServletException
+ */
private void handleGetMetricHistory(HttpServletRequest req, Map<String, Object> ret) throws ServletException {
if (MetricReportManager.isInstantiated()) {
MetricReportManager metricManager = MetricReportManager.getInstance();
@@ -173,6 +196,9 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
}
}
+ /**
+ * Get InMemoryMetricEmitter, if available else null
+ */
private InMemoryMetricEmitter extractInMemoryMetricEmitter(MetricReportManager metricManager) {
InMemoryMetricEmitter memoryEmitter = null;
for (IMetricEmitter emitter : metricManager.getMetricEmitters()) {
@@ -184,6 +210,9 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
return memoryEmitter;
}
+ /**
+ * Get all the metrics tracked by metric manager
+ */
private void handleGetAllMMetricsName(HttpServletRequest req, Map<String, Object> ret) {
if (MetricReportManager.isInstantiated()) {
MetricReportManager metricManager = MetricReportManager.getInstance();
@@ -198,6 +227,10 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
}
}
+ /**
+ * Update tracking interval for a given metrics
+ * @throws ServletException
+ */
private void handleChangeMetricInterval(HttpServletRequest req, Map<String, Object> ret) throws ServletException {
try {
String metricName = getParam(req, STATS_MAP_METRICNAMEPARAM);
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
index 3c54e93..4a99e9b 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
@@ -40,7 +40,9 @@ import azkaban.user.UserManager;
import azkaban.utils.Pair;
import azkaban.webapp.AzkabanWebServer;
-
+/**
+ * User facing servlet for Azkaban default metric display
+ */
public class StatsServlet extends LoginAbstractAzkabanServlet {
private static final long serialVersionUID = 1L;
private UserManager userManager;
@@ -86,6 +88,10 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
writeJSON(resp, ret);
}
+ /**
+ * Generic method to facilitate actionName action using Azkaban exec server
+ * @param actionName Name of the action
+ */
private void handleChangeConfigurationRequest(String actionName, HttpServletRequest req, HashMap<String, Object> ret)
throws ServletException, IOException {
Map<String, Object> result = execManager.callExecutorStats(actionName, getAllParams(req));
@@ -96,6 +102,10 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
}
}
+ /**
+ * Get metric snapshots for a metric and date specification
+ * @throws ServletException
+ */
private void handleGetMetricHistory(HttpServletRequest req, HashMap<String, Object> ret, User user)
throws IOException, ServletException {
Map<String, Object> result =
@@ -107,6 +117,9 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
}
}
+ /**
+ *
+ */
private void handleStatePageLoad(HttpServletRequest req, HttpServletResponse resp, Session session)
throws ServletException {
Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/statsPage.vm");
@@ -143,10 +156,13 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
return true;
}
}
-
return false;
}
+ /**
+ * Parse all Http request params
+ * @return
+ */
@SuppressWarnings({ "unchecked", "rawtypes" })
private Pair<String, String>[] getAllParams(HttpServletRequest req) {
List<Pair<String, String>> allParams = new LinkedList<Pair<String, String>>();