azkaban-memoizeit

Incorporating feedback iterator 2

12/25/2014 5:58:58 AM

Details

diff --git a/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
index 3226de3..dabcc49 100644
--- a/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
@@ -68,6 +68,15 @@ public abstract class AbstractMetric<T> implements IMetric<T>, Cloneable{
 
   /**
    * {@inheritDoc}
+   * @throws CloneNotSupportedException
+   * @see azkaban.metric.IMetric#getSnapshot()
+   */
+  public IMetric<T> getSnapshot() throws CloneNotSupportedException{
+    return (IMetric<T>) this.clone();
+  }
+
+  /**
+   * {@inheritDoc}
    * @see azkaban.metric.IMetric#getValue()
    */
   public T getValue() {
@@ -81,14 +90,12 @@ public abstract class AbstractMetric<T> implements IMetric<T>, Cloneable{
    * {@inheritDoc}
    * @see azkaban.metric.IMetric#notifyManager()
    */
-  public synchronized void notifyManager() {
+  public void notifyManager() {
     _logger.debug(String.format("Notifying Manager for %s", this.getClass().getName()));
     try {
-      _metricManager.reportMetric( (IMetric<?>) this.clone());
+      _metricManager.reportMetric(this);
     } catch (NullPointerException ex) {
       _logger.error(String.format("Metric Manager is not set for %s metric %s", this.getClass().getName(), ex.toString()));
-    } catch (CloneNotSupportedException ex) {
-      _logger.error(String.format("Failed to take snapshot for %s metric %s", this.getClass().getName(), ex.toString()));
     }
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/metric/IMetric.java b/azkaban-common/src/main/java/azkaban/metric/IMetric.java
index ad1bfe1..188b399 100644
--- a/azkaban-common/src/main/java/azkaban/metric/IMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/IMetric.java
@@ -26,4 +26,5 @@ public interface IMetric<T> {
   void updateMetricManager(final MetricReportManager manager);
   void notifyManager();
   T getValue();
+  IMetric<T> getSnapshot() throws CloneNotSupportedException;
 }
diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
index 14d3b9c..5ada55a 100644
--- a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
@@ -203,17 +203,17 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
       synchronized (_historyListMapping.get(metricName)) {
 
         InMemoryHistoryNode firstNode = _historyListMapping.get(metricName).peekFirst();
-        long localCopyOfInterval = 0;
+        long localCopyOfTimeWindow = 0;
 
         // go ahead for clean up using latest possible value of interval
         // any interval change will not affect on going clean up
         synchronized (this) {
-          localCopyOfInterval = _timeWindow;
+          localCopyOfTimeWindow = _timeWindow;
         }
 
         // removing objects older than Interval time from firstAllowedDate
         while (firstNode != null
-            && TimeUnit.MILLISECONDS.toMillis(firstAllowedDate.getTime() - firstNode.getTimestamp().getTime()) > localCopyOfInterval) {
+            && TimeUnit.MILLISECONDS.toMillis(firstAllowedDate.getTime() - firstNode.getTimestamp().getTime()) > localCopyOfTimeWindow) {
           _historyListMapping.get(metricName).removeFirst();
           firstNode = _historyListMapping.get(metricName).peekFirst();
         }
diff --git a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
index b91004a..31d12c1 100644
--- a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
+++ b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Executors;
 
 import org.apache.log4j.Logger;
 
+
 /**
  * Manager for access or updating metric related functionality of Azkaban
  * MetricManager is responsible all handling all action requests from statsServlet in Exec server
@@ -87,25 +88,36 @@ public class MetricReportManager {
     return _instance;
   }
 
-  // each element of metrics List is responsible to call this method and report metrics
+  /***
+   * each element of metrics List is responsible to call this method and report metrics
+   * @param metric
+   */
   public void reportMetric(final IMetric<?> metric) {
     if (metric != null && isAvailable()) {
-
-      // Report metric to all the emitters
-      synchronized (metric) {
-        _logger.debug(String.format("Submitting %s metric for metric emission pool", metric.getName()));
+      try {
+        final IMetric<?> metricSnapshot;
+        // take snapshot
+        synchronized (metric) {
+          metricSnapshot = metric.getSnapshot();
+        }
+        _logger.debug(String.format("Submitting %s metric for metric emission pool", metricSnapshot.getName()));
+        // report to all emitters
         for (final IMetricEmitter metricEmitter : _metricEmitters) {
           _executorService.submit(new Runnable() {
             @Override
             public void run() {
               try {
-                metricEmitter.reportMetric(metric);
+                metricEmitter.reportMetric(metricSnapshot);
               } catch (Exception ex) {
-                _logger.error(String.format("Failed to report %s metric due to %s", metric.getName(), ex.toString()));
+                _logger.error(String.format("Failed to report %s metric due to %s", metricSnapshot.getName(),
+                    ex.toString()));
               }
             }
           });
         }
+      } catch (CloneNotSupportedException ex) {
+        _logger.error(String.format("Failed to take snapshot for %s metric %s", metric.getClass().getName(),
+            ex.toString()));
       }
     }
   }
@@ -185,13 +197,13 @@ public class MetricReportManager {
    */
   public void disableManager() {
     _logger.info("Disabling Metric Manager");
-    if(_isManagerEnabled) {
+    if (_isManagerEnabled) {
       _isManagerEnabled = false;
-      for(IMetricEmitter emitter: _metricEmitters) {
+      for (IMetricEmitter emitter : _metricEmitters) {
         try {
           emitter.purgeAllData();
         } catch (Exception ex) {
-          _logger.error("Failed to purge data "  + ex.toString());
+          _logger.error("Failed to purge data " + ex.toString());
         }
       }
     }
diff --git a/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java b/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java
index 4a3cc90..3918038 100644
--- a/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java
+++ b/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java
@@ -10,14 +10,45 @@ public class FakeMetric extends AbstractMetric<Integer>{
   }
 
   @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((_metricManager == null) ? 0 : _metricManager.hashCode());
+    result = prime * result + ((_name == null) ? 0 : _name.hashCode());
+    result = prime * result + ((_type == null) ? 0 : _type.hashCode());
+    result = prime * result + ((_value == null) ? 0 : _value.hashCode());
+    return result;
+  }
+
+  @Override
   public boolean equals(Object obj) {
-      if (obj == this) {
-          return true;
-      }
-      if (obj == null || obj.getClass() != this.getClass()) {
-          return false;
-      }
-      FakeMetric metric = (FakeMetric) obj;
-      return metric.getName() == getName() && metric.getValue() == getValue();
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (!(obj instanceof FakeMetric))
+      return false;
+    FakeMetric other = (FakeMetric) obj;
+    if (_metricManager == null) {
+      if (other._metricManager != null)
+        return false;
+    } else if (!_metricManager.equals(other._metricManager))
+      return false;
+    if (_name == null) {
+      if (other._name != null)
+        return false;
+    } else if (!_name.equals(other._name))
+      return false;
+    if (_type == null) {
+      if (other._type != null)
+        return false;
+    } else if (!_type.equals(other._type))
+      return false;
+    if (_value == null) {
+      if (other._value != null)
+        return false;
+    } else if (!_value.equals(other._value))
+      return false;
+    return true;
   }
 }
diff --git a/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java b/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java
index 043bf26..7fd7da2 100644
--- a/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java
@@ -71,9 +71,9 @@ public class MetricManagerTest {
   @Test
   public void managerEmitterHandlingTest() throws Exception {
     emitter.purgeAllData();
+    Date from = new Date();
     metric.notifyManager();
 
-    Date from = new Date();
     synchronized (this) {
       try {
         wait(2000);
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
index 5962471..59d78f9 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
@@ -53,10 +53,6 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
   private static final long serialVersionUID = 2L;
   private static final Logger _logger = Logger.getLogger(StatsServlet.class);
 
-  public void init(ServletConfig config) throws ServletException {
-    // Nothing to initialize
-  }
-
   public boolean hasParam(HttpServletRequest request, String param) {
     return HttpRequestUtils.hasParam(request, param);
   }

build.gradle 2(+1 -1)

diff --git a/build.gradle b/build.gradle
index 54321c2..2ad1434 100644
--- a/build.gradle
+++ b/build.gradle
@@ -101,6 +101,7 @@ project(':azkaban-common') {
     compile('net.sf.jopt-simple:jopt-simple:4.3')
     compile('org.apache.commons:commons-email:1.2')
     compile('org.apache.commons:commons-jexl:2.1.1')
+    compile('org.apache.commons:commons-math3:3.0')
     compile('org.apache.httpcomponents:httpclient:4.2.1')
     compile('org.apache.httpcomponents:httpcore:4.2.1')
     compile('org.apache.velocity:velocity:1.7')
@@ -109,7 +110,6 @@ project(':azkaban-common') {
     compile('org.mortbay.jetty:jetty:6.1.26')
     compile('org.mortbay.jetty:jetty-util:6.1.26')
     compile('org.slf4j:slf4j-api:1.6.1')
-    compile('org.apache.commons:commons-math3:3.0')
 
     testCompile('junit:junit:4.11')
     testCompile('org.hamcrest:hamcrest-all:1.3')