azkaban-uncached
Changes
build.gradle 1(+1 -0)
Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
index ddd8bba..8bc725f 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
@@ -32,8 +32,7 @@ public interface ConnectorParams {
public static final String LOG_ACTION = "log";
public static final String ATTACHMENTS_ACTION = "attachments";
public static final String METADATA_ACTION = "metadata";
- public static final String RELOAD_JOBTYPE_PLUGINS_ACTION =
- "reloadJobTypePlugins";
+ public static final String RELOAD_JOBTYPE_PLUGINS_ACTION = "reloadJobTypePlugins";
public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
public static final String MODIFY_EXECUTION_ACTION_TYPE = "modifyType";
@@ -80,12 +79,33 @@ public interface ConnectorParams {
public static final String JMX_GET_MBEANS = "getMBeans";
public static final String JMX_GET_MBEAN_INFO = "getMBeanInfo";
public static final String JMX_GET_MBEAN_ATTRIBUTE = "getAttribute";
- public static final String JMX_GET_ALL_MBEAN_ATTRIBUTES =
- "getAllMBeanAttributes";
+ public static final String JMX_GET_ALL_MBEAN_ATTRIBUTES = "getAllMBeanAttributes";
public static final String JMX_ATTRIBUTE = "attribute";
public static final String JMX_MBEAN = "mBean";
- public static final String JMX_GET_ALL_EXECUTOR_ATTRIBUTES =
- "getAllExecutorAttributes";
+ public static final String JMX_GET_ALL_EXECUTOR_ATTRIBUTES = "getAllExecutorAttributes";
public static final String JMX_HOSTPORT = "hostPort";
+
+ public static final String STATS_GET_ALLMETRICSNAME = "getAllMetricNames";
+ public static final String STATS_GET_METRICHISTORY = "getMetricHistory";
+ public static final String STATS_SET_REPORTINGINTERVAL = "changeMetricInterval";
+ public static final String STATS_SET_CLEANINGINTERVAL = "changeCleaningInterval";
+ public static final String STATS_SET_MAXREPORTERPOINTS = "changeEmitterPoints";
+ public static final String STATS_SET_ENABLEMETRICS = "enableMetrics";
+ public static final String STATS_SET_DISABLEMETRICS = "disableMetrics";
+ public static final String STATS_MAP_METRICNAMEPARAM = "metricName";
+
+ /**
+ * useStats param is used to filter datapoints on /stats graph by using standard deviation and means
+ * By default, we consider only top/bottom 5% datapoints
+ */
+
+ public static final String STATS_MAP_METRICRETRIEVALMODE = "useStats";
+ public static final String STATS_MAP_STARTDATE = "from";
+ public static final String STATS_MAP_ENDDATE = "to";
+ public static final String STATS_MAP_REPORTINGINTERVAL = "interval";
+ public static final String STATS_MAP_CLEANINGINTERVAL = "interval";
+ public static final String STATS_MAP_EMITTERNUMINSTANCES = "numInstances";
+
+
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index e3521db..5f7bcd5 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -38,7 +38,6 @@ import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
-
import org.joda.time.DateTime;
import azkaban.alert.Alerter;
@@ -718,6 +717,53 @@ public class ExecutorManager extends EventHandler implements
return jsonResponse;
}
+ /**
+ * Manage servlet call for stats servlet in Azkaban execution server
+ * {@inheritDoc}
+ * @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(java.lang.String, azkaban.utils.Pair[])
+ */
+ @Override
+ public Map<String, Object> callExecutorStats(String action, Pair<String, String>... params) throws IOException {
+
+ URIBuilder builder = new URIBuilder();
+ builder.setScheme("http").setHost(executorHost).setPort(executorPort).setPath("/stats");
+
+ builder.setParameter(ConnectorParams.ACTION_PARAM, action);
+
+ if (params != null) {
+ for (Pair<String, String> pair : params) {
+ builder.setParameter(pair.getFirst(), pair.getSecond());
+ }
+ }
+
+ URI uri = null;
+ try {
+ uri = builder.build();
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+
+ ResponseHandler<String> responseHandler = new BasicResponseHandler();
+
+ HttpClient httpclient = new DefaultHttpClient();
+ HttpGet httpget = new HttpGet(uri);
+ String response = null;
+ try {
+ response = httpclient.execute(httpget, responseHandler);
+ } catch (IOException e) {
+ throw e;
+ } finally {
+ httpclient.getConnectionManager().shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> jsonResponse =
+ (Map<String, Object>) JSONUtils.parseJSONFromString(response);
+
+ return jsonResponse;
+ }
+
+
@Override
public Map<String, Object> callExecutorJMX(String hostPort, String action,
String mBean) throws IOException {
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
index c7e8913..10379ec 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -25,6 +25,7 @@ import java.util.Set;
import azkaban.project.Project;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.Pair;
public interface ExecutorManagerAdapter {
@@ -164,6 +165,22 @@ public interface ExecutorManagerAdapter {
public String submitExecutableFlow(ExecutableFlow exflow, String userId)
throws ExecutorManagerException;
+ /**
+ * Manage servlet call for stats servlet in Azkaban execution server
+ * Action can take any of the following values
+ * <ul>
+ * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_REPORTINGINTERVAL}<li>
+ * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_CLEANINGINTERVAL}<li>
+ * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_MAXREPORTERPOINTS}<li>
+ * <li>{@link azkaban.executor.ConnectorParams#STATS_GET_ALLMETRICSNAME}<li>
+ * <li>{@link azkaban.executor.ConnectorParams#STATS_GET_METRICHISTORY}<li>
+ * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_ENABLEMETRICS}<li>
+ * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_DISABLEMETRICS}<li>
+ * </ul>
+ */
+ public Map<String, Object> callExecutorStats(String action,
+ Pair<String, String>... params) throws IOException;
+
public Map<String, Object> callExecutorJMX(String hostPort, String action,
String mBean) throws IOException;
diff --git a/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
new file mode 100644
index 0000000..5a5a967
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Abstract class for Metric
+ * @param <T> Type of Value of a given metric
+ */
+public abstract class AbstractMetric<T> implements IMetric<T>, Cloneable{
+ protected static final Logger logger = Logger.getLogger(MetricReportManager.class);
+ protected String name;
+ protected T value;
+ protected String type;
+ protected MetricReportManager metricManager;
+
+ /**
+ * @param metricName Name of metric
+ * @param metricType Metric type. For display purposes.
+ * @param initialValue Initial Value of a metric
+ * @param manager Metric Manager whom a metric will report to
+ */
+ protected AbstractMetric(String metricName, String metricType, T initialValue, MetricReportManager manager) {
+ name = metricName;
+ type = metricType;
+ value = initialValue;
+ metricManager = manager;
+ }
+
+ /**
+ * {@inheritDoc}
+ * @see azkaban.metric.IMetric#getName()
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * {@inheritDoc}
+ * @see azkaban.metric.IMetric#getValueType()
+ */
+ public String getValueType() {
+ return type;
+ }
+
+ /**
+ * {@inheritDoc}
+ * @see azkaban.metric.IMetric#updateMetricManager(azkaban.metric.MetricReportManager)
+ */
+ public void updateMetricManager(final MetricReportManager manager) {
+ metricManager = manager;
+ }
+
+ /**
+ * {@inheritDoc}
+ * @throws CloneNotSupportedException
+ * @see azkaban.metric.IMetric#getSnapshot()
+ */
+ @SuppressWarnings("unchecked")
+ public IMetric<T> getSnapshot() throws CloneNotSupportedException{
+ return (IMetric<T>) this.clone();
+ }
+
+ /**
+ * {@inheritDoc}
+ * @see azkaban.metric.IMetric#getValue()
+ */
+ public T getValue() {
+ return value;
+ }
+
+ /**
+ * Method used to notify manager for a tracking event.
+ * Metric is free to call this method as per implementation.
+ * Timer based or Azkaban events are the most common implementation
+ * {@inheritDoc}
+ * @see azkaban.metric.IMetric#notifyManager()
+ */
+ public void notifyManager() {
+ logger.debug(String.format("Notifying Manager for %s", this.getClass().getName()));
+ try {
+ metricManager.reportMetric(this);
+ } catch (Throwable ex) {
+ logger.error(String.format("Metric Manager is not set for %s metric", this.getClass().getName()), ex);
+ }
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
new file mode 100644
index 0000000..5e954b7
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+import org.apache.commons.collections.bag.SynchronizedBag;
+
+import azkaban.utils.Props;
+
+
+/**
+ * MetricEmitter implementation to report metric to a ganglia gmetric process
+ */
+public class GangliaMetricEmitter implements IMetricEmitter {
+ private static final String GANGLIA_METRIC_REPORTER_PATH = "azkaban.metric.ganglia.path";
+ private String gmetricPath;
+
+ /**
+ * @param azkProps Azkaban Properties
+ */
+ public GangliaMetricEmitter(Props azkProps) {
+ gmetricPath = azkProps.get(GANGLIA_METRIC_REPORTER_PATH);
+ }
+
+ private String buildCommand(IMetric<?> metric) {
+ String cmd = null;
+
+ synchronized (metric) {
+ cmd =
+ String.format("%s -t %s -n %s -v %s", gmetricPath, metric.getValueType(), metric.getName(), metric.getValue()
+ .toString());
+ }
+
+ return cmd;
+ }
+
+ /**
+ * Report metric by executing command line interface of gmetrics
+ * {@inheritDoc}
+ * @see azkaban.metric.IMetricEmitter#reportMetric(azkaban.metric.IMetric)
+ */
+ @Override
+ public void reportMetric(final IMetric<?> metric) throws MetricException {
+ String gangliaCommand = buildCommand(metric);
+
+ if (gangliaCommand != null) {
+ // executes shell command to report metric to ganglia dashboard
+ try {
+ Process emission = Runtime.getRuntime().exec(gangliaCommand);
+ int exitCode;
+ exitCode = emission.waitFor();
+ if (exitCode != 0) {
+ throw new MetricException("Failed to report metric using gmetric");
+ }
+ } catch (Exception e) {
+ throw new MetricException("Failed to report metric using gmetric");
+ }
+ } else {
+ throw new MetricException("Failed to build ganglia Command");
+ }
+ }
+
+ @Override
+ public void purgeAllData() throws MetricException {
+
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/metric/IMetric.java b/azkaban-common/src/main/java/azkaban/metric/IMetric.java
new file mode 100644
index 0000000..188b399
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/IMetric.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+/**
+ * Interface of any Metric
+ * @param <T> Type of Value of a given metric
+ */
+public interface IMetric<T> {
+ String getName();
+ String getValueType();
+ void updateMetricManager(final MetricReportManager manager);
+ void notifyManager();
+ T getValue();
+ IMetric<T> getSnapshot() throws CloneNotSupportedException;
+}
diff --git a/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java
new file mode 100644
index 0000000..bdc874b
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+/**
+ * Interface for metric emitters
+ */
+public interface IMetricEmitter {
+ void reportMetric(final IMetric<?> metric) throws MetricException;
+ void purgeAllData() throws MetricException;
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryNode.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryNode.java
new file mode 100644
index 0000000..b8f1f6e
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryNode.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric.inmemoryemitter;
+
+import java.util.Date;
+
+/**
+ * A snapshot of metric's value
+ */
+public class InMemoryHistoryNode {
+ private Object value;
+ private Date date;
+
+ /**
+ * Takes snapshot of the metric with a given value
+ * @param val
+ */
+ public InMemoryHistoryNode(final Object val) {
+ value = val;
+ date = new Date();
+ }
+
+ public Object getValue() {
+ return value;
+ }
+
+ public Date getTimestamp() {
+ return date;
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
new file mode 100644
index 0000000..e93d11b
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
@@ -0,0 +1,235 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric.inmemoryemitter;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+import azkaban.metric.IMetric;
+import azkaban.metric.IMetricEmitter;
+import azkaban.metric.MetricException;
+import azkaban.utils.Props;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+
+
+/**
+ * Metric Emitter which maintains in memory snapshots of the metrics
+ * This is also the default metric emitter and used by /stats servlet
+ */
+public class InMemoryMetricEmitter implements IMetricEmitter {
+ protected static final Logger logger = Logger.getLogger(InMemoryMetricEmitter.class);
+
+ /**
+ * Data structure to keep track of snapshots
+ */
+ protected Map<String, LinkedList<InMemoryHistoryNode>> historyListMapping;
+ private static final String INMEMORY_METRIC_REPORTER_WINDOW = "azkaban.metric.inmemory.interval";
+ private static final String INMEMORY_METRIC_NUM_INSTANCES = "azkaban.metric.inmemory.maxinstances";
+ private static final String INMEMORY_METRIC_STANDARDDEVIATION_FACTOR =
+ "azkaban.metric.inmemory.standardDeviationFactor";
+
+ private double standardDeviationFactor;
+ /**
+ * Interval (in millisecond) from today for which we should maintain the in memory snapshots
+ */
+ private long timeWindow;
+ /**
+ * Maximum number of snapshots that should be displayed on /stats servlet
+ */
+ private long numInstances;
+
+ /**
+ * @param azkProps Azkaban Properties
+ */
+ public InMemoryMetricEmitter(Props azkProps) {
+ historyListMapping = new HashMap<String, LinkedList<InMemoryHistoryNode>>();
+ timeWindow = azkProps.getLong(INMEMORY_METRIC_REPORTER_WINDOW, 60 * 60 * 24 * 7 * 1000);
+ numInstances = azkProps.getLong(INMEMORY_METRIC_NUM_INSTANCES, 50);
+ standardDeviationFactor = azkProps.getDouble(INMEMORY_METRIC_STANDARDDEVIATION_FACTOR, 2);
+ }
+
+ /**
+ * Update reporting interval
+ * @param val interval in milli seconds
+ */
+ public synchronized void setReportingInterval(long val) {
+ timeWindow = val;
+ }
+
+ /**
+ * Set number of /stats servlet display points
+ * @param num
+ */
+ public void setReportingInstances(long num) {
+ numInstances = num;
+ }
+
+ /**
+ * Ingest metric in snapshot data structure while maintaining interval
+ * {@inheritDoc}
+ * @see azkaban.metric.IMetricEmitter#reportMetric(azkaban.metric.IMetric)
+ */
+ @Override
+ public void reportMetric(final IMetric<?> metric) throws MetricException {
+ String metricName = metric.getName();
+ if (!historyListMapping.containsKey(metricName)) {
+ logger.info("First time capturing metric: " + metricName);
+ historyListMapping.put(metricName, new LinkedList<InMemoryHistoryNode>());
+ }
+ synchronized (historyListMapping.get(metricName)) {
+ logger.debug("Ingesting metric: " + metricName);
+ historyListMapping.get(metricName).add(new InMemoryHistoryNode(metric.getValue()));
+ cleanUsingTime(metricName, historyListMapping.get(metricName).peekLast().getTimestamp());
+ }
+ }
+
+ /**
+ * Get snapshots for a given metric at a given time
+ * @param metricName name of the metric
+ * @param from Start date
+ * @param to end date
+ * @param useStats get statistically significant points only
+ * @return List of snapshots
+ */
+ public List<InMemoryHistoryNode> getMetrics(final String metricName, final Date from, final Date to,
+ final Boolean useStats) throws ClassCastException {
+ LinkedList<InMemoryHistoryNode> selectedLists = new LinkedList<InMemoryHistoryNode>();
+ if (historyListMapping.containsKey(metricName)) {
+
+ logger.debug("selecting snapshots within time frame");
+ synchronized (historyListMapping.get(metricName)) {
+ for (InMemoryHistoryNode node : historyListMapping.get(metricName)) {
+ if (node.getTimestamp().after(from) && node.getTimestamp().before(to)) {
+ selectedLists.add(node);
+ }
+ if (node.getTimestamp().after(to)) {
+ break;
+ }
+ }
+ }
+
+ // selecting nodes if num of nodes > numInstances
+ if (useStats) {
+ statBasedSelectMetricHistory(selectedLists);
+ } else {
+ generalSelectMetricHistory(selectedLists);
+ }
+ }
+ cleanUsingTime(metricName, new Date());
+ return selectedLists;
+ }
+
+ /**
+ * filter snapshots using statistically significant points only
+ * @param selectedLists list of snapshots
+ */
+ private void statBasedSelectMetricHistory(final LinkedList<InMemoryHistoryNode> selectedLists)
+ throws ClassCastException {
+ logger.debug("selecting snapshots which are far away from mean value");
+ DescriptiveStatistics descStats = getDescriptiveStatistics(selectedLists);
+ Double mean = descStats.getMean();
+ Double std = descStats.getStandardDeviation();
+
+ Iterator<InMemoryHistoryNode> ite = selectedLists.iterator();
+ while (ite.hasNext()) {
+ InMemoryHistoryNode currentNode = ite.next();
+ double value = ((Number) currentNode.getValue()).doubleValue();
+ // remove all elements which lies in 95% value band
+ if (value < mean + standardDeviationFactor * std && value > mean - standardDeviationFactor * std) {
+ ite.remove();
+ }
+ }
+ }
+
+ private DescriptiveStatistics getDescriptiveStatistics(final LinkedList<InMemoryHistoryNode> selectedLists)
+ throws ClassCastException {
+ DescriptiveStatistics descStats = new DescriptiveStatistics();
+ for (InMemoryHistoryNode node : selectedLists) {
+ descStats.addValue(((Number) node.getValue()).doubleValue());
+ }
+ return descStats;
+ }
+
+ /**
+ * filter snapshots by evenly selecting points across the interval
+ * @param selectedLists list of snapshots
+ */
+ private void generalSelectMetricHistory(final LinkedList<InMemoryHistoryNode> selectedLists) {
+ logger.debug("selecting snapshots evenly from across the time interval");
+ if (selectedLists.size() > numInstances) {
+ double step = (double) selectedLists.size() / numInstances;
+ long nextIndex = 0, currentIndex = 0, numSelectedInstances = 1;
+ Iterator<InMemoryHistoryNode> ite = selectedLists.iterator();
+ while (ite.hasNext()) {
+ ite.next();
+ if (currentIndex == nextIndex) {
+ nextIndex = (long) Math.floor(numSelectedInstances * step + 0.5);
+ numSelectedInstances++;
+ } else {
+ ite.remove();
+ }
+ currentIndex++;
+ }
+ }
+ }
+
+ /**
+ * Remove snapshots to maintain reporting interval
+ * @param metricName Name of the metric
+ * @param firstAllowedDate End date of the interval
+ */
+ private void cleanUsingTime(final String metricName, final Date firstAllowedDate) {
+ if (historyListMapping.containsKey(metricName) && historyListMapping.get(metricName) != null) {
+ synchronized (historyListMapping.get(metricName)) {
+
+ InMemoryHistoryNode firstNode = historyListMapping.get(metricName).peekFirst();
+ long localCopyOfTimeWindow = 0;
+
+ // go ahead for clean up using latest possible value of interval
+ // any interval change will not affect on going clean up
+ synchronized (this) {
+ localCopyOfTimeWindow = timeWindow;
+ }
+
+ // removing objects older than Interval time from firstAllowedDate
+ while (firstNode != null
+ && TimeUnit.MILLISECONDS.toMillis(firstAllowedDate.getTime() - firstNode.getTimestamp().getTime()) > localCopyOfTimeWindow) {
+ historyListMapping.get(metricName).removeFirst();
+ firstNode = historyListMapping.get(metricName).peekFirst();
+ }
+ }
+ }
+ }
+
+ /**
+ * Clear snapshot data structure
+ * {@inheritDoc}
+ * @see azkaban.metric.IMetricEmitter#purgeAllData()
+ */
+ @Override
+ public void purgeAllData() throws MetricException {
+ historyListMapping.clear();
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/metric/MetricException.java b/azkaban-common/src/main/java/azkaban/metric/MetricException.java
new file mode 100644
index 0000000..6decb4a
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/MetricException.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+/**
+ * Exception for Azkaban's Metric Component
+ */
+public class MetricException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public MetricException(String message) {
+ super(message);
+ }
+
+ public MetricException(Throwable cause) {
+ super(cause);
+ }
+
+ public MetricException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
new file mode 100644
index 0000000..4834361
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.log4j.Logger;
+
+
+/**
+ * Manager for access or updating metric related functionality of Azkaban
+ * MetricManager is responsible all handling all action requests from statsServlet in Exec server
+ * <p> Metric Manager 'has a' relationship with :-
+ * <ul>
+ * <li>all the metric Azkaban is tracking</li>
+ * <li>all the emitters Azkaban is supposed to report metrics</li>
+ * </ul></p>
+ */
+public class MetricReportManager {
+ /**
+ * Maximum number of metrics reporting threads
+ */
+ private static final int MAX_EMITTER_THREADS = 4;
+ private static final Logger logger = Logger.getLogger(MetricReportManager.class);
+
+ /**
+ * List of all the metrics that Azkaban is tracking
+ * Manager is not concerned with type of metric as long as it honors IMetric contracts
+ */
+ private List<IMetric<?>> metrics;
+
+ /**
+ * List of all the emitter listening all the metrics
+ * Manager is not concerned with how emitter is reporting value.
+ * Manager is only responsible to notify all emitters whenever an IMetric wants to be notified
+ */
+ private List<IMetricEmitter> metricEmitters;
+ private ExecutorService executorService;
+ // Singleton variable
+ private static volatile MetricReportManager instance = null;
+ private static volatile boolean isManagerEnabled;
+
+ private MetricReportManager() {
+ logger.debug("Instantiating Metric Manager");
+ executorService = Executors.newFixedThreadPool(MAX_EMITTER_THREADS);
+ metrics = new ArrayList<IMetric<?>>();
+ metricEmitters = new LinkedList<IMetricEmitter>();
+ enableManager();
+ }
+
+ /**
+ * @return true, if we have Instantiated and enabled metric manager from Azkaban exec server
+ */
+ public static boolean isAvailable() {
+ return isInstantiated() && isManagerEnabled;
+ }
+
+ /**
+ * @return true, if we have Instantiated metric manager from Azkaban exec server
+ */
+ public static boolean isInstantiated() {
+ return instance != null;
+ }
+
+ /**
+ * Get a singleton object for Metric Manager
+ */
+ public static MetricReportManager getInstance() {
+ if (instance == null) {
+ synchronized (MetricReportManager.class) {
+ if (instance == null) {
+ logger.info("Instantiating MetricReportManager");
+ instance = new MetricReportManager();
+ }
+ }
+ }
+ return instance;
+ }
+
+ /***
+ * each element of metrics List is responsible to call this method and report metrics
+ * @param metric
+ */
+ public void reportMetric(final IMetric<?> metric) {
+ if (metric != null && isAvailable()) {
+ try {
+ final IMetric<?> metricSnapshot;
+ // take snapshot
+ synchronized (metric) {
+ metricSnapshot = metric.getSnapshot();
+ }
+ logger.debug(String.format("Submitting %s metric for metric emission pool", metricSnapshot.getName()));
+ // report to all emitters
+ for (final IMetricEmitter metricEmitter : metricEmitters) {
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ metricEmitter.reportMetric(metricSnapshot);
+ } catch (Exception ex) {
+ logger.error(String.format("Failed to report %s metric due to ", metricSnapshot.getName()), ex);
+ }
+ }
+ });
+ }
+ } catch (CloneNotSupportedException ex) {
+ logger.error(String.format("Failed to take snapshot for %s metric", metric.getClass().getName()), ex);
+ }
+ }
+ }
+
+ /**
+ * Add a metric emitter to report metric
+ * @param emitter
+ */
+ public void addMetricEmitter(final IMetricEmitter emitter) {
+ metricEmitters.add(emitter);
+ }
+
+ /**
+ * remove a metric emitter
+ * @param emitter
+ */
+ public void removeMetricEmitter(final IMetricEmitter emitter) {
+ metricEmitters.remove(emitter);
+ }
+
+ /**
+ * Get all the metric emitters
+ * @return
+ */
+ public List<IMetricEmitter> getMetricEmitters() {
+ return metricEmitters;
+ }
+
+ /**
+ * Add a metric to be managed by Metric Manager
+ * @param metric
+ */
+ public void addMetric(final IMetric<?> metric) {
+ // metric null or already present
+ if(metric == null)
+ throw new IllegalArgumentException("Cannot add a null metric");
+
+ if (getMetricFromName(metric.getName()) == null) {
+ logger.debug(String.format("Adding %s metric in Metric Manager", metric.getName()));
+ metrics.add(metric);
+ metric.updateMetricManager(this);
+ } else {
+ logger.error("Failed to add metric");
+ }
+ }
+
+ /**
+ * Get metric object for a given metric name
+ * @param name metricName
+ * @return metric Object, if found. Otherwise null.
+ */
+ public IMetric<?> getMetricFromName(final String name) {
+ IMetric<?> metric = null;
+ if (name != null) {
+ for (IMetric<?> currentMetric : metrics) {
+ if (currentMetric.getName().equals(name)) {
+ metric = currentMetric;
+ break;
+ }
+ }
+ }
+ return metric;
+ }
+
+ /**
+ * Get all the emitters
+ * @return
+ */
+ public List<IMetric<?>> getAllMetrics() {
+ return metrics;
+ }
+
+ public void enableManager() {
+ logger.info("Enabling Metric Manager");
+ isManagerEnabled = true;
+ }
+
+ /**
+ * Disable Metric Manager and ask all emitters to purge all available data.
+ */
+ public void disableManager() {
+ logger.info("Disabling Metric Manager");
+ if (isManagerEnabled) {
+ isManagerEnabled = false;
+ for (IMetricEmitter emitter : metricEmitters) {
+ try {
+ emitter.purgeAllData();
+ } catch (MetricException ex) {
+ logger.error("Failed to purge data ", ex);
+ }
+ }
+ }
+ }
+
+ /**
+ * Shutdown execution service
+ * {@inheritDoc}
+ * @see java.lang.Object#finalize()
+ */
+ protected void finalize() {
+ executorService.shutdown();
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
new file mode 100644
index 0000000..cf1211d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * Metrics tracked after every interval using timer
+ * @param <T> Type of Value of a given metric
+ */
+public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
+ private Timer timer;
+ protected long MAX_MILISEC_INTERVAL = 60 * 60 * 1000;
+ protected long MIN_MILISEC_INTERVAL = 3 * 1000;
+
+ /**
+ * @param metricName Name of metric
+ * @param metricType Metric type. For display purposes.
+ * @param initialValue Initial Value of a metric
+ * @param manager Metric Manager whom a metric will report to
+ * @param interval Time interval for metric tracking
+ * @throws MetricException
+ */
+ public TimeBasedReportingMetric(String metricName, String metricType, T initialValue, MetricReportManager manager,
+ long interval) throws MetricException {
+ super(metricName, metricType, initialValue, manager);
+ if(validateInterval(interval)) {
+ throw new MetricException("Invalid interval: Cannot instantiate timer");
+ }
+ timer = new Timer();
+ timer.schedule(getTimerTask(), interval, interval);
+ }
+
+ /**
+ * Get a TimerTask to reschedule Timer
+ * @return An anonymous TimerTask class
+ */
+ private TimerTask getTimerTask() {
+ final TimeBasedReportingMetric<T> lockObject = this;
+ TimerTask recurringReporting = new TimerTask() {
+ @Override
+ public void run() {
+ synchronized (lockObject) {
+ preTrackingEventMethod();
+ notifyManager();
+ postTrackingEventMethod();
+ }
+ }
+ };
+ return recurringReporting;
+ }
+
+ /**
+ * Method to change tracking interval
+ * @param interval
+ * @throws MetricException
+ */
+ public void updateInterval(final long interval) throws MetricException {
+ if(validateInterval(interval)) {
+ throw new MetricException("Invalid interval: Cannot update timer");
+ }
+ logger.debug(String.format("Updating tracking interval to %d milisecond for %s metric", interval, getName()));
+ timer.cancel();
+ timer = new Timer();
+ timer.schedule(getTimerTask(), interval, interval);
+ }
+
+ private boolean validateInterval(final long interval) {
+ return interval >= MIN_MILISEC_INTERVAL && interval <= MAX_MILISEC_INTERVAL;
+ }
+
+ /**
+ * This method is responsible for making any last minute update to value, if any
+ */
+ protected abstract void preTrackingEventMethod();
+
+ /**
+ * This method is responsible for making any post processing after tracking
+ */
+ protected abstract void postTrackingEventMethod();
+
+}
diff --git a/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java b/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java
new file mode 100644
index 0000000..4043cb7
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java
@@ -0,0 +1,54 @@
+package azkaban.metric;
+
+/**
+ * Dummy Metric to test Azkaban Metrics
+ */
+public class FakeMetric extends AbstractMetric<Integer>{
+
+ public FakeMetric(MetricReportManager manager) {
+ super("FakeMetric", "int", 4, manager);
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((metricManager == null) ? 0 : metricManager.hashCode());
+ result = prime * result + ((name == null) ? 0 : name.hashCode());
+ result = prime * result + ((type == null) ? 0 : type.hashCode());
+ result = prime * result + ((value == null) ? 0 : value.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (!(obj instanceof FakeMetric))
+ return false;
+ FakeMetric other = (FakeMetric) obj;
+ if (metricManager == null) {
+ if (other.metricManager != null)
+ return false;
+ } else if (!metricManager.equals(other.metricManager))
+ return false;
+ if (name == null) {
+ if (other.name != null)
+ return false;
+ } else if (!name.equals(other.name))
+ return false;
+ if (type == null) {
+ if (other.type != null)
+ return false;
+ } else if (!type.equals(other.type))
+ return false;
+ if (value == null) {
+ if (other.value != null)
+ return false;
+ } else if (!value.equals(other.value))
+ return false;
+ return true;
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java b/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java
new file mode 100644
index 0000000..45bfc5f
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java
@@ -0,0 +1,90 @@
+package azkaban.metric;
+
+import java.util.Date;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.metric.inmemoryemitter.InMemoryHistoryNode;
+import azkaban.metric.inmemoryemitter.InMemoryMetricEmitter;
+import azkaban.utils.Props;
+import static org.junit.Assert.*;
+
+/**
+ * Azkaban Metric Manager Tests
+ */
+public class MetricManagerTest {
+ MetricReportManager manager;
+ FakeMetric metric;
+ InMemoryMetricEmitter emitter;
+
+ @Before
+ public void setUp() throws Exception {
+ manager = MetricReportManager.getInstance();
+ metric = new FakeMetric(manager);
+ manager.addMetric(metric);
+ emitter = new InMemoryMetricEmitter(new Props());
+ manager.addMetricEmitter(emitter);
+ }
+
+ /**
+ * Test enable disable and status methods
+ */
+ @Test
+ public void managerStatusTest() {
+ assertNotNull("Singleton Failed to instantiate", manager);
+ assertTrue("Failed to enable metric manager", MetricReportManager.isAvailable());
+ manager.disableManager();
+ assertFalse("Failed to disable metric manager", MetricReportManager.isAvailable());
+ manager.enableManager();
+ assertTrue("Failed to enable metric manager", MetricReportManager.isAvailable());
+ }
+
+ /**
+ * Test adding and accessing metric methods
+ */
+ @Test
+ public void managerMetricMaintenanceTest() {
+ assertEquals("Failed to add metric", manager.getAllMetrics().size(), 1);
+ assertTrue("Failed to add metric", manager.getAllMetrics().contains(metric));
+ assertEquals("Failed to get metric by Name", manager.getMetricFromName("FakeMetric"), metric);
+ }
+
+ /**
+ * Test adding, removing and accessing metric emitter.
+ */
+ @Test
+ public void managerEmitterMaintenanceTest() {
+ assertTrue("Failed to add Emitter", manager.getMetricEmitters().contains(emitter));
+
+ int originalSize = manager.getMetricEmitters().size();
+ manager.removeMetricEmitter(emitter);
+ assertEquals("Failed to remove emitter", manager.getMetricEmitters().size(), originalSize - 1);
+ manager.addMetricEmitter(emitter);
+ }
+
+ /**
+ * Test metric reporting methods, including InMemoryMetricEmitter methods
+ * @throws Exception
+ */
+ @Test
+ public void managerEmitterHandlingTest() throws Exception {
+ emitter.purgeAllData();
+ Date from = new Date();
+ metric.notifyManager();
+
+ synchronized (this) {
+ try {
+ wait(2000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ Date to = new Date();
+ List<InMemoryHistoryNode> nodes = emitter.getMetrics("FakeMetric", from, to, false);
+
+ assertEquals("Failed to report metric", 1, nodes.size());
+ assertEquals("Failed to report metric", nodes.get(0).getValue(), 4);
+ }
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 89351ee..d68bd16 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -29,9 +29,7 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.log4j.Logger;
-
import org.joda.time.DateTimeZone;
-
import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
@@ -41,7 +39,16 @@ import org.mortbay.thread.QueuedThreadPool;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.JdbcExecutorLoader;
import azkaban.execapp.jmx.JmxFlowRunnerManager;
+import azkaban.execapp.metric.NumFailedFlowMetric;
+import azkaban.execapp.metric.NumFailedJobMetric;
+import azkaban.execapp.metric.NumQueuedFlowMetric;
+import azkaban.execapp.metric.NumRunningFlowMetric;
+import azkaban.execapp.metric.NumRunningJobMetric;
import azkaban.jmx.JmxJettyServer;
+import azkaban.metric.IMetricEmitter;
+import azkaban.metric.MetricException;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.inmemoryemitter.InMemoryMetricEmitter;
import azkaban.project.JdbcProjectLoader;
import azkaban.project.ProjectLoader;
import azkaban.server.AzkabanServer;
@@ -49,17 +56,17 @@ import azkaban.server.ServerConstants;
import azkaban.utils.Props;
import azkaban.utils.Utils;
+
public class AzkabanExecutorServer {
- private static final Logger logger = Logger
- .getLogger(AzkabanExecutorServer.class);
+ private static final Logger logger = Logger.getLogger(AzkabanExecutorServer.class);
private static final int MAX_FORM_CONTENT_SIZE = 10 * 1024 * 1024;
public static final String AZKABAN_HOME = "AZKABAN_HOME";
public static final String DEFAULT_CONF_PATH = "conf";
public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
- public static final String AZKABAN_PRIVATE_PROPERTIES_FILE =
- "azkaban.private.properties";
+ public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
public static final String JOBTYPE_PLUGIN_DIR = "azkaban.jobtype.plugin.dir";
+ public static final String METRIC_INTERVAL = "executor.metric.milisecinterval.";
public static final int DEFAULT_PORT_NUMBER = 12321;
private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
@@ -104,16 +111,16 @@ public class AzkabanExecutorServer {
root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor");
root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
- root.setAttribute(
- ServerConstants.AZKABAN_SERVLET_CONTEXT_KEY, this);
+ root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
+
+ root.setAttribute(ServerConstants.AZKABAN_SERVLET_CONTEXT_KEY, this);
executionLoader = createExecLoader(props);
projectLoader = createProjectLoader(props);
- runnerManager =
- new FlowRunnerManager(props, executionLoader, projectLoader, this
- .getClass().getClassLoader());
+ runnerManager = new FlowRunnerManager(props, executionLoader, projectLoader, this.getClass().getClassLoader());
configureMBeanServer();
+ configureMetricReports();
try {
server.start();
@@ -125,6 +132,44 @@ public class AzkabanExecutorServer {
logger.info("Azkaban Executor Server started on port " + portNumber);
}
+ /**
+ * Configure Metric Reporting as per azkaban.properties settings
+ * @throws MetricException
+ */
+ private void configureMetricReports() throws MetricException {
+ Props props = getAzkabanProps();
+ if (props != null && props.getBoolean("executor.metric.reports", false)) {
+ logger.info("Starting to configure Metric Reports");
+ MetricReportManager metricManager = MetricReportManager.getInstance();
+ IMetricEmitter metricEmitter = new InMemoryMetricEmitter(props);
+ metricManager.addMetricEmitter(metricEmitter);
+
+ logger.info("Adding number of failed flow metric");
+ metricManager.addMetric(new NumFailedFlowMetric(metricManager, props.getInt(METRIC_INTERVAL
+ + NumFailedFlowMetric.NUM_FAILED_FLOW_METRIC_NAME, props.getInt(METRIC_INTERVAL + "default"))));
+
+ logger.info("Adding number of failed jobs metric");
+ metricManager.addMetric(new NumFailedJobMetric(metricManager, props.getInt(METRIC_INTERVAL
+ + NumFailedJobMetric.NUM_FAILED_JOB_METRIC_NAME, props.getInt(METRIC_INTERVAL + "default"))));
+
+ logger.info("Adding number of running Jobs metric");
+ metricManager.addMetric(new NumRunningJobMetric(metricManager, props.getInt(METRIC_INTERVAL
+ + NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME, props.getInt(METRIC_INTERVAL + "default"))));
+
+ logger.info("Adding number of running flows metric");
+ metricManager.addMetric(new NumRunningFlowMetric(runnerManager, metricManager, props.getInt(
+ METRIC_INTERVAL + NumRunningFlowMetric.NUM_RUNNING_FLOW_METRIC_NAME,
+ props.getInt(METRIC_INTERVAL + "default"))));
+
+ logger.info("Adding number of queued flows metric");
+ metricManager.addMetric(new NumQueuedFlowMetric(runnerManager, metricManager, props.getInt(
+ METRIC_INTERVAL + NumQueuedFlowMetric.NUM_QUEUED_FLOW_METRIC_NAME,
+ props.getInt(METRIC_INTERVAL + "default"))));
+
+ logger.info("Completed configuring Metric Reports");
+ }
+ }
+
private ExecutorLoader createExecLoader(Props props) {
return new JdbcExecutorLoader(props);
}
@@ -161,7 +206,7 @@ public class AzkabanExecutorServer {
/**
* Returns the currently executing executor server, if one exists.
- *
+ *
* @return
*/
public static AzkabanExecutorServer getApp() {
@@ -224,16 +269,14 @@ public class AzkabanExecutorServer {
return null;
}
- if (!new File(azkabanHome).isDirectory()
- || !new File(azkabanHome).canRead()) {
+ if (!new File(azkabanHome).isDirectory() || !new File(azkabanHome).canRead()) {
logger.error(azkabanHome + " is not a readable directory.");
return null;
}
File confPath = new File(azkabanHome, DEFAULT_CONF_PATH);
if (!confPath.exists() || !confPath.isDirectory() || !confPath.canRead()) {
- logger
- .error(azkabanHome + " does not contain a readable conf directory.");
+ logger.error(azkabanHome + " does not contain a readable conf directory.");
return null;
}
@@ -251,8 +294,7 @@ public class AzkabanExecutorServer {
* @return
*/
private static Props loadAzkabanConfigurationFromDirectory(File dir) {
- File azkabanPrivatePropsFile =
- new File(dir, AZKABAN_PRIVATE_PROPERTIES_FILE);
+ File azkabanPrivatePropsFile = new File(dir, AZKABAN_PRIVATE_PROPERTIES_FILE);
File azkabanPropsFile = new File(dir, AZKABAN_PROPERTIES_FILE);
Props props = null;
@@ -270,9 +312,7 @@ public class AzkabanExecutorServer {
} catch (FileNotFoundException e) {
logger.error("File not found. Could not load azkaban config file", e);
} catch (IOException e) {
- logger.error(
- "File found, but error reading. Could not load azkaban config file",
- e);
+ logger.error("File found, but error reading. Could not load azkaban config file", e);
}
return props;
@@ -306,8 +346,7 @@ public class AzkabanExecutorServer {
logger.info("Bean " + mbeanClass.getCanonicalName() + " registered.");
registeredMBeans.add(mbeanName);
} catch (Exception e) {
- logger.error("Error registering mbean " + mbeanClass.getCanonicalName(),
- e);
+ logger.error("Error registering mbean " + mbeanClass.getCanonicalName(), e);
}
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
index f73a30a..e98f5f2 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
@@ -42,6 +42,8 @@ import azkaban.event.Event.Type;
import azkaban.event.EventHandler;
import azkaban.event.EventListener;
import azkaban.execapp.event.FlowWatcher;
+import azkaban.execapp.metric.NumFailedJobMetric;
+import azkaban.execapp.metric.NumRunningJobMetric;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
@@ -52,19 +54,20 @@ import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
import azkaban.flow.FlowProps;
import azkaban.jobtype.JobTypeManager;
+import azkaban.metric.MetricReportManager;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManagerException;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import azkaban.utils.SwapQueue;
+
/**
* Class that handles the running of a ExecutableFlow DAG
- *
+ *
*/
public class FlowRunner extends EventHandler implements Runnable {
- private static final Layout DEFAULT_LAYOUT = new PatternLayout(
- "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
+ private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
// We check update every 5 minutes, just in case things get stuck. But for the
// most part, we'll be idling.
private static final long CHECK_WAIT_MS = 5 * 60 * 1000;
@@ -93,8 +96,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private final JobTypeManager jobtypeManager;
private JobRunnerEventListener listener = new JobRunnerEventListener();
- private Set<JobRunner> activeJobRunners = Collections
- .newSetFromMap(new ConcurrentHashMap<JobRunner, Boolean>());
+ private Set<JobRunner> activeJobRunners = Collections.newSetFromMap(new ConcurrentHashMap<JobRunner, Boolean>());
// Thread safe swap queue for finishedExecutions.
private SwapQueue<ExecutableNode> finishedNodes;
@@ -122,23 +124,22 @@ public class FlowRunner extends EventHandler implements Runnable {
/**
* Constructor. This will create its own ExecutorService for thread pools
- *
+ *
* @param flow
* @param executorLoader
* @param projectLoader
* @param jobtypeManager
* @throws ExecutorManagerException
*/
- public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader,
- ProjectLoader projectLoader, JobTypeManager jobtypeManager)
- throws ExecutorManagerException {
+ public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader,
+ JobTypeManager jobtypeManager) throws ExecutorManagerException {
this(flow, executorLoader, projectLoader, jobtypeManager, null);
}
/**
* Constructor. If executorService is null, then it will create it's own for
* thread pools.
- *
+ *
* @param flow
* @param executorLoader
* @param projectLoader
@@ -146,9 +147,8 @@ public class FlowRunner extends EventHandler implements Runnable {
* @param executorService
* @throws ExecutorManagerException
*/
- public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader,
- ProjectLoader projectLoader, JobTypeManager jobtypeManager,
- ExecutorService executorService) throws ExecutorManagerException {
+ public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader,
+ JobTypeManager jobtypeManager, ExecutorService executorService) throws ExecutorManagerException {
this.execId = flow.getExecutionId();
this.flow = flow;
this.executorLoader = executorLoader;
@@ -210,23 +210,18 @@ public class FlowRunner extends EventHandler implements Runnable {
runFlow();
} catch (Throwable t) {
if (logger != null) {
- logger
- .error(
- "An error has occurred during the running of the flow. Quiting.",
- t);
+ logger.error("An error has occurred during the running of the flow. Quiting.", t);
}
flow.setStatus(Status.FAILED);
} finally {
if (watcher != null) {
logger.info("Watcher is attached. Stopping watcher.");
watcher.stopWatcher();
- logger
- .info("Watcher cancelled status is " + watcher.isWatchCancelled());
+ logger.info("Watcher cancelled status is " + watcher.isWatchCancelled());
}
flow.setEndTime(System.currentTimeMillis());
- logger.info("Setting end time for flow " + execId + " to "
- + System.currentTimeMillis());
+ logger.info("Setting end time for flow " + execId + " to " + System.currentTimeMillis());
closeLogger();
updateFlow();
@@ -251,8 +246,7 @@ public class FlowRunner extends EventHandler implements Runnable {
}
// If there are flow overrides, we apply them now.
- Map<String, String> flowParam =
- flow.getExecutionOptions().getFlowParameters();
+ Map<String, String> flowParam = flow.getExecutionOptions().getFlowParameters();
if (flowParam != null && !flowParam.isEmpty()) {
commonFlowProps = new Props(commonFlowProps, flowParam);
}
@@ -265,11 +259,9 @@ public class FlowRunner extends EventHandler implements Runnable {
this.watcher.setLogger(logger);
}
- logger.info("Running execid:" + execId + " flow:" + flowId + " project:"
- + projectId + " version:" + version);
+ logger.info("Running execid:" + execId + " flow:" + flowId + " project:" + projectId + " version:" + version);
if (pipelineExecId != null) {
- logger.info("Running simulateously with " + pipelineExecId
- + ". Pipelining level " + pipelineLevel);
+ logger.info("Running simulateously with " + pipelineExecId + ". Pipelining level " + pipelineLevel);
}
// The current thread is used for interrupting blocks
@@ -279,10 +271,8 @@ public class FlowRunner extends EventHandler implements Runnable {
private void updateFlowReference() throws ExecutorManagerException {
logger.info("Update active reference");
- if (!executorLoader.updateExecutableReference(execId,
- System.currentTimeMillis())) {
- throw new ExecutorManagerException(
- "The executor reference doesn't exist. May have been killed prematurely.");
+ if (!executorLoader.updateExecutableReference(execId, System.currentTimeMillis())) {
+ throw new ExecutorManagerException("The executor reference doesn't exist. May have been killed prematurely.");
}
}
@@ -356,7 +346,7 @@ public class FlowRunner extends EventHandler implements Runnable {
/**
* Main method that executes the jobs.
- *
+ *
* @throws Exception
*/
private void runFlow() throws Exception {
@@ -405,8 +395,7 @@ public class FlowRunner extends EventHandler implements Runnable {
resetFailedState(this.flow, retryJobs);
for (ExecutableNode node : retryJobs) {
- if (node.getStatus() == Status.READY
- || node.getStatus() == Status.DISABLED) {
+ if (node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
runReadyJob(node);
} else if (node.getStatus() == Status.SUCCEEDED) {
for (String outNodeId : node.getOutNodes()) {
@@ -475,8 +464,7 @@ public class FlowRunner extends EventHandler implements Runnable {
// Instant kill or skip if necessary.
boolean jobsRun = false;
for (ExecutableNode node : nodesToCheck) {
- if (Status.isStatusFinished(node.getStatus())
- || Status.isStatusRunning(node.getStatus())) {
+ if (Status.isStatusFinished(node.getStatus()) || Status.isStatusRunning(node.getStatus())) {
// Really shouldn't get in here.
continue;
}
@@ -493,8 +481,7 @@ public class FlowRunner extends EventHandler implements Runnable {
}
private boolean runReadyJob(ExecutableNode node) throws IOException {
- if (Status.isStatusFinished(node.getStatus())
- || Status.isStatusRunning(node.getStatus())) {
+ if (Status.isStatusFinished(node.getStatus()) || Status.isStatusRunning(node.getStatus())) {
return false;
}
@@ -504,8 +491,7 @@ public class FlowRunner extends EventHandler implements Runnable {
}
if (nextNodeStatus == Status.CANCELLED) {
- logger.info("Cancelling '" + node.getNestedId()
- + "' due to prior errors.");
+ logger.info("Cancelling '" + node.getNestedId() + "' due to prior errors.");
node.cancelNode(System.currentTimeMillis());
finishExecutableNode(node);
} else if (nextNodeStatus == Status.SKIPPED) {
@@ -537,8 +523,8 @@ public class FlowRunner extends EventHandler implements Runnable {
}
if (node.getRetries() > node.getAttempt()) {
- logger.info("Job '" + node.getId() + "' will be retried. Attempt "
- + node.getAttempt() + " of " + node.getRetries());
+ logger.info("Job '" + node.getId() + "' will be retried. Attempt " + node.getAttempt() + " of "
+ + node.getRetries());
node.setDelayedExecution(node.getRetryBackoff());
node.resetForRetry();
return true;
@@ -579,8 +565,7 @@ public class FlowRunner extends EventHandler implements Runnable {
for (String end : flow.getEndNodes()) {
ExecutableNode node = flow.getExecutableNode(end);
- if (node.getStatus() == Status.KILLED
- || node.getStatus() == Status.FAILED
+ if (node.getStatus() == Status.KILLED || node.getStatus() == Status.FAILED
|| node.getStatus() == Status.CANCELLED) {
succeeded = false;
}
@@ -602,22 +587,19 @@ public class FlowRunner extends EventHandler implements Runnable {
flow.setUpdateTime(System.currentTimeMillis());
long durationSec = (flow.getEndTime() - flow.getStartTime()) / 1000;
switch (flow.getStatus()) {
- case FAILED_FINISHING:
- logger.info("Setting flow '" + id + "' status to FAILED in "
- + durationSec + " seconds");
- flow.setStatus(Status.FAILED);
- break;
- case FAILED:
- case KILLED:
- case CANCELLED:
- case FAILED_SUCCEEDED:
- logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString()
- + " in " + durationSec + " seconds");
- break;
- default:
- flow.setStatus(Status.SUCCEEDED);
- logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString()
- + " in " + durationSec + " seconds");
+ case FAILED_FINISHING:
+ logger.info("Setting flow '" + id + "' status to FAILED in " + durationSec + " seconds");
+ flow.setStatus(Status.FAILED);
+ break;
+ case FAILED:
+ case KILLED:
+ case CANCELLED:
+ case FAILED_SUCCEEDED:
+ logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString() + " in " + durationSec + " seconds");
+ break;
+ default:
+ flow.setStatus(Status.SUCCEEDED);
+ logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString() + " in " + durationSec + " seconds");
}
// If the finalized flow is actually the top level flow, than we finish
@@ -678,13 +660,10 @@ public class FlowRunner extends EventHandler implements Runnable {
// load the override props if any
try {
- props =
- projectLoader.fetchProjectProperty(flow.getProjectId(),
- flow.getVersion(), node.getId() + ".jor");
+ props = projectLoader.fetchProjectProperty(flow.getProjectId(), flow.getVersion(), node.getId() + ".jor");
} catch (ProjectManagerException e) {
e.printStackTrace();
- logger.error("Error loading job override property for job "
- + node.getId());
+ logger.error("Error loading job override property for job " + node.getId());
}
File path = new File(execDir, source);
@@ -694,8 +673,7 @@ public class FlowRunner extends EventHandler implements Runnable {
props = new Props(null, path);
} catch (IOException e) {
e.printStackTrace();
- logger.error("Error loading job file " + source + " for job "
- + node.getId());
+ logger.error("Error loading job file " + source + " for job " + node.getId());
}
}
// setting this fake source as this will be used to determine the location
@@ -725,15 +703,14 @@ public class FlowRunner extends EventHandler implements Runnable {
/**
* Determines what the state of the next node should be. Returns null if the
* node should not be run.
- *
+ *
* @param node
* @return
*/
public Status getImpliedStatus(ExecutableNode node) {
// If it's running or finished with 'SUCCEEDED', than don't even
// bother starting this job.
- if (Status.isStatusRunning(node.getStatus())
- || node.getStatus() == Status.SUCCEEDED) {
+ if (Status.isStatusRunning(node.getStatus()) || node.getStatus() == Status.SUCCEEDED) {
return null;
}
@@ -748,8 +725,7 @@ public class FlowRunner extends EventHandler implements Runnable {
if (!Status.isStatusFinished(depStatus)) {
return null;
- } else if (depStatus == Status.FAILED || depStatus == Status.CANCELLED
- || depStatus == Status.KILLED) {
+ } else if (depStatus == Status.FAILED || depStatus == Status.CANCELLED || depStatus == Status.KILLED) {
// We propagate failures as KILLED states.
shouldKill = true;
}
@@ -757,16 +733,14 @@ public class FlowRunner extends EventHandler implements Runnable {
// If it's disabled but ready to run, we want to make sure it continues
// being disabled.
- if (node.getStatus() == Status.DISABLED
- || node.getStatus() == Status.SKIPPED) {
+ if (node.getStatus() == Status.DISABLED || node.getStatus() == Status.SKIPPED) {
return Status.SKIPPED;
}
// If the flow has failed, and we want to finish only the currently running
// jobs, we just
// kill everything else. We also kill, if the flow has been cancelled.
- if (flowFailed
- && failureAction == ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING) {
+ if (flowFailed && failureAction == ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING) {
return Status.CANCELLED;
} else if (shouldKill || isKilled()) {
return Status.CANCELLED;
@@ -780,8 +754,7 @@ public class FlowRunner extends EventHandler implements Runnable {
Props previousOutput = null;
// Iterate the in nodes again and create the dependencies
for (String dependency : node.getInNodes()) {
- Props output =
- node.getParentFlow().getExecutableNode(dependency).getOutputProps();
+ Props output = node.getParentFlow().getExecutableNode(dependency).getOutputProps();
if (output != null) {
output = Props.clone(output);
output.setParent(previousOutput);
@@ -796,9 +769,7 @@ public class FlowRunner extends EventHandler implements Runnable {
// Load job file.
File path = new File(execDir, node.getJobSource());
- JobRunner jobRunner =
- new JobRunner(node, path.getParentFile(), executorLoader,
- jobtypeManager);
+ JobRunner jobRunner = new JobRunner(node, path.getParentFile(), executorLoader, jobtypeManager);
if (watcher != null) {
jobRunner.setPipeline(watcher, pipelineLevel);
}
@@ -810,9 +781,31 @@ public class FlowRunner extends EventHandler implements Runnable {
jobRunner.setLogSettings(logger, jobLogFileSize, jobLogNumFiles);
jobRunner.addListener(listener);
+ configureJobLevelMetrics(jobRunner);
+
return jobRunner;
}
+ /**
+ * Configure Azkaban metrics tracking for a new jobRunner instance
+ * @param jobRunner
+ */
+ private void configureJobLevelMetrics(JobRunner jobRunner) {
+ logger.info("Configuring Azkaban metrics tracking for jobrunner object");
+ if (MetricReportManager.isAvailable()) {
+ MetricReportManager metricManager = MetricReportManager.getInstance();
+
+ //Adding NumRunningJobMetric listener
+ jobRunner.addListener((NumRunningJobMetric) metricManager
+ .getMetricFromName(NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME));
+
+ //Adding NumFailedJobMetric listener
+ jobRunner.addListener((NumFailedJobMetric) metricManager
+ .getMetricFromName(NumFailedJobMetric.NUM_FAILED_JOB_METRIC_NAME));
+
+ }
+ }
+
public void pause(String user) {
synchronized (mainSyncObj) {
if (!flowFinished) {
@@ -872,8 +865,7 @@ public class FlowRunner extends EventHandler implements Runnable {
if (watcher != null) {
logger.info("Watcher is attached. Stopping watcher.");
watcher.stopWatcher();
- logger
- .info("Watcher cancelled status is " + watcher.isWatchCancelled());
+ logger.info("Watcher cancelled status is " + watcher.isWatchCancelled());
}
logger.info("Killing " + activeJobRunners.size() + " jobs.");
@@ -891,8 +883,7 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
- private void resetFailedState(ExecutableFlowBase flow,
- List<ExecutableNode> nodesToRetry) {
+ private void resetFailedState(ExecutableFlowBase flow, List<ExecutableNode> nodesToRetry) {
// bottom up
LinkedList<ExecutableNode> queue = new LinkedList<ExecutableNode>();
for (String id : flow.getEndNodes()) {
@@ -921,24 +912,24 @@ public class FlowRunner extends EventHandler implements Runnable {
} else if (node instanceof ExecutableFlowBase) {
ExecutableFlowBase base = (ExecutableFlowBase) node;
switch (base.getStatus()) {
- case CANCELLED:
- node.setStatus(Status.READY);
- node.setEndTime(-1);
- node.setStartTime(-1);
- node.setUpdateTime(currentTime);
- // Break out of the switch. We'll reset the flow just like a normal
- // node
- break;
- case KILLED:
- case FAILED:
- case FAILED_FINISHING:
- resetFailedState(base, nodesToRetry);
- continue;
- default:
- // Continue the while loop. If the job is in a finished state that's
- // not
- // a failure, we don't want to reset the job.
- continue;
+ case CANCELLED:
+ node.setStatus(Status.READY);
+ node.setEndTime(-1);
+ node.setStartTime(-1);
+ node.setUpdateTime(currentTime);
+ // Break out of the switch. We'll reset the flow just like a normal
+ // node
+ break;
+ case KILLED:
+ case FAILED:
+ case FAILED_FINISHING:
+ resetFailedState(base, nodesToRetry);
+ continue;
+ default:
+ // Continue the while loop. If the job is in a finished state that's
+ // not
+ // a failure, we don't want to reset the job.
+ continue;
}
} else if (node.getStatus() == Status.CANCELLED) {
// Not a flow, but killed
@@ -946,16 +937,13 @@ public class FlowRunner extends EventHandler implements Runnable {
node.setStartTime(-1);
node.setEndTime(-1);
node.setUpdateTime(currentTime);
- } else if (node.getStatus() == Status.FAILED
- || node.getStatus() == Status.KILLED) {
+ } else if (node.getStatus() == Status.FAILED || node.getStatus() == Status.KILLED) {
node.resetForRetry();
nodesToRetry.add(node);
}
- if (!(node instanceof ExecutableFlowBase)
- && node.getStatus() != oldStatus) {
- logger.info("Resetting job '" + node.getNestedId() + "' from "
- + oldStatus + " to " + node.getStatus());
+ if (!(node instanceof ExecutableFlowBase) && node.getStatus() != oldStatus) {
+ logger.info("Resetting job '" + node.getNestedId() + "' from " + oldStatus + " to " + node.getStatus());
}
for (String inId : node.getInNodes()) {
@@ -977,16 +965,14 @@ public class FlowRunner extends EventHandler implements Runnable {
// start node has not.
for (String id : flow.getStartNodes()) {
ExecutableNode node = flow.getExecutableNode(id);
- if (node.getStatus() == Status.READY
- || node.getStatus() == Status.DISABLED) {
+ if (node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
nodesToRetry.add(node);
}
}
}
flow.setUpdateTime(System.currentTimeMillis());
flow.setEndTime(-1);
- logger.info("Resetting flow '" + flow.getNestedId() + "' from "
- + oldFlowState + " to " + flow.getStatus());
+ logger.info("Resetting flow '" + flow.getNestedId() + "' from " + oldFlowState + " to " + flow.getStatus());
}
private void interrupt() {
@@ -1007,14 +993,13 @@ public class FlowRunner extends EventHandler implements Runnable {
ExecutableNode node = runner.getNode();
long seconds = (node.getEndTime() - node.getStartTime()) / 1000;
synchronized (mainSyncObj) {
- logger.info("Job " + node.getNestedId() + " finished with status "
- + node.getStatus() + " in " + seconds + " seconds");
+ logger.info("Job " + node.getNestedId() + " finished with status " + node.getStatus() + " in " + seconds
+ + " seconds");
// Cancellation is handled in the main thread, but if the flow is
// paused, the main thread is paused too.
// This unpauses the flow for cancellation.
- if (flowPaused && node.getStatus() == Status.FAILED
- && failureAction == FailureAction.CANCEL_ALL) {
+ if (flowPaused && node.getStatus() == Status.FAILED && failureAction == FailureAction.CANCEL_ALL) {
flowPaused = false;
}
@@ -1057,8 +1042,7 @@ public class FlowRunner extends EventHandler implements Runnable {
ExecutableNode node = flow.getExecutableNodePath(jobId);
File path = new File(execDir, node.getJobSource());
- String attachmentFileName =
- JobRunner.createAttachmentFileName(node, attempt);
+ String attachmentFileName = JobRunner.createAttachmentFileName(node, attempt);
File attachmentFile = new File(path.getParentFile(), attachmentFileName);
if (!attachmentFile.exists()) {
return null;
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 54d82e8..d052d99 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -42,12 +42,15 @@ import azkaban.event.EventListener;
import azkaban.execapp.event.FlowWatcher;
import azkaban.execapp.event.LocalFlowWatcher;
import azkaban.execapp.event.RemoteFlowWatcher;
+import azkaban.execapp.metric.NumFailedFlowMetric;
+import azkaban.execapp.metric.NumFailedJobMetric;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypeManagerException;
+import azkaban.metric.MetricReportManager;
import azkaban.project.ProjectLoader;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.JobMetaData;
@@ -60,26 +63,26 @@ import azkaban.utils.TrackingThreadPool;
/**
* Execution manager for the server side execution.
- *
+ *
* When a flow is submitted to FlowRunnerManager, it is the
* {@link Status.PREPARING} status. When a flow is about to be executed by
* FlowRunner, its status is updated to {@link Status.RUNNING}
- *
+ *
* Two main data structures are used in this class to maintain flows.
- *
+ *
* runningFlows: this is used as a bookkeeping for submitted flows in
* FlowRunnerManager. It has nothing to do with the executor service that is
* used to execute the flows. This bookkeeping is used at the time of canceling
* or killing a flow. The flows in this data structure is removed in the
* handleEvent method.
- *
+ *
* submittedFlows: this is used to keep track the execution of the flows, so it
* has the mapping between a Future<?> and an execution id. This would allow us
* to find out the execution ids of the flows that are in the Status.PREPARING
* status. The entries in this map is removed once the flow execution is
* completed.
- *
- *
+ *
+ *
*/
public class FlowRunnerManager implements EventListener,
ThreadPoolExecutingListener {
@@ -483,6 +486,8 @@ public class FlowRunnerManager implements EventListener,
.setValidateProxyUser(validateProxyUser)
.setNumJobThreads(numJobThreads).addListener(this);
+ configureFlowLevelMetrics(runner);
+
// Check again.
if (runningFlows.containsKey(execId)) {
throw new ExecutorManagerException("Execution " + execId
@@ -508,6 +513,21 @@ public class FlowRunnerManager implements EventListener,
}
}
+ /**
+ * Configure Azkaban metrics tracking for a new flowRunner instance
+ * @param flowRunner
+ */
+ private void configureFlowLevelMetrics(FlowRunner flowRunner) {
+ logger.info("Configuring Azkaban metrics tracking for flow runner object");
+
+ if (MetricReportManager.isAvailable()) {
+ MetricReportManager metricManager = MetricReportManager.getInstance();
+ //Adding NumFailedFlow Metric listener
+ flowRunner.addListener((NumFailedFlowMetric) metricManager
+ .getMetricFromName(NumFailedFlowMetric.NUM_FAILED_FLOW_METRIC_NAME));
+ }
+ }
+
private void setupFlow(ExecutableFlow flow) throws ExecutorManagerException {
int execId = flow.getExecutionId();
File execPath = new File(executionDirectory, String.valueOf(execId));
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
new file mode 100644
index 0000000..b77c74f
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.metric;
+
+import azkaban.event.Event;
+import azkaban.event.Event.Type;
+import azkaban.event.EventListener;
+import azkaban.execapp.FlowRunner;
+import azkaban.executor.Status;
+import azkaban.metric.MetricException;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
+
+/**
+ * Metric to keep track of number of failed flows in between the tracking events
+ */
+public class NumFailedFlowMetric extends TimeBasedReportingMetric<Integer> implements EventListener {
+ public static final String NUM_FAILED_FLOW_METRIC_NAME = "NumFailedFlowMetric";
+ private static final String NUM_FAILED_FLOW_METRIC_TYPE = "uint16";
+
+ public NumFailedFlowMetric(MetricReportManager manager, long interval) throws MetricException {
+ super(NUM_FAILED_FLOW_METRIC_NAME, NUM_FAILED_FLOW_METRIC_TYPE, 0, manager, interval);
+ logger.debug("Instantiated NumFailedJobMetric");
+ }
+
+ /**
+ * Listen for events to maintain correct value of number of failed flows
+ * {@inheritDoc}
+ * @see azkaban.event.EventListener#handleEvent(azkaban.event.Event)
+ */
+ @Override
+ public synchronized void handleEvent(Event event) {
+ if (event.getType() == Type.FLOW_FINISHED) {
+ FlowRunner runner = (FlowRunner) event.getRunner();
+ if (runner != null && runner.getExecutableFlow().getStatus().equals(Status.FAILED)) {
+ value = value + 1;
+ }
+ }
+ }
+
+ @Override
+ protected void preTrackingEventMethod() {
+ // Nothing to finalize before tracking event
+ }
+
+ @Override
+ protected synchronized void postTrackingEventMethod() {
+ value = 0;
+ }
+
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
new file mode 100644
index 0000000..bb463d4
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.metric;
+
+import azkaban.event.Event;
+import azkaban.event.Event.Type;
+import azkaban.event.EventListener;
+import azkaban.execapp.JobRunner;
+import azkaban.executor.Status;
+import azkaban.metric.MetricException;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
+
+/**
+ * Metric to keep track of number of failed jobs in between the tracking events
+ */
+public class NumFailedJobMetric extends TimeBasedReportingMetric<Integer> implements EventListener {
+ public static final String NUM_FAILED_JOB_METRIC_NAME = "NumFailedJobMetric";
+ private static final String NUM_FAILED_JOB_METRIC_TYPE = "uint16";
+
+ public NumFailedJobMetric(MetricReportManager manager, long interval) throws MetricException {
+ super(NUM_FAILED_JOB_METRIC_NAME, NUM_FAILED_JOB_METRIC_TYPE, 0, manager, interval);
+ logger.debug("Instantiated NumFailedJobMetric");
+ }
+
+ /**
+ * Listen for events to maintain correct value of number of failed jobs
+ * {@inheritDoc}
+ * @see azkaban.event.EventListener#handleEvent(azkaban.event.Event)
+ */
+ @Override
+ public synchronized void handleEvent(Event event) {
+ JobRunner runner = (JobRunner) event.getRunner();
+ if (event.getType() == Type.JOB_FINISHED && runner.getStatus().equals(Status.FAILED)) {
+ value = value + 1;
+ }
+ }
+
+ @Override
+ protected void preTrackingEventMethod() {
+ // Nothing to finalize before tracking event
+ }
+
+ @Override
+ protected synchronized void postTrackingEventMethod() {
+ value = 0;
+ }
+
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java
new file mode 100644
index 0000000..c44917a
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.metric;
+
+import azkaban.execapp.FlowRunnerManager;
+import azkaban.metric.MetricException;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
+
+/**
+ * Metric to keep track of number of queued flows in Azkaban exec server
+ */
+public class NumQueuedFlowMetric extends TimeBasedReportingMetric<Integer> {
+ public static final String NUM_QUEUED_FLOW_METRIC_NAME = "NumQueuedFlowMetric";
+ private static final String NUM_QUEUED_FLOW_METRIC_TYPE = "uint16";
+
+ private FlowRunnerManager flowManager;
+
+ /**
+ * @param flowRunnerManager Flow runner manager
+ * @param manager metric report manager
+ * @param interval reporting interval
+ * @throws MetricException
+ */
+ public NumQueuedFlowMetric(FlowRunnerManager flowRunnerManager, MetricReportManager manager, long interval) throws MetricException {
+ super(NUM_QUEUED_FLOW_METRIC_NAME, NUM_QUEUED_FLOW_METRIC_TYPE, 0, manager, interval);
+ logger.debug("Instantiated NumQueuedFlowMetric");
+ flowManager = flowRunnerManager;
+ }
+
+ /**
+ * Update value using flow manager
+ * {@inheritDoc}
+ * @see azkaban.metric.TimeBasedReportingMetric#preTrackingEventMethod()
+ */
+ @Override
+ protected synchronized void preTrackingEventMethod() {
+ value = flowManager.getNumQueuedFlows();
+ }
+
+ @Override
+ protected void postTrackingEventMethod() {
+ // nothing to post process
+ }
+
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
new file mode 100644
index 0000000..d7d09cb
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.metric;
+
+import azkaban.execapp.FlowRunnerManager;
+import azkaban.metric.MetricException;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
+
+/**
+ * Metric to keep track of number of running flows in Azkaban exec server
+ */
+public class NumRunningFlowMetric extends TimeBasedReportingMetric<Integer> {
+ public static final String NUM_RUNNING_FLOW_METRIC_NAME = "NumRunningFlowMetric";
+ private static final String NUM_RUNNING_FLOW_METRIC_TYPE = "uint16";
+
+ private FlowRunnerManager flowManager;
+
+ /**
+ * @param flowRunnerManager Flow runner manager
+ * @param manager metric report manager
+ * @param interval reporting interval
+ * @throws MetricException
+ */
+ public NumRunningFlowMetric(FlowRunnerManager flowRunnerManager, MetricReportManager manager, long interval) throws MetricException {
+ super(NUM_RUNNING_FLOW_METRIC_NAME, NUM_RUNNING_FLOW_METRIC_TYPE, 0, manager, interval);
+ logger.debug("Instantiated NumRunningFlowMetric");
+ flowManager = flowRunnerManager;
+ }
+
+ /**
+ * Update value using flow manager
+ * {@inheritDoc}
+ * @see azkaban.metric.TimeBasedReportingMetric#preTrackingEventMethod()
+ */
+ @Override
+ protected synchronized void preTrackingEventMethod() {
+ value = flowManager.getNumRunningFlows();
+ }
+
+ @Override
+ protected void postTrackingEventMethod() {
+ // nothing to post process
+ }
+
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
new file mode 100644
index 0000000..84ebfd4
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.metric;
+
+import azkaban.event.Event;
+import azkaban.event.Event.Type;
+import azkaban.event.EventListener;
+import azkaban.metric.MetricException;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
+
+/**
+ * Metric to keep track of number of running jobs in Azkaban exec server
+ */
+public class NumRunningJobMetric extends TimeBasedReportingMetric<Integer> implements EventListener {
+ public static final String NUM_RUNNING_JOB_METRIC_NAME = "NumRunningJobMetric";
+ private static final String NUM_RUNNING_JOB_METRIC_TYPE = "uint16";
+
+ /**
+ * @param manager metric manager
+ * @param interval reporting interval
+ * @throws MetricException
+ */
+ public NumRunningJobMetric(MetricReportManager manager, long interval) throws MetricException {
+ super(NUM_RUNNING_JOB_METRIC_NAME, NUM_RUNNING_JOB_METRIC_TYPE, 0, manager, interval);
+ logger.debug("Instantiated NumRunningJobMetric");
+ }
+
+ /**
+ * Listen for events to maintain correct value of number of running jobs
+ * {@inheritDoc}
+ * @see azkaban.event.EventListener#handleEvent(azkaban.event.Event)
+ */
+ @Override
+ public synchronized void handleEvent(Event event) {
+ if (event.getType() == Type.JOB_STARTED) {
+ value = value + 1;
+ } else if (event.getType() == Type.JOB_FINISHED) {
+ value = value - 1;
+ }
+ }
+
+ @Override
+ protected void preTrackingEventMethod() {
+ // nothing to finalize value is already updated
+ }
+
+ @Override
+ protected void postTrackingEventMethod() {
+ // nothing to post process
+ }
+
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
new file mode 100644
index 0000000..152a3aa
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
@@ -0,0 +1,265 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.log4j.Logger;
+
+import azkaban.executor.ConnectorParams;
+import azkaban.metric.IMetric;
+import azkaban.metric.IMetricEmitter;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
+import azkaban.metric.inmemoryemitter.InMemoryHistoryNode;
+import azkaban.metric.inmemoryemitter.InMemoryMetricEmitter;
+import azkaban.server.HttpRequestUtils;
+import azkaban.server.ServerConstants;
+import azkaban.utils.JSONUtils;
+
+
+/**
+ * Servlet to communicate with Azkaban exec server
+ * This servlet get requests from stats servlet in Azkaban Web server
+ */
+public class StatsServlet extends HttpServlet implements ConnectorParams {
+ private static final long serialVersionUID = 2L;
+ private static final Logger logger = Logger.getLogger(StatsServlet.class);
+
+ public boolean hasParam(HttpServletRequest request, String param) {
+ return HttpRequestUtils.hasParam(request, param);
+ }
+
+ public String getParam(HttpServletRequest request, String name) throws ServletException {
+ return HttpRequestUtils.getParam(request, name);
+ }
+
+ public Boolean getBooleanParam(HttpServletRequest request, String name) throws ServletException {
+ return HttpRequestUtils.getBooleanParam(request, name);
+ }
+
+ public long getLongParam(HttpServletRequest request, String name) throws ServletException {
+ return HttpRequestUtils.getLongParam(request, name);
+ }
+
+ /**
+ * Handle all get request to Stats Servlet
+ * {@inheritDoc}
+ * @see javax.servlet.http.HttpServlet#doGet(javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)
+ */
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ Map<String, Object> ret = new HashMap<String, Object>();
+
+ if (hasParam(req, ACTION_PARAM)) {
+ String action = getParam(req, ACTION_PARAM);
+ if (action.equals(STATS_SET_REPORTINGINTERVAL)) {
+ handleChangeMetricInterval(req, ret);
+ } else if (action.equals(STATS_SET_CLEANINGINTERVAL)) {
+ handleChangeCleaningInterval(req, ret);
+ } else if (action.equals(STATS_SET_MAXREPORTERPOINTS)) {
+ handleChangeEmitterPoints(req, ret);
+ } else if (action.equals(STATS_GET_ALLMETRICSNAME)) {
+ handleGetAllMMetricsName(req, ret);
+ } else if (action.equals(STATS_GET_METRICHISTORY)) {
+ handleGetMetricHistory(req, ret);
+ } else if (action.equals(STATS_SET_ENABLEMETRICS)) {
+ handleChangeManagerStatusRequest(req, ret, true);
+ } else if (action.equals(STATS_SET_DISABLEMETRICS)) {
+ handleChangeManagerStatusRequest(req, ret, false);
+ } else {
+ ret.put(RESPONSE_ERROR, "Invalid action");
+ }
+ }
+
+ JSONUtils.toJSON(ret, resp.getOutputStream(), true);
+ }
+
+ /**
+ * enable or disable metric Manager
+ * A disable will also purge all data from all metric emitters
+ */
+ private void handleChangeManagerStatusRequest(HttpServletRequest req, Map<String, Object> ret,
+ boolean enableMetricManager) {
+ try {
+ logger.info("Updating metric manager status");
+ if ((enableMetricManager && MetricReportManager.isInstantiated()) || MetricReportManager.isAvailable()) {
+ MetricReportManager metricManager = MetricReportManager.getInstance();
+ if (enableMetricManager) {
+ metricManager.enableManager();
+ } else {
+ metricManager.disableManager();
+ }
+ ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
+ } else {
+ ret.put(RESPONSE_ERROR, "MetricManager is not available");
+ }
+ } catch (Exception e) {
+ logger.error(e);
+ ret.put(RESPONSE_ERROR, e.getMessage());
+ }
+ }
+
+ /**
+ * Update number of display snapshots for /stats graphs
+ */
+ private void handleChangeEmitterPoints(HttpServletRequest req, Map<String, Object> ret) {
+ try {
+ long numInstance = getLongParam(req, STATS_MAP_EMITTERNUMINSTANCES);
+ if (MetricReportManager.isAvailable()) {
+ MetricReportManager metricManager = MetricReportManager.getInstance();
+ InMemoryMetricEmitter memoryEmitter = extractInMemoryMetricEmitter(metricManager);
+ memoryEmitter.setReportingInstances(numInstance);
+ ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
+ } else {
+ ret.put(RESPONSE_ERROR, "MetricManager is not available");
+ }
+ } catch (Exception e) {
+ logger.error(e);
+ ret.put(RESPONSE_ERROR, e.getMessage());
+ }
+ }
+
+ /**
+ * Update InMemoryMetricEmitter interval to maintain metric snapshots
+ */
+ private void handleChangeCleaningInterval(HttpServletRequest req, Map<String, Object> ret) {
+ try {
+ long newInterval = getLongParam(req, STATS_MAP_CLEANINGINTERVAL);
+ if (MetricReportManager.isAvailable()) {
+ MetricReportManager metricManager = MetricReportManager.getInstance();
+ InMemoryMetricEmitter memoryEmitter = extractInMemoryMetricEmitter(metricManager);
+ memoryEmitter.setReportingInterval(newInterval);
+ ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
+ } else {
+ ret.put(RESPONSE_ERROR, "MetricManager is not available");
+ }
+ } catch (Exception e) {
+ logger.error(e);
+ ret.put(RESPONSE_ERROR, e.getMessage());
+ }
+ }
+
+ /**
+ * Get metric snapshots for a metric and date specification
+ * @throws ServletException
+ */
+ private void handleGetMetricHistory(HttpServletRequest req, Map<String, Object> ret) throws ServletException {
+ if (MetricReportManager.isAvailable()) {
+ MetricReportManager metricManager = MetricReportManager.getInstance();
+ InMemoryMetricEmitter memoryEmitter = extractInMemoryMetricEmitter(metricManager);
+
+ // if we have a memory emitter
+ if (memoryEmitter != null) {
+ try {
+ List<InMemoryHistoryNode> result =
+ memoryEmitter.getMetrics(getParam(req, STATS_MAP_METRICNAMEPARAM),
+ parseDate(getParam(req, STATS_MAP_STARTDATE)), parseDate(getParam(req, STATS_MAP_ENDDATE)),
+ getBooleanParam(req, STATS_MAP_METRICRETRIEVALMODE));
+
+ if (result != null && result.size() > 0) {
+ ret.put("data", result);
+ } else {
+ ret.put(RESPONSE_ERROR, "No metric stats available");
+ }
+
+ } catch (ParseException ex) {
+ ret.put(RESPONSE_ERROR, "Invalid Date filter");
+ }
+ } else {
+ ret.put(RESPONSE_ERROR, "InMemoryMetricEmitter not instantiated");
+ }
+ } else {
+ ret.put(RESPONSE_ERROR, "MetricReportManager is not available");
+ }
+ }
+
+ /**
+ * Get InMemoryMetricEmitter, if available else null
+ */
+ private InMemoryMetricEmitter extractInMemoryMetricEmitter(MetricReportManager metricManager) {
+ InMemoryMetricEmitter memoryEmitter = null;
+ for (IMetricEmitter emitter : metricManager.getMetricEmitters()) {
+ if (emitter instanceof InMemoryMetricEmitter) {
+ memoryEmitter = (InMemoryMetricEmitter) emitter;
+ break;
+ }
+ }
+ return memoryEmitter;
+ }
+
+ /**
+ * Get all the metrics tracked by metric manager
+ */
+ private void handleGetAllMMetricsName(HttpServletRequest req, Map<String, Object> ret) {
+ if (MetricReportManager.isAvailable()) {
+ MetricReportManager metricManager = MetricReportManager.getInstance();
+ List<IMetric<?>> result = metricManager.getAllMetrics();
+ if (result.size() == 0) {
+ ret.put(RESPONSE_ERROR, "No Metric being tracked");
+ } else {
+ List<String> metricNames = new LinkedList<String>();
+ for(IMetric<?> metric: result) {
+ metricNames.add(metric.getName());
+ }
+ ret.put("data", metricNames);
+ }
+ } else {
+ ret.put(RESPONSE_ERROR, "MetricReportManager is not available");
+ }
+ }
+
+ /**
+ * Update tracking interval for a given metrics
+ * @throws ServletException
+ */
+ private void handleChangeMetricInterval(HttpServletRequest req, Map<String, Object> ret) throws ServletException {
+ try {
+ String metricName = getParam(req, STATS_MAP_METRICNAMEPARAM);
+ long newInterval = getLongParam(req, STATS_MAP_REPORTINGINTERVAL);
+ if (MetricReportManager.isAvailable()) {
+ MetricReportManager metricManager = MetricReportManager.getInstance();
+ TimeBasedReportingMetric<?> metric = (TimeBasedReportingMetric<?>) metricManager.getMetricFromName(metricName);
+ metric.updateInterval(newInterval);
+ ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
+ } else {
+ ret.put(RESPONSE_ERROR, "MetricManager is not available");
+ }
+ } catch (Exception e) {
+ logger.error(e);
+ ret.put(RESPONSE_ERROR, e.getMessage());
+ }
+ }
+
+ private Date parseDate(String date) throws ParseException {
+ DateFormat format = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss zzz");
+ return format.parse(date);
+ }
+}
diff --git a/azkaban-execserver/src/package/conf/azkaban.properties b/azkaban-execserver/src/package/conf/azkaban.properties
index fd27529..79e20e1 100644
--- a/azkaban-execserver/src/package/conf/azkaban.properties
+++ b/azkaban-execserver/src/package/conf/azkaban.properties
@@ -24,3 +24,7 @@ executor.flow.threads=30
# JMX stats
jetty.connector.stats=true
executor.connector.stats=true
+
+# uncomment to enable inmemory stats for azkaban
+#executor.metric.reports=true
+#executor.metric.milisecinterval.default=60000
\ No newline at end of file
diff --git a/azkaban-soloserver/src/package/conf/azkaban.properties b/azkaban-soloserver/src/package/conf/azkaban.properties
index 386cd43..325fd12 100644
--- a/azkaban-soloserver/src/package/conf/azkaban.properties
+++ b/azkaban-soloserver/src/package/conf/azkaban.properties
@@ -46,3 +46,7 @@ lockdown.create.projects=false
# JMX stats
jetty.connector.stats=true
executor.connector.stats=true
+
+# uncomment to enable inmemory stats for azkaban
+#executor.metric.reports=true
+#executor.metric.milisecinterval.default=60000
\ No newline at end of file
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
index b4140cb..92e8fd3 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -41,9 +41,7 @@ import org.apache.velocity.app.VelocityEngine;
import org.apache.velocity.runtime.log.Log4JLogChute;
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
import org.apache.velocity.runtime.resource.loader.JarResourceLoader;
-
import org.joda.time.DateTimeZone;
-
import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.bio.SocketConnector;
@@ -96,6 +94,7 @@ import azkaban.webapp.servlet.ScheduleServlet;
import azkaban.webapp.servlet.HistoryServlet;
import azkaban.webapp.servlet.ProjectServlet;
import azkaban.webapp.servlet.ProjectManagerServlet;
+import azkaban.webapp.servlet.StatsServlet;
import azkaban.webapp.servlet.TriggerManagerServlet;
import azkaban.webapp.plugin.TriggerPlugin;
import azkaban.webapp.plugin.ViewerPlugin;
@@ -772,6 +771,7 @@ public class AzkabanWebServer extends AzkabanServer {
root.addServlet(new ServletHolder(new ScheduleServlet()), "/schedule");
root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
root.addServlet(new ServletHolder(new TriggerManagerServlet()), "/triggers");
+ root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
ServletHolder restliHolder = new ServletHolder(new RestliServlet());
restliHolder.setInitParameter("resourcePackages", "azkaban.restli");
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
new file mode 100644
index 0000000..6d14839
--- /dev/null
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.webapp.servlet;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import azkaban.executor.ConnectorParams;
+import azkaban.executor.ExecutorManager;
+import azkaban.server.session.Session;
+import azkaban.user.Permission;
+import azkaban.user.Role;
+import azkaban.user.User;
+import azkaban.user.UserManager;
+import azkaban.utils.Pair;
+import azkaban.webapp.AzkabanWebServer;
+
+/**
+ * User facing servlet for Azkaban default metric display
+ */
+public class StatsServlet extends LoginAbstractAzkabanServlet {
+ private static final long serialVersionUID = 1L;
+ private UserManager userManager;
+ private ExecutorManager execManager;
+
+ @Override
+ public void init(ServletConfig config) throws ServletException {
+ super.init(config);
+ AzkabanWebServer server = (AzkabanWebServer) getApplication();
+ userManager = server.getUserManager();
+ execManager = server.getExecutorManager();
+ }
+
+ @Override
+ protected void handleGet(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException,
+ IOException {
+ if (hasParam(req, ConnectorParams.ACTION_PARAM)) {
+ handleAJAXAction(req, resp, session);
+ } else {
+ handleStatePageLoad(req, resp, session);
+ }
+ }
+
+ private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session)
+ throws ServletException, IOException {
+ HashMap<String, Object> ret = new HashMap<String, Object>();
+ String actionName = getParam(req, ConnectorParams.ACTION_PARAM);
+
+ if (actionName.equals(ConnectorParams.STATS_GET_METRICHISTORY)) {
+ handleGetMetricHistory(req, ret, session.getUser());
+ } else if (actionName.equals(ConnectorParams.STATS_SET_REPORTINGINTERVAL)) {
+ handleChangeConfigurationRequest(ConnectorParams.STATS_SET_REPORTINGINTERVAL, req, ret);
+ } else if (actionName.equals(ConnectorParams.STATS_SET_CLEANINGINTERVAL)) {
+ handleChangeConfigurationRequest(ConnectorParams.STATS_SET_CLEANINGINTERVAL, req, ret);
+ } else if (actionName.equals(ConnectorParams.STATS_SET_MAXREPORTERPOINTS)) {
+ handleChangeConfigurationRequest(ConnectorParams.STATS_SET_MAXREPORTERPOINTS, req, ret);
+ } else if (actionName.equals(ConnectorParams.STATS_SET_ENABLEMETRICS)) {
+ handleChangeConfigurationRequest(ConnectorParams.STATS_SET_ENABLEMETRICS, req, ret);
+ } else if (actionName.equals(ConnectorParams.STATS_SET_DISABLEMETRICS)) {
+ handleChangeConfigurationRequest(ConnectorParams.STATS_SET_DISABLEMETRICS, req, ret);
+ }
+
+ writeJSON(resp, ret);
+ }
+
+ /**
+ * Generic method to facilitate actionName action using Azkaban exec server
+ * @param actionName Name of the action
+ */
+ private void handleChangeConfigurationRequest(String actionName, HttpServletRequest req, HashMap<String, Object> ret)
+ throws ServletException, IOException {
+ Map<String, Object> result = execManager.callExecutorStats(actionName, getAllParams(req));
+ if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+ ret.put(ConnectorParams.RESPONSE_ERROR, result.get(ConnectorParams.RESPONSE_ERROR).toString());
+ } else {
+ ret.put(ConnectorParams.STATUS_PARAM, result.get(ConnectorParams.STATUS_PARAM));
+ }
+ }
+
+ /**
+ * Get metric snapshots for a metric and date specification
+ * @throws ServletException
+ */
+ private void handleGetMetricHistory(HttpServletRequest req, HashMap<String, Object> ret, User user)
+ throws IOException, ServletException {
+ Map<String, Object> result =
+ execManager.callExecutorStats(ConnectorParams.STATS_GET_METRICHISTORY, getAllParams(req));
+ if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+ ret.put(ConnectorParams.RESPONSE_ERROR, result.get(ConnectorParams.RESPONSE_ERROR).toString());
+ } else {
+ ret.put("data", result.get("data"));
+ }
+ }
+
+ /**
+ *
+ */
+ private void handleStatePageLoad(HttpServletRequest req, HttpServletResponse resp, Session session)
+ throws ServletException {
+ Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/statsPage.vm");
+ if (!hasPermission(session.getUser(), Permission.Type.METRICS)) {
+ page.add("errorMsg", "User " + session.getUser().getUserId() + " has no permission.");
+ page.render();
+ return;
+ }
+
+ try {
+ Map<String, Object> result =
+ execManager.callExecutorStats(ConnectorParams.STATS_GET_ALLMETRICSNAME, (Pair<String, String>[]) null);
+ if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+ page.add("errorMsg", result.get(ConnectorParams.RESPONSE_ERROR).toString());
+ } else {
+ page.add("metricList", result.get("data"));
+ }
+ } catch (IOException e) {
+ page.add("errorMsg", "Failed to get a response from Azkaban exec server");
+ }
+
+ page.render();
+ }
+
+ @Override
+ protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException,
+ IOException {
+ }
+
+ protected boolean hasPermission(User user, Permission.Type type) {
+ for (String roleName : user.getRoles()) {
+ Role role = userManager.getRole(roleName);
+ if (role.getPermission().isPermissionSet(type) || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Parse all Http request params
+ * @return
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private Pair<String, String>[] getAllParams(HttpServletRequest req) {
+ List<Pair<String, String>> allParams = new LinkedList<Pair<String, String>>();
+
+ Iterator it = req.getParameterMap().entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry pairs = (Map.Entry) it.next();
+ for (Object value : (String[]) pairs.getValue()) {
+ allParams.add(new Pair<String, String>((String) pairs.getKey(), (String) value));
+ }
+ }
+
+ return allParams.toArray(new Pair[allParams.size()]);
+ }
+}
diff --git a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
new file mode 100644
index 0000000..4596ac2
--- /dev/null
+++ b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
@@ -0,0 +1,122 @@
+#*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+*#
+
+<!DOCTYPE html>
+<html lang="en">
+ <head>
+ #parse("azkaban/webapp/servlet/velocity/style.vm")
+ #parse("azkaban/webapp/servlet/velocity/javascript.vm")
+ <link rel="stylesheet" type="text/css" href="${context}/css/bootstrap-datetimepicker.css" />
+ <script type="text/javascript" src="${context}/js/raphael.min.js"></script>
+ <script type="text/javascript" src="${context}/js/morris.min.js"></script>
+ <script type="text/javascript" src="${context}/js/moment.min.js"></script>
+ <script type="text/javascript" src="${context}/js/bootstrap-datetimepicker.min.js"></script>
+ <script type="text/javascript">
+ var contextURL = "${context}";
+ var currentTime = ${currentTime};
+ var timezone = "${timezone}";
+
+ function refreshMetricChart() {
+ var requestURL = '/stats';
+ var requestData = {
+ 'action': 'getMetricHistory',
+ 'from': new Date($('#datetimebegin').val()).toUTCString(),
+ 'to' : new Date($('#datetimeend').val()).toUTCString(),
+ 'metricName': $('#metricName').val(),
+ 'useStats': $("#useStats").is(':checked')
+ };
+ var successHandler = function(responseData) {
+ if(responseData.error != null) {
+ $('#reportedMetric').html(responseData.error);
+ } else {
+ var graphDiv = document.createElement('div');
+ $('#reportedMetric').html(graphDiv);
+
+ Morris.Line({
+ element: graphDiv,
+ data: responseData.data,
+ xkey: 'timestamp',
+ ykeys: ['value'],
+ labels: [$('#metricName').val()]
+ });
+ }
+ };
+ $.get(requestURL, requestData, successHandler, 'json');
+ }
+
+ $(document).ready(function () {
+ $('#datetimebegin').datetimepicker();
+ $('#datetimeend').datetimepicker();
+ $('#datetimebegin').on('change.dp', function(e) {
+ $('#datetimeend').data('DateTimePicker').setStartDate(e.date);
+ });
+ $('#datetimeend').on('change.dp', function(e) {
+ $('#datetimebegin').data('DateTimePicker').setEndDate(e.date);
+ });
+ $('#retrieve').click(refreshMetricChart);
+ });
+
+ </script>
+ </head>
+ <body>
+ #set ($current_page="Statistics")
+ #parse ("azkaban/webapp/servlet/velocity/nav.vm")
+ #if ($errorMsg)
+ #parse ("azkaban/webapp/servlet/velocity/errormsg.vm")
+ #else
+ ## Page header.
+ <div class="az-page-header">
+ <div class="container-full">
+ <div class="row">
+ <div class="header-title" style="width: 17%;">
+ <h1><a href="${context}/stats">Statistics</a></h1>
+ </div>
+ <div class="header-control" style="width: 900px; padding-top: 5px;">
+ <form id="metric-form" method="get">
+ <label for="metricLabel" >Metric</label>
+ #if (!$metricList.isEmpty())
+ <select id="metricName" name="metricName" style="width:200px">
+ #foreach ($metric in $metricList)
+ <option value="${metric}" style="width:200px">${metric}</option>
+ #end
+ </select>
+ #end
+ <label for="datetimebegin" >Between</label>
+ <input type="text" id="datetimebegin" value="" class="ui-datetime-container" style="width:150px">
+ <label for="datetimeend" >and</label>
+ <input type="text" id="datetimeend" value="" class="ui-datetime-container" style="width:150px">
+ <input type="checkbox" name="useStats" id="useStats" value="true"> useStats
+ <input type="button" id="retrieve" value="Retrieve" class="btn btn-success" >
+ </div>
+ </div>
+ </form>
+ </div>
+ </div>
+ </div>
+ </div>
+ <div class="container-full">
+ #parse ("azkaban/webapp/servlet/velocity/alerts.vm")
+ <div class="row">
+ <div id="reportedMetric" style="padding: 60px 10px 10px 10px;height: 750px;">
+ </div>
+ </div>
+ <!-- /row -->
+ #parse ("azkaban/webapp/servlet/velocity/invalidsessionmodal.vm")
+ </div>
+ <!-- /container-full -->
+ #end
+ </body>
+ <html>
\ No newline at end of file
build.gradle 1(+1 -0)
diff --git a/build.gradle b/build.gradle
index 6fd240d..2ad1434 100644
--- a/build.gradle
+++ b/build.gradle
@@ -101,6 +101,7 @@ project(':azkaban-common') {
compile('net.sf.jopt-simple:jopt-simple:4.3')
compile('org.apache.commons:commons-email:1.2')
compile('org.apache.commons:commons-jexl:2.1.1')
+ compile('org.apache.commons:commons-math3:3.0')
compile('org.apache.httpcomponents:httpclient:4.2.1')
compile('org.apache.httpcomponents:httpcore:4.2.1')
compile('org.apache.velocity:velocity:1.7')