azkaban-memoizeit
Changes
build.gradle 2(+1 -1)
Details
diff --git a/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
index 3226de3..dabcc49 100644
--- a/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
@@ -68,6 +68,15 @@ public abstract class AbstractMetric<T> implements IMetric<T>, Cloneable{
/**
* {@inheritDoc}
+ * @throws CloneNotSupportedException
+ * @see azkaban.metric.IMetric#getSnapshot()
+ */
+ public IMetric<T> getSnapshot() throws CloneNotSupportedException{
+ return (IMetric<T>) this.clone();
+ }
+
+ /**
+ * {@inheritDoc}
* @see azkaban.metric.IMetric#getValue()
*/
public T getValue() {
@@ -81,14 +90,12 @@ public abstract class AbstractMetric<T> implements IMetric<T>, Cloneable{
* {@inheritDoc}
* @see azkaban.metric.IMetric#notifyManager()
*/
- public synchronized void notifyManager() {
+ public void notifyManager() {
_logger.debug(String.format("Notifying Manager for %s", this.getClass().getName()));
try {
- _metricManager.reportMetric( (IMetric<?>) this.clone());
+ _metricManager.reportMetric(this);
} catch (NullPointerException ex) {
_logger.error(String.format("Metric Manager is not set for %s metric %s", this.getClass().getName(), ex.toString()));
- } catch (CloneNotSupportedException ex) {
- _logger.error(String.format("Failed to take snapshot for %s metric %s", this.getClass().getName(), ex.toString()));
}
}
}
diff --git a/azkaban-common/src/main/java/azkaban/metric/IMetric.java b/azkaban-common/src/main/java/azkaban/metric/IMetric.java
index ad1bfe1..188b399 100644
--- a/azkaban-common/src/main/java/azkaban/metric/IMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/IMetric.java
@@ -26,4 +26,5 @@ public interface IMetric<T> {
void updateMetricManager(final MetricReportManager manager);
void notifyManager();
T getValue();
+ IMetric<T> getSnapshot() throws CloneNotSupportedException;
}
diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
index 14d3b9c..5ada55a 100644
--- a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
@@ -203,17 +203,17 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
synchronized (_historyListMapping.get(metricName)) {
InMemoryHistoryNode firstNode = _historyListMapping.get(metricName).peekFirst();
- long localCopyOfInterval = 0;
+ long localCopyOfTimeWindow = 0;
// go ahead for clean up using latest possible value of interval
// any interval change will not affect on going clean up
synchronized (this) {
- localCopyOfInterval = _timeWindow;
+ localCopyOfTimeWindow = _timeWindow;
}
// removing objects older than Interval time from firstAllowedDate
while (firstNode != null
- && TimeUnit.MILLISECONDS.toMillis(firstAllowedDate.getTime() - firstNode.getTimestamp().getTime()) > localCopyOfInterval) {
+ && TimeUnit.MILLISECONDS.toMillis(firstAllowedDate.getTime() - firstNode.getTimestamp().getTime()) > localCopyOfTimeWindow) {
_historyListMapping.get(metricName).removeFirst();
firstNode = _historyListMapping.get(metricName).peekFirst();
}
diff --git a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
index b91004a..31d12c1 100644
--- a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
+++ b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Executors;
import org.apache.log4j.Logger;
+
/**
* Manager for access or updating metric related functionality of Azkaban
* MetricManager is responsible all handling all action requests from statsServlet in Exec server
@@ -87,25 +88,36 @@ public class MetricReportManager {
return _instance;
}
- // each element of metrics List is responsible to call this method and report metrics
+ /***
+ * each element of metrics List is responsible to call this method and report metrics
+ * @param metric
+ */
public void reportMetric(final IMetric<?> metric) {
if (metric != null && isAvailable()) {
-
- // Report metric to all the emitters
- synchronized (metric) {
- _logger.debug(String.format("Submitting %s metric for metric emission pool", metric.getName()));
+ try {
+ final IMetric<?> metricSnapshot;
+ // take snapshot
+ synchronized (metric) {
+ metricSnapshot = metric.getSnapshot();
+ }
+ _logger.debug(String.format("Submitting %s metric for metric emission pool", metricSnapshot.getName()));
+ // report to all emitters
for (final IMetricEmitter metricEmitter : _metricEmitters) {
_executorService.submit(new Runnable() {
@Override
public void run() {
try {
- metricEmitter.reportMetric(metric);
+ metricEmitter.reportMetric(metricSnapshot);
} catch (Exception ex) {
- _logger.error(String.format("Failed to report %s metric due to %s", metric.getName(), ex.toString()));
+ _logger.error(String.format("Failed to report %s metric due to %s", metricSnapshot.getName(),
+ ex.toString()));
}
}
});
}
+ } catch (CloneNotSupportedException ex) {
+ _logger.error(String.format("Failed to take snapshot for %s metric %s", metric.getClass().getName(),
+ ex.toString()));
}
}
}
@@ -185,13 +197,13 @@ public class MetricReportManager {
*/
public void disableManager() {
_logger.info("Disabling Metric Manager");
- if(_isManagerEnabled) {
+ if (_isManagerEnabled) {
_isManagerEnabled = false;
- for(IMetricEmitter emitter: _metricEmitters) {
+ for (IMetricEmitter emitter : _metricEmitters) {
try {
emitter.purgeAllData();
} catch (Exception ex) {
- _logger.error("Failed to purge data " + ex.toString());
+ _logger.error("Failed to purge data " + ex.toString());
}
}
}
diff --git a/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java b/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java
index 4a3cc90..3918038 100644
--- a/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java
+++ b/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java
@@ -10,14 +10,45 @@ public class FakeMetric extends AbstractMetric<Integer>{
}
@Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((_metricManager == null) ? 0 : _metricManager.hashCode());
+ result = prime * result + ((_name == null) ? 0 : _name.hashCode());
+ result = prime * result + ((_type == null) ? 0 : _type.hashCode());
+ result = prime * result + ((_value == null) ? 0 : _value.hashCode());
+ return result;
+ }
+
+ @Override
public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
- if (obj == null || obj.getClass() != this.getClass()) {
- return false;
- }
- FakeMetric metric = (FakeMetric) obj;
- return metric.getName() == getName() && metric.getValue() == getValue();
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (!(obj instanceof FakeMetric))
+ return false;
+ FakeMetric other = (FakeMetric) obj;
+ if (_metricManager == null) {
+ if (other._metricManager != null)
+ return false;
+ } else if (!_metricManager.equals(other._metricManager))
+ return false;
+ if (_name == null) {
+ if (other._name != null)
+ return false;
+ } else if (!_name.equals(other._name))
+ return false;
+ if (_type == null) {
+ if (other._type != null)
+ return false;
+ } else if (!_type.equals(other._type))
+ return false;
+ if (_value == null) {
+ if (other._value != null)
+ return false;
+ } else if (!_value.equals(other._value))
+ return false;
+ return true;
}
}
diff --git a/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java b/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java
index 043bf26..7fd7da2 100644
--- a/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java
@@ -71,9 +71,9 @@ public class MetricManagerTest {
@Test
public void managerEmitterHandlingTest() throws Exception {
emitter.purgeAllData();
+ Date from = new Date();
metric.notifyManager();
- Date from = new Date();
synchronized (this) {
try {
wait(2000);
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
index 5962471..59d78f9 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
@@ -53,10 +53,6 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
private static final long serialVersionUID = 2L;
private static final Logger _logger = Logger.getLogger(StatsServlet.class);
- public void init(ServletConfig config) throws ServletException {
- // Nothing to initialize
- }
-
public boolean hasParam(HttpServletRequest request, String param) {
return HttpRequestUtils.hasParam(request, param);
}
build.gradle 2(+1 -1)
diff --git a/build.gradle b/build.gradle
index 54321c2..2ad1434 100644
--- a/build.gradle
+++ b/build.gradle
@@ -101,6 +101,7 @@ project(':azkaban-common') {
compile('net.sf.jopt-simple:jopt-simple:4.3')
compile('org.apache.commons:commons-email:1.2')
compile('org.apache.commons:commons-jexl:2.1.1')
+ compile('org.apache.commons:commons-math3:3.0')
compile('org.apache.httpcomponents:httpclient:4.2.1')
compile('org.apache.httpcomponents:httpcore:4.2.1')
compile('org.apache.velocity:velocity:1.7')
@@ -109,7 +110,6 @@ project(':azkaban-common') {
compile('org.mortbay.jetty:jetty:6.1.26')
compile('org.mortbay.jetty:jetty-util:6.1.26')
compile('org.slf4j:slf4j-api:1.6.1')
- compile('org.apache.commons:commons-math3:3.0')
testCompile('junit:junit:4.11')
testCompile('org.hamcrest:hamcrest-all:1.3')