azkaban-memoizeit

Details

diff --git a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
index 904586c..cf1211d 100644
--- a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
@@ -25,6 +25,8 @@ import java.util.TimerTask;
  */
 public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
   private Timer timer;
+  protected long MAX_MILISEC_INTERVAL = 60 * 60 * 1000;
+  protected long MIN_MILISEC_INTERVAL = 3 * 1000;
 
   /**
    * @param metricName Name of metric
@@ -32,10 +34,14 @@ public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
    * @param initialValue Initial Value of a metric
    * @param manager Metric Manager whom a metric will report to
    * @param interval Time interval for metric tracking
+   * @throws MetricException
    */
   public TimeBasedReportingMetric(String metricName, String metricType, T initialValue, MetricReportManager manager,
-      long interval) {
+      long interval) throws MetricException {
     super(metricName, metricType, initialValue, manager);
+    if(validateInterval(interval)) {
+      throw new MetricException("Invalid interval: Cannot instantiate timer");
+    }
     timer = new Timer();
     timer.schedule(getTimerTask(), interval, interval);
   }
@@ -62,14 +68,22 @@ public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
   /**
    * Method to change tracking interval
    * @param interval
+   * @throws MetricException
    */
-  public void updateInterval(final long interval) {
+  public void updateInterval(final long interval) throws MetricException {
+    if(validateInterval(interval)) {
+      throw new MetricException("Invalid interval: Cannot update timer");
+    }
     logger.debug(String.format("Updating tracking interval to %d milisecond for %s metric", interval, getName()));
     timer.cancel();
     timer = new Timer();
     timer.schedule(getTimerTask(), interval, interval);
   }
 
+  private boolean validateInterval(final long interval) {
+    return interval >= MIN_MILISEC_INTERVAL && interval <= MAX_MILISEC_INTERVAL;
+  }
+
   /**
    * This method is responsible for making any last minute update to value, if any
    */
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 367da00..b54b948 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -46,6 +46,7 @@ import azkaban.execapp.metric.NumRunningFlowMetric;
 import azkaban.execapp.metric.NumRunningJobMetric;
 import azkaban.jmx.JmxJettyServer;
 import azkaban.metric.IMetricEmitter;
+import azkaban.metric.MetricException;
 import azkaban.metric.MetricReportManager;
 import azkaban.metric.inmemoryemitter.InMemoryMetricEmitter;
 import azkaban.project.JdbcProjectLoader;
@@ -132,8 +133,9 @@ public class AzkabanExecutorServer {
 
   /**
    * Configure Metric Reporting as per azkaban.properties settings
+   * @throws MetricException
    */
-  private void configureMetricReports() {
+  private void configureMetricReports() throws MetricException {
     Props props = getAzkabanProps();
     if (props != null && props.getBoolean("executor.metric.reports", false)) {
       logger.info("Starting to configure Metric Reports");
@@ -142,26 +144,26 @@ public class AzkabanExecutorServer {
       metricManager.addMetricEmitter(metricEmitter);
 
       logger.info("Adding number of failed flow metric");
-      metricManager.addMetric(new NumFailedFlowMetric(metricManager, props.getInt("executor.metric.interval."
-          + NumFailedFlowMetric.NUM_FAILED_FLOW_METRIC_NAME, props.getInt("executor.metric.interval.default"))));
+      metricManager.addMetric(new NumFailedFlowMetric(metricManager, props.getInt("executor.metric.milisecinterval."
+          + NumFailedFlowMetric.NUM_FAILED_FLOW_METRIC_NAME, props.getInt("executor.metric.milisecinterval.default"))));
 
       logger.info("Adding number of failed jobs metric");
-      metricManager.addMetric(new NumFailedJobMetric(metricManager, props.getInt("executor.metric.interval."
-          + NumFailedJobMetric.NUM_FAILED_JOB_METRIC_NAME, props.getInt("executor.metric.interval.default"))));
+      metricManager.addMetric(new NumFailedJobMetric(metricManager, props.getInt("executor.metric.milisecinterval."
+          + NumFailedJobMetric.NUM_FAILED_JOB_METRIC_NAME, props.getInt("executor.metric.milisecinterval.default"))));
 
       logger.info("Adding number of running Jobs metric");
-      metricManager.addMetric(new NumRunningJobMetric(metricManager, props.getInt("executor.metric.interval."
-          + NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME, props.getInt("executor.metric.interval.default"))));
+      metricManager.addMetric(new NumRunningJobMetric(metricManager, props.getInt("executor.metric.milisecinterval."
+          + NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME, props.getInt("executor.metric.milisecinterval.default"))));
 
       logger.info("Adding number of running flows metric");
       metricManager.addMetric(new NumRunningFlowMetric(runnerManager, metricManager, props.getInt(
-          "executor.metric.interval." + NumRunningFlowMetric.NUM_RUNNING_FLOW_METRIC_NAME,
-          props.getInt("executor.metric.interval.default"))));
+          "executor.metric.milisecinterval." + NumRunningFlowMetric.NUM_RUNNING_FLOW_METRIC_NAME,
+          props.getInt("executor.metric.milisecinterval.default"))));
 
       logger.info("Adding number of queued flows metric");
       metricManager.addMetric(new NumQueuedFlowMetric(runnerManager, metricManager, props.getInt(
-          "executor.metric.interval." + NumQueuedFlowMetric.NUM_QUEUED_FLOW_METRIC_NAME,
-          props.getInt("executor.metric.interval.default"))));
+          "executor.metric.milisecinterval." + NumQueuedFlowMetric.NUM_QUEUED_FLOW_METRIC_NAME,
+          props.getInt("executor.metric.milisecinterval.default"))));
 
       logger.info("Completed configuring Metric Reports");
     }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
