Details
diff --git a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
index 904586c..cf1211d 100644
--- a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
@@ -25,6 +25,8 @@ import java.util.TimerTask;
*/
public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
private Timer timer;
+ protected long MAX_MILISEC_INTERVAL = 60 * 60 * 1000;
+ protected long MIN_MILISEC_INTERVAL = 3 * 1000;
/**
* @param metricName Name of metric
@@ -32,10 +34,14 @@ public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
* @param initialValue Initial Value of a metric
* @param manager Metric Manager whom a metric will report to
* @param interval Time interval for metric tracking
+ * @throws MetricException
*/
public TimeBasedReportingMetric(String metricName, String metricType, T initialValue, MetricReportManager manager,
- long interval) {
+ long interval) throws MetricException {
super(metricName, metricType, initialValue, manager);
+ if(validateInterval(interval)) {
+ throw new MetricException("Invalid interval: Cannot instantiate timer");
+ }
timer = new Timer();
timer.schedule(getTimerTask(), interval, interval);
}
@@ -62,14 +68,22 @@ public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
/**
* Method to change tracking interval
* @param interval
+ * @throws MetricException
*/
- public void updateInterval(final long interval) {
+ public void updateInterval(final long interval) throws MetricException {
+ if(validateInterval(interval)) {
+ throw new MetricException("Invalid interval: Cannot update timer");
+ }
logger.debug(String.format("Updating tracking interval to %d milisecond for %s metric", interval, getName()));
timer.cancel();
timer = new Timer();
timer.schedule(getTimerTask(), interval, interval);
}
+ private boolean validateInterval(final long interval) {
+ return interval >= MIN_MILISEC_INTERVAL && interval <= MAX_MILISEC_INTERVAL;
+ }
+
/**
* This method is responsible for making any last minute update to value, if any
*/
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 367da00..b54b948 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -46,6 +46,7 @@ import azkaban.execapp.metric.NumRunningFlowMetric;
import azkaban.execapp.metric.NumRunningJobMetric;
import azkaban.jmx.JmxJettyServer;
import azkaban.metric.IMetricEmitter;
+import azkaban.metric.MetricException;
import azkaban.metric.MetricReportManager;
import azkaban.metric.inmemoryemitter.InMemoryMetricEmitter;
import azkaban.project.JdbcProjectLoader;
@@ -132,8 +133,9 @@ public class AzkabanExecutorServer {
/**
* Configure Metric Reporting as per azkaban.properties settings
+ * @throws MetricException
*/
- private void configureMetricReports() {
+ private void configureMetricReports() throws MetricException {
Props props = getAzkabanProps();
if (props != null && props.getBoolean("executor.metric.reports", false)) {
logger.info("Starting to configure Metric Reports");
@@ -142,26 +144,26 @@ public class AzkabanExecutorServer {
metricManager.addMetricEmitter(metricEmitter);
logger.info("Adding number of failed flow metric");
- metricManager.addMetric(new NumFailedFlowMetric(metricManager, props.getInt("executor.metric.interval."
- + NumFailedFlowMetric.NUM_FAILED_FLOW_METRIC_NAME, props.getInt("executor.metric.interval.default"))));
+ metricManager.addMetric(new NumFailedFlowMetric(metricManager, props.getInt("executor.metric.milisecinterval."
+ + NumFailedFlowMetric.NUM_FAILED_FLOW_METRIC_NAME, props.getInt("executor.metric.milisecinterval.default"))));
logger.info("Adding number of failed jobs metric");
- metricManager.addMetric(new NumFailedJobMetric(metricManager, props.getInt("executor.metric.interval."
- + NumFailedJobMetric.NUM_FAILED_JOB_METRIC_NAME, props.getInt("executor.metric.interval.default"))));
+ metricManager.addMetric(new NumFailedJobMetric(metricManager, props.getInt("executor.metric.milisecinterval."
+ + NumFailedJobMetric.NUM_FAILED_JOB_METRIC_NAME, props.getInt("executor.metric.milisecinterval.default"))));
logger.info("Adding number of running Jobs metric");
- metricManager.addMetric(new NumRunningJobMetric(metricManager, props.getInt("executor.metric.interval."
- + NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME, props.getInt("executor.metric.interval.default"))));
+ metricManager.addMetric(new NumRunningJobMetric(metricManager, props.getInt("executor.metric.milisecinterval."
+ + NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME, props.getInt("executor.metric.milisecinterval.default"))));
logger.info("Adding number of running flows metric");
metricManager.addMetric(new NumRunningFlowMetric(runnerManager, metricManager, props.getInt(
- "executor.metric.interval." + NumRunningFlowMetric.NUM_RUNNING_FLOW_METRIC_NAME,
- props.getInt("executor.metric.interval.default"))));
+ "executor.metric.milisecinterval." + NumRunningFlowMetric.NUM_RUNNING_FLOW_METRIC_NAME,
+ props.getInt("executor.metric.milisecinterval.default"))));
logger.info("Adding number of queued flows metric");
metricManager.addMetric(new NumQueuedFlowMetric(runnerManager, metricManager, props.getInt(
- "executor.metric.interval." + NumQueuedFlowMetric.NUM_QUEUED_FLOW_METRIC_NAME,
- props.getInt("executor.metric.interval.default"))));
+ "executor.metric.milisecinterval." + NumQueuedFlowMetric.NUM_QUEUED_FLOW_METRIC_NAME,
+ props.getInt("executor.metric.milisecinterval.default"))));
logger.info("Completed configuring Metric Reports");
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
index f0e9b6b..b77c74f 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
@@ -21,6 +21,7 @@ import azkaban.event.Event.Type;
import azkaban.event.EventListener;
import azkaban.execapp.FlowRunner;
import azkaban.executor.Status;
+import azkaban.metric.MetricException;
import azkaban.metric.MetricReportManager;
import azkaban.metric.TimeBasedReportingMetric;
@@ -31,7 +32,7 @@ public class NumFailedFlowMetric extends TimeBasedReportingMetric<Integer> imple
public static final String NUM_FAILED_FLOW_METRIC_NAME = "NumFailedFlowMetric";
private static final String NUM_FAILED_FLOW_METRIC_TYPE = "uint16";
- public NumFailedFlowMetric(MetricReportManager manager, long interval) {
+ public NumFailedFlowMetric(MetricReportManager manager, long interval) throws MetricException {
super(NUM_FAILED_FLOW_METRIC_NAME, NUM_FAILED_FLOW_METRIC_TYPE, 0, manager, interval);
logger.debug("Instantiated NumFailedJobMetric");
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
index 6e16899..bb463d4 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
@@ -21,6 +21,7 @@ import azkaban.event.Event.Type;
import azkaban.event.EventListener;
import azkaban.execapp.JobRunner;
import azkaban.executor.Status;
+import azkaban.metric.MetricException;
import azkaban.metric.MetricReportManager;
import azkaban.metric.TimeBasedReportingMetric;
@@ -31,7 +32,7 @@ public class NumFailedJobMetric extends TimeBasedReportingMetric<Integer> implem
public static final String NUM_FAILED_JOB_METRIC_NAME = "NumFailedJobMetric";
private static final String NUM_FAILED_JOB_METRIC_TYPE = "uint16";
- public NumFailedJobMetric(MetricReportManager manager, long interval) {
+ public NumFailedJobMetric(MetricReportManager manager, long interval) throws MetricException {
super(NUM_FAILED_JOB_METRIC_NAME, NUM_FAILED_JOB_METRIC_TYPE, 0, manager, interval);
logger.debug("Instantiated NumFailedJobMetric");
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java
index 192a4ce..c44917a 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java
@@ -17,6 +17,7 @@
package azkaban.execapp.metric;
import azkaban.execapp.FlowRunnerManager;
+import azkaban.metric.MetricException;
import azkaban.metric.MetricReportManager;
import azkaban.metric.TimeBasedReportingMetric;
@@ -33,8 +34,9 @@ public class NumQueuedFlowMetric extends TimeBasedReportingMetric<Integer> {
* @param flowRunnerManager Flow runner manager
* @param manager metric report manager
* @param interval reporting interval
+ * @throws MetricException
*/
- public NumQueuedFlowMetric(FlowRunnerManager flowRunnerManager, MetricReportManager manager, long interval) {
+ public NumQueuedFlowMetric(FlowRunnerManager flowRunnerManager, MetricReportManager manager, long interval) throws MetricException {
super(NUM_QUEUED_FLOW_METRIC_NAME, NUM_QUEUED_FLOW_METRIC_TYPE, 0, manager, interval);
logger.debug("Instantiated NumQueuedFlowMetric");
flowManager = flowRunnerManager;
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
index b611151..d7d09cb 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
@@ -17,6 +17,7 @@
package azkaban.execapp.metric;
import azkaban.execapp.FlowRunnerManager;
+import azkaban.metric.MetricException;
import azkaban.metric.MetricReportManager;
import azkaban.metric.TimeBasedReportingMetric;
@@ -33,8 +34,9 @@ public class NumRunningFlowMetric extends TimeBasedReportingMetric<Integer> {
* @param flowRunnerManager Flow runner manager
* @param manager metric report manager
* @param interval reporting interval
+ * @throws MetricException
*/
- public NumRunningFlowMetric(FlowRunnerManager flowRunnerManager, MetricReportManager manager, long interval) {
+ public NumRunningFlowMetric(FlowRunnerManager flowRunnerManager, MetricReportManager manager, long interval) throws MetricException {
super(NUM_RUNNING_FLOW_METRIC_NAME, NUM_RUNNING_FLOW_METRIC_TYPE, 0, manager, interval);
logger.debug("Instantiated NumRunningFlowMetric");
flowManager = flowRunnerManager;
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
index e9a07f8..84ebfd4 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
@@ -19,6 +19,7 @@ package azkaban.execapp.metric;
import azkaban.event.Event;
import azkaban.event.Event.Type;
import azkaban.event.EventListener;
+import azkaban.metric.MetricException;
import azkaban.metric.MetricReportManager;
import azkaban.metric.TimeBasedReportingMetric;
@@ -32,8 +33,9 @@ public class NumRunningJobMetric extends TimeBasedReportingMetric<Integer> imple
/**
* @param manager metric manager
* @param interval reporting interval
+ * @throws MetricException
*/
- public NumRunningJobMetric(MetricReportManager manager, long interval) {
+ public NumRunningJobMetric(MetricReportManager manager, long interval) throws MetricException {
super(NUM_RUNNING_JOB_METRIC_NAME, NUM_RUNNING_JOB_METRIC_TYPE, 0, manager, interval);
logger.debug("Instantiated NumRunningJobMetric");
}
diff --git a/azkaban-execserver/src/package/conf/azkaban.properties b/azkaban-execserver/src/package/conf/azkaban.properties
index a9f832e..79e20e1 100644
--- a/azkaban-execserver/src/package/conf/azkaban.properties
+++ b/azkaban-execserver/src/package/conf/azkaban.properties
@@ -27,4 +27,4 @@ executor.connector.stats=true
# uncomment to enable inmemory stats for azkaban
#executor.metric.reports=true
-#executor.metric.interval.default=60000
\ No newline at end of file
+#executor.metric.milisecinterval.default=60000
\ No newline at end of file
diff --git a/azkaban-soloserver/src/package/conf/azkaban.properties b/azkaban-soloserver/src/package/conf/azkaban.properties
index d13bf32..325fd12 100644
--- a/azkaban-soloserver/src/package/conf/azkaban.properties
+++ b/azkaban-soloserver/src/package/conf/azkaban.properties
@@ -49,4 +49,4 @@ executor.connector.stats=true
# uncomment to enable inmemory stats for azkaban
#executor.metric.reports=true
-#executor.metric.interval.default=60000
\ No newline at end of file
+#executor.metric.milisecinterval.default=60000
\ No newline at end of file