azkaban-memoizeit

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 1c6a4f9..5f7bcd5 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -717,6 +717,11 @@ public class ExecutorManager extends EventHandler implements
     return jsonResponse;
   }
 
+  /**
+   * Manage servlet call for stats servlet in Azkaban execution server
+   * {@inheritDoc}
+   * @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(java.lang.String, azkaban.utils.Pair[])
+   */
   @Override
   public Map<String, Object> callExecutorStats(String action, Pair<String, String>... params) throws IOException {
 
diff --git a/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
index b9ec1e7..e36040e 100644
--- a/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
@@ -18,7 +18,10 @@ package azkaban.metric;
 
 import org.apache.log4j.Logger;
 
-
+/**
+ * Abstract class for Metric
+ * @param <T> Type of Value of a given metric
+ */
 public abstract class AbstractMetric<T> implements IMetric<T> {
   protected static final Logger logger = Logger.getLogger(MetricReportManager.class);
   protected String name;
@@ -26,6 +29,12 @@ public abstract class AbstractMetric<T> implements IMetric<T> {
   protected String type;
   protected MetricReportManager metricManager;
 
+  /**
+   * @param metricName Name of metric
+   * @param metricType Metric type. For display purposes.
+   * @param initialValue Initial Value of a metric
+   * @param manager Metric Manager whom a metric will report to
+   */
   public AbstractMetric(String metricName, String metricType, T initialValue, MetricReportManager manager) {
     name = metricName;
     type = metricType;
@@ -33,22 +42,45 @@ public abstract class AbstractMetric<T> implements IMetric<T> {
     metricManager = manager;
   }
 
+  /**
+   * {@inheritDoc}
+   * @see azkaban.metric.IMetric#getName()
+   */
   public String getName() {
     return name;
   }
 
+  /**
+   * {@inheritDoc}
+   * @see azkaban.metric.IMetric#getValueType()
+   */
   public String getValueType() {
     return type;
   }
 
+  /**
+   * {@inheritDoc}
+   * @see azkaban.metric.IMetric#updateMetricManager(azkaban.metric.MetricReportManager)
+   */
   public void updateMetricManager(final MetricReportManager manager) {
     metricManager = manager;
   }
 
+  /**
+   * {@inheritDoc}
+   * @see azkaban.metric.IMetric#getValue()
+   */
   public T getValue() {
     return value;
   }
 
+  /**
+   * Method used to notify manager for a tracking event.
+   * Metric is free to call this method as per implementation.
+   * Timer based or Azkaban events can be the most common implementation
+   * {@inheritDoc}
+   * @see azkaban.metric.IMetric#notifyManager()
+   */
   public synchronized void notifyManager() {
     logger.debug(String.format("Notifying Manager for %s", this.getClass().getName()));
     try {
diff --git a/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
index 5eb1890..76fb4d4 100644
--- a/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
@@ -18,12 +18,17 @@ package azkaban.metric;
 
 import azkaban.utils.Props;
 
-
+/**
+ * MetricEmitter implementation to report metric to a ganglia gmetric process
+ */
 public class GangliaMetricEmitter implements IMetricEmitter {
   private static final String GANGLIA_METRIC_REPORTER_PATH = "azkaban.metric.ganglia.path";
 
   private String gmetricPath;
 
+  /**
+   * @param azkProps Azkaban Properties
+   */
   public GangliaMetricEmitter(Props azkProps) {
     gmetricPath = azkProps.get(GANGLIA_METRIC_REPORTER_PATH);
   }
@@ -32,6 +37,11 @@ public class GangliaMetricEmitter implements IMetricEmitter {
     return String.format("%s -t %s -n %s -v %s", gmetricPath, metric.getValueType(), metric.getName(), metric.getValue().toString());
   }
 
+  /**
+   * Report metric by executing command line interface of gmetrics
+   * {@inheritDoc}
+   * @see azkaban.metric.IMetricEmitter#reportMetric(azkaban.metric.IMetric)
+   */
   @Override
   public void reportMetric(final IMetric<?> metric) throws Exception {
     String gangliaCommand = buildCommand(metric);
diff --git a/azkaban-common/src/main/java/azkaban/metric/IMetric.java b/azkaban-common/src/main/java/azkaban/metric/IMetric.java
index ff6d4cd..ad1bfe1 100644
--- a/azkaban-common/src/main/java/azkaban/metric/IMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/IMetric.java
@@ -16,6 +16,10 @@
 
 package azkaban.metric;
 
+/**
+ * Interface of any Metric
+ * @param <T> Type of Value of a given metric
+ */
 public interface IMetric<T> {
   String getName();
   String getValueType();
diff --git a/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java
index f485d23..2436625 100644
--- a/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java
@@ -16,6 +16,9 @@
 
 package azkaban.metric;
 
+/**
+ * Interface for metric emitters
+ */
 public interface IMetricEmitter {
   void reportMetric(final IMetric<?> metric) throws Exception;
   void purgeAllData() throws Exception;
diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryNode.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryNode.java
index 20e8d93..39d18a1 100644
--- a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryNode.java
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryNode.java
@@ -16,14 +16,19 @@
 
 package azkaban.metric.inmemoryemitter;
 
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
 import java.util.Date;
 
+/**
+ * A snapshot of metric's value
+ */
 public class InMemoryHistoryNode {
   private Object value;
   private Date date;
 
+  /**
+   * Takes snapshot of the metric with a given value
+   * @param val
+   */
   public InMemoryHistoryNode(Object val) {
     value = val;
     date = new Date();
@@ -36,9 +41,4 @@ public class InMemoryHistoryNode {
   public Date getTimestamp() {
     return date;
   }
-
-  public String formattedTimestamp() {
-    DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-    return dateFormat.format(date);
-  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryStatistics.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryStatistics.java
index adc617e..72061b4 100644
--- a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryStatistics.java
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryStatistics.java
@@ -18,10 +18,15 @@ package azkaban.metric.inmemoryemitter;
 
 import java.util.List;
 
+/**
+ * Utility class for mathematical function of metric's history objects
+ */
 public final class InMemoryHistoryStatistics {
 
   /**
    * Returns the average
+   * @param data
+   * @return mean of data
    */
   public static double mean(List<InMemoryHistoryNode> data) {
     double total = 0.0;
@@ -33,13 +38,18 @@ public final class InMemoryHistoryStatistics {
 
   /**
    * Returns the sample standard deviation
+   * @param data
+   * @return standard deviation of data
    */
   public static double sdev(List<InMemoryHistoryNode> data) {
     return Math.sqrt(variance(data));
   }
 
+
   /**
    * Returns the sample variance
+   * @param data
+   * @return variance of data
    */
   public static double variance(List<InMemoryHistoryNode> data) {
     double mu = mean(data);
@@ -50,6 +60,11 @@ public final class InMemoryHistoryStatistics {
     return sumsq / data.size();
   }
 
+  /**
+   * Square of a number
+   * @param x
+   * @return x*x
+   */
   public static double sqr(double x) {
     return x * x;
   }
diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
index 5250e69..e2a91fe 100644
--- a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
@@ -24,50 +24,92 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.log4j.Logger;
+
 import azkaban.metric.IMetric;
 import azkaban.metric.IMetricEmitter;
 import azkaban.utils.Props;
 
-
+/**
+ * Metric Emitter which maintains in memory snapshots of the metrics
+ * This is also the default metric emitter and used by /stats servlet
+ */
 public class InMemoryMetricEmitter implements IMetricEmitter {
+  protected static final Logger logger = Logger.getLogger(InMemoryMetricEmitter.class);
+
+  /**
+   * Data structure to keep track of snapshots
+   */
   Map<String, LinkedList<InMemoryHistoryNode>> historyListMapping;
   private static final String INMEMORY_METRIC_REPORTER_WINDOW = "azkaban.metric.inmemory.interval";
   private static final String INMEMORY_METRIC_NUM_INSTANCES = "azkaban.metric.inmemory.maxinstances";
 
+  /**
+   * Interval (in millisecond) from today for which we should maintain the in memory snapshots
+   */
   long interval;
+  /**
+   * Maximum number of snapshots that should be displayed on /stats servlet
+   */
   long numInstances;
 
+  /**
+   * @param azkProps Azkaban Properties
+   */
   public InMemoryMetricEmitter(Props azkProps) {
     historyListMapping = new HashMap<String, LinkedList<InMemoryHistoryNode>>();
     interval = azkProps.getLong(INMEMORY_METRIC_REPORTER_WINDOW, 60 * 60 * 24 * 7 * 1000);
     numInstances = azkProps.getLong(INMEMORY_METRIC_NUM_INSTANCES, 50);
   }
 
+  /**
+   * Update reporting interval
+   * @param val interval in milli seconds
+   */
   public synchronized void setReportingInterval(long val) {
     interval = val;
   }
 
+  /**
+   * Set number of /stats servlet display points
+   * @param num
+   */
   public void setReportingInstances(long num) {
     numInstances = num;
   }
 
+  /**
+   * Ingest metric in snapshot data structure while maintaining interval
+   * {@inheritDoc}
+   * @see azkaban.metric.IMetricEmitter#reportMetric(azkaban.metric.IMetric)
+   */
   @Override
   public void reportMetric(IMetric<?> metric) throws Exception {
     String metricName = metric.getName();
     if (!historyListMapping.containsKey(metricName)) {
+      logger.info("First time capturing metric: " + metricName);
       historyListMapping.put(metricName, new LinkedList<InMemoryHistoryNode>());
     }
     synchronized (historyListMapping.get(metricName)) {
+      logger.debug("Ingesting metric: " + metricName);
       historyListMapping.get(metricName).add(new InMemoryHistoryNode(metric.getValue()));
       cleanUsingTime(metricName, historyListMapping.get(metricName).peekLast().getTimestamp());
     }
   }
 
+  /**
+   * Get snapshots for a given metric at a given time
+   * @param metricName name of the metric
+   * @param from Start date
+   * @param to end date
+   * @param useStats get statistically significant points only
+   * @return List of snapshots
+   */
   public List<InMemoryHistoryNode> getDrawMetric(String metricName, Date from, Date to, Boolean useStats) {
     LinkedList<InMemoryHistoryNode> selectedLists = new LinkedList<InMemoryHistoryNode>();
     if (historyListMapping.containsKey(metricName)) {
 
-      // selecting nodes within time frame
+      logger.debug("selecting snapshots within time frame");
       synchronized (historyListMapping.get(metricName)) {
         for (InMemoryHistoryNode node : historyListMapping.get(metricName)) {
           if (node.getTimestamp().after(from) && node.getTimestamp().before(to)) {
@@ -90,8 +132,13 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
     return selectedLists;
   }
 
+  /**
+   * filter snapshots using statistically significant points only
+   * @param selectedLists list of snapshots
+   */
   private void statBasedSelectMetricHistory(LinkedList<InMemoryHistoryNode> selectedLists) {
-      Iterator<InMemoryHistoryNode> ite = selectedLists.iterator();
+    logger.debug("selecting snapshots which are far away from mean value");
+    Iterator<InMemoryHistoryNode> ite = selectedLists.iterator();
       Double mean = InMemoryHistoryStatistics.mean(selectedLists);
       Double std = InMemoryHistoryStatistics.sdev(selectedLists);
 
@@ -105,7 +152,12 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
       }
   }
 
+  /**
+   * filter snapshots by evenly selecting points across the interval
+   * @param selectedLists list of snapshots
+   */
   private void generalSelectMetricHistory(LinkedList<InMemoryHistoryNode> selectedLists) {
+    logger.debug("selecting snapshots evenly from across the time interval");
     if (selectedLists.size() > numInstances) {
       double step = (double) selectedLists.size() / numInstances;
       long nextIndex = 0, currentIndex = 0, numSelectedInstances = 1;
@@ -123,6 +175,11 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
     }
   }
 
+  /**
+   * Remove snapshots to maintain reporting interval
+   * @param metricName Name of the metric
+   * @param firstAllowedDate End date of the interval
+   */
   private void cleanUsingTime(String metricName, Date firstAllowedDate) {
     if (historyListMapping.containsKey(metricName) && historyListMapping.get(metricName) != null) {
       synchronized (historyListMapping.get(metricName)) {
@@ -146,6 +203,11 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
     }
   }
 
+  /**
+   * Clear snapshot data structure
+   * {@inheritDoc}
+   * @see azkaban.metric.IMetricEmitter#purgeAllData()
+   */
   @Override
   public void purgeAllData() throws Exception {
     historyListMapping.clear();
diff --git a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
index 3444e7b..2a49878 100644
--- a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
+++ b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
@@ -24,29 +24,47 @@ import java.util.concurrent.Executors;
 
 import org.apache.log4j.Logger;
 
-
+/**
+ * Manager for access or updating metric related functionality of Azkaban
+ */
 public class MetricReportManager {
+  /**
+   * Maximum number of metrics reporting threads
+   */
   private static final int MAX_EMITTER_THREADS = 4;
   private static final Logger logger = Logger.getLogger(MetricReportManager.class);
 
+  /**
+   * List of all the metrics that Azkaban is tracking
+   */
   private List<IMetric<?>> metrics;
+  /**
+   * List of all the emitter listening all the metrics
+   */
   private List<IMetricEmitter> metricEmitters;
   private ExecutorService executorService;
+  // Singleton variable
   private static volatile MetricReportManager instance = null;
   boolean isManagerEnabled;
 
-  // For singleton
   private MetricReportManager() {
+    logger.debug("Instantiating Metric Manager");
     executorService = Executors.newFixedThreadPool(MAX_EMITTER_THREADS);
     metrics = new ArrayList<IMetric<?>>();
     metricEmitters = new LinkedList<IMetricEmitter>();
     enableManager();
   }
 
+  /**
+   * @return true, if we have enabled metric manager from Azkaban exec server
+   */
   public static boolean isInstantiated() {
     return instance != null;
   }
 
+  /**
+   * Get a singleton object for Metric Manager
+   */
   public static MetricReportManager getInstance() {
     if (instance == null) {
       synchronized (MetricReportManager.class) {
@@ -82,27 +100,50 @@ public class MetricReportManager {
     }
   }
 
+  /**
+   * Add a metric emitter to report metric
+   * @param emitter
+   */
   public void addMetricEmitter(final IMetricEmitter emitter) {
     metricEmitters.add(emitter);
   }
 
+  /**
+   * remove a metric emitter
+   * @param emitter
+   */
   public void removeMetricEmitter(final IMetricEmitter emitter) {
     metricEmitters.remove(emitter);
   }
 
+  /**
+   * Get all the metric emitters
+   * @return
+   */
   public List<IMetricEmitter> getMetricEmitters() {
     return metricEmitters;
   }
 
+  /**
+   * Add a metric to be managed by Metric Manager
+   * @param metric
+   */
   public void addMetric(final IMetric<?> metric) {
     // metric null or already present
     if (metric != null && getMetricFromName(metric.getName()) == null) {
       logger.debug(String.format("Adding %s metric in Metric Manager", metric.getName()));
       metrics.add(metric);
       metric.updateMetricManager(this);
+    } else {
+      logger.error("Failed to add metric");
     }
   }
 
+  /**
+   * Get metric object for a given metric name
+   * @param name metricName
+   * @return metric Object, if found. Otherwise null.
+   */
   public IMetric<?> getMetricFromName(final String name) {
     IMetric<?> metric = null;
     if (name != null) {
@@ -116,15 +157,24 @@ public class MetricReportManager {
     return metric;
   }
 
+  /**
+   * Get all the emitters
+   * @return
+   */
   public List<IMetric<?>> getAllMetrics() {
     return metrics;
   }
 
   public void enableManager() {
+    logger.info("Enabling Metric Manager");
     isManagerEnabled = true;
   }
 
+  /**
+   * Disable Metric Manager and ask all emitters to purge all available data.
+   */
   public void disableManager() {
+    logger.info("Disabling Metric Manager");
     if(isManagerEnabled) {
       isManagerEnabled = false;
       for(IMetricEmitter emitter: metricEmitters) {
@@ -137,6 +187,11 @@ public class MetricReportManager {
     }
   }
 
+  /**
+   * Shutdown execution service
+   * {@inheritDoc}
+   * @see java.lang.Object#finalize()
+   */
   protected void finalize() {
     executorService.shutdown();
   }
diff --git a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
index b8c8a98..bebb36d 100644
--- a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
@@ -19,10 +19,20 @@ package azkaban.metric;
 import java.util.Timer;
 import java.util.TimerTask;
 
-
+/**
+ * Metrics tracked after every interval using timer
+ * @param <T> Type of Value of a given metric
+ */
 public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
   private Timer timer;
 
+  /**
+   * @param metricName Name of metric
+   * @param metricType Metric type. For display purposes.
+   * @param initialValue Initial Value of a metric
+   * @param manager Metric Manager whom a metric will report to
+   * @param interval Time interval for metric tracking
+   */
   public TimeBasedReportingMetric(String metricName, String metricType, T initialValue, MetricReportManager manager,
       long interval) {
     super(metricName, metricType, initialValue, manager);
@@ -30,6 +40,10 @@ public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
     timer.schedule(getTimerTask(), interval, interval);
   }
 
+  /**
+   * Get a TimerTask
+   * @return An anonymous TimerTask class
+   */
   private TimerTask getTimerTask() {
     TimerTask recurringReporting = new TimerTask() {
       @Override
@@ -41,7 +55,12 @@ public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
     return recurringReporting;
   }
 
+  /**
+   * Method to change tracking interval
+   * @param interval
+   */
   public void updateInterval(long interval) {
+    logger.debug(String.format("Updating tracking interval to %d milisecond for %s metric", interval, getName()));
     timer.cancel();
     timer = new Timer();
     timer.schedule(getTimerTask(), interval, interval);
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 151f0b7..aa35498 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -129,7 +129,6 @@ public class AzkabanExecutorServer {
 
   /**
    * Configure Metric Reporting as per azkaban.properties settings
-   *
    */
   private void configureMetricReports() {
     Props props = getAzkabanProps();
@@ -139,11 +138,11 @@ public class AzkabanExecutorServer {
       IMetricEmitter metricEmitter = new InMemoryMetricEmitter(props);
       metricManager.addMetricEmitter(metricEmitter);
 
-      // Adding number of Jobs
+      logger.info("Adding number of Jobs metric");
       metricManager.addMetric(new NumRunningJobMetric(metricManager, props.getInt("executor.metric.interval."
           + NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME, props.getInt("executor.metric.interval.default"))));
 
-      // Adding number of flows
+      logger.info("Adding number of flows metric");
       metricManager.addMetric(new NumRunningFlowMetric(runnerManager, metricManager, props.getInt(
           "executor.metric.interval." + NumRunningFlowMetric.NUM_RUNNING_FLOW_METRIC_NAME,
           props.getInt("executor.metric.interval.default"))));
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
index c051cfd..d3611b4 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
@@ -818,7 +818,12 @@ public class FlowRunner extends EventHandler implements Runnable {
     return jobRunner;
   }
 
+  /**
+   * Configure Azkaban metrics tracking for a new jobRunner instance
+   * @param jobRunner
+   */
   private void configureJobLevelMetrics(JobRunner jobRunner) {
+    logger.info("Configuring Azkaban metrics tracking for jobrunner object");
     if(MetricReportManager.isInstantiated()) {
       MetricReportManager metricManager = MetricReportManager.getInstance();
       NumRunningJobMetric metric = (NumRunningJobMetric) metricManager.getMetricFromName(NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME);
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
index f0b45ed..2ac8654 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
@@ -20,19 +20,31 @@ import azkaban.execapp.FlowRunnerManager;
 import azkaban.metric.MetricReportManager;
 import azkaban.metric.TimeBasedReportingMetric;
 
-
+/**
+ * Metric to keep track of number of running flows in Azkaban exec server
+ */
 public class NumRunningFlowMetric extends TimeBasedReportingMetric<Integer> {
   public static final String NUM_RUNNING_FLOW_METRIC_NAME = "NumRunningFlowMetric";
   private static final String NUM_RUNNING_FLOW_METRIC_TYPE = "uint16";
 
   private FlowRunnerManager flowManager;
 
+  /**
+   * @param flowRunnerManager Flow runner manager
+   * @param manager metric report manager
+   * @param interval reporting interval
+   */
   public NumRunningFlowMetric(FlowRunnerManager flowRunnerManager, MetricReportManager manager, long interval) {
     super(NUM_RUNNING_FLOW_METRIC_NAME, NUM_RUNNING_FLOW_METRIC_TYPE, 0, manager, interval);
     logger.debug("Instantiated NumRunningFlowMetric");
     flowManager = flowRunnerManager;
   }
 
+  /**
+   * Update value using flow manager
+   * {@inheritDoc}
+   * @see azkaban.metric.TimeBasedReportingMetric#finalizeValue()
+   */
   @Override
   protected synchronized void finalizeValue() {
     value = flowManager.getNumRunningFlows();
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
index 66a4e38..58146bc 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
@@ -22,16 +22,27 @@ import azkaban.event.EventListener;
 import azkaban.metric.MetricReportManager;
 import azkaban.metric.TimeBasedReportingMetric;
 
-
+/**
+ * Metric to keep track of number of running jobs in Azkaban exec server
+ */
 public class NumRunningJobMetric extends TimeBasedReportingMetric<Integer> implements EventListener {
   public static final String NUM_RUNNING_JOB_METRIC_NAME = "NumRunningJobMetric";
   private static final String NUM_RUNNING_JOB_METRIC_TYPE = "uint16";
 
+  /**
+   * @param manager metric manager
+   * @param interval reporting interval
+   */
   public NumRunningJobMetric(MetricReportManager manager, long interval) {
     super(NUM_RUNNING_JOB_METRIC_NAME, NUM_RUNNING_JOB_METRIC_TYPE, 0, manager, interval);
     logger.debug("Instantiated NumRunningJobMetric");
   }
 
+  /**
+   * Listen for events to maintain correct value of number of running jobs
+   * {@inheritDoc}
+   * @see azkaban.event.EventListener#handleEvent(azkaban.event.Event)
+   */
   @Override
   public synchronized void handleEvent(Event event) {
     if (event.getType() == Type.JOB_STARTED) {
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
index d31605d..e204088 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
@@ -44,7 +44,10 @@ import azkaban.server.HttpRequestUtils;
 import azkaban.server.ServerConstants;
 import azkaban.utils.JSONUtils;
 
-
+/**
+ * Servlet to communicate with Azkaban exec server
+ * This servlet get requests from stats servlet in Azkaban Web server
+ */
 public class StatsServlet extends HttpServlet implements ConnectorParams {
   private static final long serialVersionUID = 2L;
   private static final Logger logger = Logger.getLogger(StatsServlet.class);
@@ -71,6 +74,11 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
     return HttpRequestUtils.getLongParam(request, name);
   }
 
+  /**
+   * Handle all get request to Stats Servlet
+   * {@inheritDoc}
+   * @see javax.servlet.http.HttpServlet#doGet(javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)
+   */
   protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
     Map<String, Object> ret = new HashMap<String, Object>();
 
@@ -96,11 +104,16 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
     JSONUtils.toJSON(ret, resp.getOutputStream(), true);
   }
 
-  private void handleChangeManagerStatusRequest(HttpServletRequest req, Map<String, Object> ret, boolean enable) {
+  /**
+   * enable or disable metric Manager
+   * A disable will also purge all data from all metric emitters
+   */
+  private void handleChangeManagerStatusRequest(HttpServletRequest req, Map<String, Object> ret, boolean enableMetricManager) {
     try {
+      logger.info("Updating metric manager status");
       if (MetricReportManager.isInstantiated()) {
         MetricReportManager metricManager = MetricReportManager.getInstance();
-        if (enable) {
+        if (enableMetricManager) {
           metricManager.enableManager();
         } else {
           metricManager.disableManager();
@@ -113,6 +126,9 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
     }
   }
 
+  /**
+   * Update number of display snapshots for /stats graphs
+   */
   private void handleChangeEmitterPoints(HttpServletRequest req, Map<String, Object> ret) {
     try {
       long numInstance = getLongParam(req, STATS_MAP_EMITTERNUMINSTANCES);
@@ -128,6 +144,9 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
     }
   }
 
+  /**
+   * Update InMemoryMetricEmitter interval to maintain metric snapshots
+   */
   private void handleChangeCleaningInterval(HttpServletRequest req, Map<String, Object> ret) {
     try {
       long newInterval = getLongParam(req, STATS_MAP_CLEANINGINTERVAL);
@@ -143,6 +162,10 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
     }
   }
 
+  /**
+   * Get metric snapshots for a metric and date specification
+   * @throws ServletException
+   */
   private void handleGetMetricHistory(HttpServletRequest req, Map<String, Object> ret) throws ServletException {
     if (MetricReportManager.isInstantiated()) {
       MetricReportManager metricManager = MetricReportManager.getInstance();
@@ -173,6 +196,9 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
     }
   }
 
+  /**
+   * Get InMemoryMetricEmitter, if available else null
+   */
   private InMemoryMetricEmitter extractInMemoryMetricEmitter(MetricReportManager metricManager) {
     InMemoryMetricEmitter memoryEmitter = null;
     for (IMetricEmitter emitter : metricManager.getMetricEmitters()) {
@@ -184,6 +210,9 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
     return memoryEmitter;
   }
 
+  /**
+   * Get all the metrics tracked by metric manager
+   */
   private void handleGetAllMMetricsName(HttpServletRequest req, Map<String, Object> ret) {
     if (MetricReportManager.isInstantiated()) {
       MetricReportManager metricManager = MetricReportManager.getInstance();
@@ -198,6 +227,10 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
     }
   }
 
+  /**
+   * Update tracking interval for a given metrics
+   * @throws ServletException
+   */
   private void handleChangeMetricInterval(HttpServletRequest req, Map<String, Object> ret) throws ServletException {
     try {
       String metricName = getParam(req, STATS_MAP_METRICNAMEPARAM);
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
index 3c54e93..4a99e9b 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
@@ -40,7 +40,9 @@ import azkaban.user.UserManager;
 import azkaban.utils.Pair;
 import azkaban.webapp.AzkabanWebServer;
 
-
+/**
+ * User facing servlet for Azkaban default metric display
+ */
 public class StatsServlet extends LoginAbstractAzkabanServlet {
   private static final long serialVersionUID = 1L;
   private UserManager userManager;
@@ -86,6 +88,10 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
     writeJSON(resp, ret);
   }
 
+  /**
+   * Generic method to facilitate actionName action using Azkaban exec server
+   * @param actionName  Name of the action
+   */
   private void handleChangeConfigurationRequest(String actionName, HttpServletRequest req, HashMap<String, Object> ret)
       throws ServletException, IOException {
     Map<String, Object> result = execManager.callExecutorStats(actionName, getAllParams(req));
@@ -96,6 +102,10 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
     }
   }
 
+  /**
+   * Get metric snapshots for a metric and date specification
+   * @throws ServletException
+   */
   private void handleGetMetricHistory(HttpServletRequest req, HashMap<String, Object> ret, User user)
       throws IOException, ServletException {
     Map<String, Object> result =
@@ -107,6 +117,9 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
     }
   }
 
+  /**
+   *
+   */
   private void handleStatePageLoad(HttpServletRequest req, HttpServletResponse resp, Session session)
       throws ServletException {
     Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/statsPage.vm");
@@ -143,10 +156,13 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
         return true;
       }
     }
-
     return false;
   }
 
+  /**
+   * Parse all Http request params
+   * @return
+   */
   @SuppressWarnings({ "unchecked", "rawtypes" })
   private Pair<String, String>[] getAllParams(HttpServletRequest req) {
     List<Pair<String, String>> allParams = new LinkedList<Pair<String, String>>();