index f0e9b6b..b77c74f 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
@@ -21,6 +21,7 @@ import azkaban.event.Event.Type;
 import azkaban.event.EventListener;
 import azkaban.execapp.FlowRunner;
 import azkaban.executor.Status;
+import azkaban.metric.MetricException;
 import azkaban.metric.MetricReportManager;
 import azkaban.metric.TimeBasedReportingMetric;
 
@@ -31,7 +32,7 @@ public class NumFailedFlowMetric extends TimeBasedReportingMetric<Integer> imple
   public static final String NUM_FAILED_FLOW_METRIC_NAME = "NumFailedFlowMetric";
   private static final String NUM_FAILED_FLOW_METRIC_TYPE = "uint16";
 
-  public NumFailedFlowMetric(MetricReportManager manager, long interval) {
+  public NumFailedFlowMetric(MetricReportManager manager, long interval) throws MetricException {
     super(NUM_FAILED_FLOW_METRIC_NAME, NUM_FAILED_FLOW_METRIC_TYPE, 0, manager, interval);
     logger.debug("Instantiated NumFailedJobMetric");
   }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
index 6e16899..bb463d4 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
@@ -21,6 +21,7 @@ import azkaban.event.Event.Type;
 import azkaban.event.EventListener;
 import azkaban.execapp.JobRunner;
 import azkaban.executor.Status;
+import azkaban.metric.MetricException;
 import azkaban.metric.MetricReportManager;
 import azkaban.metric.TimeBasedReportingMetric;
 
@@ -31,7 +32,7 @@ public class NumFailedJobMetric extends TimeBasedReportingMetric<Integer> implem
   public static final String NUM_FAILED_JOB_METRIC_NAME = "NumFailedJobMetric";
   private static final String NUM_FAILED_JOB_METRIC_TYPE = "uint16";
 
-  public NumFailedJobMetric(MetricReportManager manager, long interval) {
+  public NumFailedJobMetric(MetricReportManager manager, long interval) throws MetricException {
     super(NUM_FAILED_JOB_METRIC_NAME, NUM_FAILED_JOB_METRIC_TYPE, 0, manager, interval);
     logger.debug("Instantiated NumFailedJobMetric");
   }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java
index 192a4ce..c44917a 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java
@@ -17,6 +17,7 @@
 package azkaban.execapp.metric;
 
 import azkaban.execapp.FlowRunnerManager;
+import azkaban.metric.MetricException;
 import azkaban.metric.MetricReportManager;
 import azkaban.metric.TimeBasedReportingMetric;
 
@@ -33,8 +34,9 @@ public class NumQueuedFlowMetric extends TimeBasedReportingMetric<Integer> {
    * @param flowRunnerManager Flow runner manager
    * @param manager metric report manager
    * @param interval reporting interval
+   * @throws MetricException
    */
-  public NumQueuedFlowMetric(FlowRunnerManager flowRunnerManager, MetricReportManager manager, long interval) {
+  public NumQueuedFlowMetric(FlowRunnerManager flowRunnerManager, MetricReportManager manager, long interval) throws MetricException {
     super(NUM_QUEUED_FLOW_METRIC_NAME, NUM_QUEUED_FLOW_METRIC_TYPE, 0, manager, interval);
     logger.debug("Instantiated NumQueuedFlowMetric");
     flowManager = flowRunnerManager;
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
index b611151..d7d09cb 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
@@ -17,6 +17,7 @@
 package azkaban.execapp.metric;
 
 import azkaban.execapp.FlowRunnerManager;
+import azkaban.metric.MetricException;
 import azkaban.metric.MetricReportManager;
 import azkaban.metric.TimeBasedReportingMetric;
 
@@ -33,8 +34,9 @@ public class NumRunningFlowMetric extends TimeBasedReportingMetric<Integer> {
    * @param flowRunnerManager Flow runner manager
    * @param manager metric report manager
    * @param interval reporting interval
+   * @throws MetricException
    */
-  public NumRunningFlowMetric(FlowRunnerManager flowRunnerManager, MetricReportManager manager, long interval) {
+  public NumRunningFlowMetric(FlowRunnerManager flowRunnerManager, MetricReportManager manager, long interval) throws MetricException {
     super(NUM_RUNNING_FLOW_METRIC_NAME, NUM_RUNNING_FLOW_METRIC_TYPE, 0, manager, interval);
     logger.debug("Instantiated NumRunningFlowMetric");
     flowManager = flowRunnerManager;
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
index e9a07f8..84ebfd4 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
@@ -19,6 +19,7 @@ package azkaban.execapp.metric;
 import azkaban.event.Event;
 import azkaban.event.Event.Type;
 import azkaban.event.EventListener;
+import azkaban.metric.MetricException;
 import azkaban.metric.MetricReportManager;
 import azkaban.metric.TimeBasedReportingMetric;
 
@@ -32,8 +33,9 @@ public class NumRunningJobMetric extends TimeBasedReportingMetric<Integer> imple
   /**
    * @param manager metric manager
    * @param interval reporting interval
+   * @throws MetricException
    */
-  public NumRunningJobMetric(MetricReportManager manager, long interval) {
+  public NumRunningJobMetric(MetricReportManager manager, long interval) throws MetricException {
     super(NUM_RUNNING_JOB_METRIC_NAME, NUM_RUNNING_JOB_METRIC_TYPE, 0, manager, interval);
     logger.debug("Instantiated NumRunningJobMetric");
   }
diff --git a/azkaban-execserver/src/package/conf/azkaban.properties b/azkaban-execserver/src/package/conf/azkaban.properties
index a9f832e..79e20e1 100644
--- a/azkaban-execserver/src/package/conf/azkaban.properties
+++ b/azkaban-execserver/src/package/conf/azkaban.properties
@@ -27,4 +27,4 @@ executor.connector.stats=true
 
 # uncomment to enable inmemory stats for azkaban
 #executor.metric.reports=true
-#executor.metric.interval.default=60000
\ No newline at end of file
+#executor.metric.milisecinterval.default=60000
\ No newline at end of file
diff --git a/azkaban-soloserver/src/package/conf/azkaban.properties b/azkaban-soloserver/src/package/conf/azkaban.properties
index d13bf32..325fd12 100644
--- a/azkaban-soloserver/src/package/conf/azkaban.properties
+++ b/azkaban-soloserver/src/package/conf/azkaban.properties
@@ -49,4 +49,4 @@ executor.connector.stats=true
 
 # uncomment to enable inmemory stats for azkaban
 #executor.metric.reports=true
-#executor.metric.interval.default=60000
\ No newline at end of file
+#executor.metric.milisecinterval.default=60000
\ No newline at end of file