azkaban-aplcache

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
index efe00f9..216b45e 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Executor.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -30,7 +30,7 @@ public class Executor implements Comparable<Executor> {
   private final int port;
   private boolean isActive;
   // cached copy of the latest statistics from  the executor.
-  private ServerStatistics cachedExecutorStats;
+  private ExecutorInfo cachedExecutorStats;
   private Date lastStatsUpdatedTime;
 
   /**
@@ -113,11 +113,11 @@ public class Executor implements Comparable<Executor> {
     return id;
   }
 
-  public ServerStatistics getExecutorStats() {
+  public ExecutorInfo getExecutorStats() {
     return this.cachedExecutorStats;
   }
 
-  public void setExecutorStats(ServerStatistics stats) {
+  public void setExecutorStats(ExecutorInfo stats) {
     this.cachedExecutorStats = stats;
     this.lastStatsUpdatedTime = new Date();
   }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
index 4d77897..47086c8 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
@@ -17,37 +17,37 @@
 package azkaban.executor;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
 import org.apache.http.StatusLine;
 import org.apache.http.client.HttpResponseException;
 import org.apache.http.util.EntityUtils;
+import org.codehaus.jackson.map.ObjectMapper;
 
-import azkaban.utils.JSONUtils;
 import azkaban.utils.RestfulApiClient;
 
 /** Client class that will be used to handle all Restful API calls between Executor and the host application.
  * */
-public class ExecutorApiClient extends RestfulApiClient<Map<String, Object>> {
-  private ExecutorApiClient(){}
-  private static ExecutorApiClient instance = new ExecutorApiClient();
+public class ExecutorApiClient<T extends java.io.Serializable> extends RestfulApiClient<T> {
+  private final Class<T> typeOfclass;
 
-  /** Function to return the instance of the ExecutorApiClient class.
+  /**
+   * Constructor of the class.
+   * @param typeOfclass the type of class that the T represents. Must provide.
    * */
-  public static ExecutorApiClient getInstance() {
-    return instance;
+  public ExecutorApiClient(Class<T> typeOfclass){
+    if (null == typeOfclass){
+      throw new IllegalArgumentException("Class type of the returning object must be specified.");
+    }
+    this.typeOfclass = typeOfclass;
   }
 
   /**Implementing the parseResponse function to return de-serialized Json object.
    * @param response  the returned response from the HttpClient.
-   * @return de-serialized object from Json or empty object if the response doesn't have a body.
+   * @return de-serialized object from Json or null if the response doesn't have a body.
    * */
-  @SuppressWarnings("unchecked")
   @Override
-  protected Map<String, Object> parseResponse(HttpResponse response)
+  protected T parseResponse(HttpResponse response)
       throws HttpResponseException, IOException {
     final StatusLine statusLine = response.getStatusLine();
     String responseBody = response.getEntity() != null ?
@@ -62,13 +62,10 @@ public class ExecutorApiClient extends RestfulApiClient<Map<String, Object>> {
     }
 
     final HttpEntity entity = response.getEntity();
-    if (null != entity){
-      Object returnVal = JSONUtils.parseJSONFromString(EntityUtils.toString(entity));
-      if (null!= returnVal){
-        return (Map<String, Object>) returnVal;
-      }
+    if (null == entity || entity.getContentLength() >= Integer.MAX_VALUE){
+      logger.error("unable to parse the response as the response is null or with an invlaid length");
+      return null;
     }
-
-    return new HashMap<String, Object>() ;
+    return new ObjectMapper().readValue(EntityUtils.toString(entity), this.typeOfclass);
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
index 1d48685..c1346e6 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
@@ -17,14 +17,13 @@
 package azkaban.executor.selector;
 
 import java.util.Comparator;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
 import azkaban.executor.Executor;
-import azkaban.executor.ServerStatistics;
+import azkaban.executor.ExecutorInfo;
 
 
 /**
@@ -118,8 +117,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
    * @return true if the passed statistics are NOT both valid, a shortcut can be made (caller can consume the result),
    *         false otherwise.
    * */
-  private static boolean statisticsObjectCheck(ServerStatistics statisticsObj1,
-                                               ServerStatistics statisticsObj2, String caller, Integer result){
+  private static boolean statisticsObjectCheck(ExecutorInfo statisticsObj1,
+                                               ExecutorInfo statisticsObj2, String caller, Integer result){
     result = 0 ;
     // both doesn't expose the info
     if (null == statisticsObj1 && null == statisticsObj2){
@@ -157,8 +156,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
 
       @Override
       public int compare(Executor o1, Executor o2) {
-        ServerStatistics stat1 = o1.getExecutorStats();
-        ServerStatistics stat2 = o2.getExecutorStats();
+        ExecutorInfo stat1 = o1.getExecutorStats();
+        ExecutorInfo stat2 = o2.getExecutorStats();
 
         Integer result = 0;
         if (statisticsObjectCheck(stat1,stat2,NUMOFASSIGNEDFLOW_COMPARATOR_NAME,result)){
@@ -178,8 +177,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
 
       @Override
       public int compare(Executor o1, Executor o2) {
-        ServerStatistics stat1 = o1.getExecutorStats();
-        ServerStatistics stat2 = o2.getExecutorStats();
+        ExecutorInfo stat1 = o1.getExecutorStats();
+        ExecutorInfo stat2 = o2.getExecutorStats();
 
         int result = 0;
         if (statisticsObjectCheck(stat1,stat2,CPUUSAGE_COMPARATOR_NAME,result)){
@@ -202,34 +201,15 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
 
       @Override
       public int compare(Executor o1, Executor o2) {
-        ServerStatistics stat1 = o1.getExecutorStats();
-        ServerStatistics stat2 = o2.getExecutorStats();
+        ExecutorInfo stat1 = o1.getExecutorStats();
+        ExecutorInfo stat2 = o2.getExecutorStats();
 
         int result = 0;
         if (statisticsObjectCheck(stat1,stat2,LSTDISPATCHED_COMPARATOR_NAME,result)){
           return result;
         }
-
-        if (null == stat1.getLastDispatchedTime() && null == stat1.getLastDispatchedTime()){
-          logger.info(String.format("%s : stats from both side doesn't contain last dispatched time info.",
-              LSTDISPATCHED_COMPARATOR_NAME));
-          return 0;
-        }
-
-        if (null == stat2.getLastDispatchedTime()){
-          logger.info(String.format("%s : choosing left side as right doesn't contain last dispatched time info.",
-              LSTDISPATCHED_COMPARATOR_NAME));
-          return 1;
-        }
-
-        if (null == stat1.getLastDispatchedTime()){
-          logger.info(String.format("%s : choosing right side as left doesn't contain last dispatched time info.",
-              LSTDISPATCHED_COMPARATOR_NAME));
-          return -1;
-        }
-
         // Note: an earlier date time indicates higher weight.
-        return ((Date)stat2.getLastDispatchedTime()).compareTo(stat1.getLastDispatchedTime());
+        return ((Long)stat2.getLastDispatchedTime()).compareTo(stat1.getLastDispatchedTime());
       }});
   }
 
@@ -248,8 +228,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
 
       @Override
       public int compare(Executor o1, Executor o2) {
-       ServerStatistics stat1 = o1.getExecutorStats();
-       ServerStatistics stat2 = o2.getExecutorStats();
+       ExecutorInfo stat1 = o1.getExecutorStats();
+       ExecutorInfo stat2 = o2.getExecutorStats();
 
        int result = 0;
        if (statisticsObjectCheck(stat1,stat2,MEMORY_COMPARATOR_NAME,result)){
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
index 9adc0dc..5a3d33e 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
@@ -23,7 +23,7 @@ import java.util.Set;
 
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.Executor;
-import azkaban.executor.ServerStatistics;
+import azkaban.executor.ExecutorInfo;
 
 /**
  * De-normalized version of the candidateFilter, which also contains the implementation of the factor filters.
@@ -101,7 +101,7 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
           return false;
         }
 
-        ServerStatistics stats = filteringTarget.getExecutorStats();
+        ExecutorInfo stats = filteringTarget.getExecutorStats();
         if (null == stats) {
           logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
               STATICREMAININGFLOWSIZE_FILTER_NAME,
@@ -128,7 +128,7 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
           return false;
         }
 
-        ServerStatistics stats = filteringTarget.getExecutorStats();
+        ExecutorInfo stats = filteringTarget.getExecutorStats();
         if (null == stats) {
           logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
               MINIMUMFREEMEMORY_FILTER_NAME,
@@ -157,7 +157,7 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
           return false;
         }
 
-        ServerStatistics stats = filteringTarget.getExecutorStats();
+        ExecutorInfo stats = filteringTarget.getExecutorStats();
         if (null == stats) {
           logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
               MINIMUMFREEMEMORY_FILTER_NAME,
diff --git a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
index ca43678..fd8768b 100644
--- a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
@@ -685,9 +685,9 @@ public class SelectorTest {
     executorList.add(new Executor(2, "host2", 80, true));
     executorList.add(new Executor(3, "host3", 80, true));
 
-    executorList.get(0).setExecutorStats(new ServerStatistics(99.9, 14095, 50, new Date(), 89, 0));
-    executorList.get(1).setExecutorStats(new ServerStatistics(50, 14095, 50, new Date(), 90,  0));
-    executorList.get(2).setExecutorStats(new ServerStatistics(99.9, 14095, 50, new Date(), 90,  0));
+    executorList.get(0).setExecutorStats(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89, 0));
+    executorList.get(1).setExecutorStats(new ExecutorInfo(50, 14095, 50, System.currentTimeMillis(), 90,  0));
+    executorList.get(2).setExecutorStats(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 90,  0));
 
     ExecutableFlow flow = new ExecutableFlow();
 
@@ -706,9 +706,9 @@ public class SelectorTest {
     executorList.add(new Executor(2, "host2", 80, true));
     executorList.add(new Executor(3, "host3", 80, true));
 
-    executorList.get(0).setExecutorStats(new ServerStatistics(99.9, 14095, 50, new Date(), 89, 0));
-    executorList.get(1).setExecutorStats(new ServerStatistics(50, 14095, 50, new Date(), 90,  0));
-    executorList.get(2).setExecutorStats(new ServerStatistics(99.9, 14095, 50, new Date(), 90,  0));
+    executorList.get(0).setExecutorStats(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89, 0));
+    executorList.get(1).setExecutorStats(new ExecutorInfo(50, 14095, 50, System.currentTimeMillis(), 90,  0));
+    executorList.get(2).setExecutorStats(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 90,  0));
 
     ExecutableFlow flow = new ExecutableFlow();
 
@@ -721,7 +721,7 @@ public class SelectorTest {
 
     // simulate that once the flow is assigned, executor1's remaining TMP storage dropped to 2048
     // now we do the getBest again executor3 is expected to be selected as it has a earlier last dispatched time.
-    executorList.get(0).setExecutorStats(new ServerStatistics(99.9, 4095, 50, new Date(), 90, 1));
+    executorList.get(0).setExecutorStats(new ExecutorInfo(99.9, 4095, 50, System.currentTimeMillis(), 90, 1));
     executor = selector.getBest(executorList, flow);
     Assert.assertEquals(executorList.get(2), executor);
   }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 71caaff..361db00 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -144,7 +144,7 @@ public class FlowRunnerManager implements EventListener,
   private Object executionDirDeletionSync = new Object();
 
   // date time of the the last flow submitted.
-  private Date lastFlowSubmittedDate = null;
+  private long lastFlowSubmittedDate = 0;
 
   public FlowRunnerManager(Props props, ExecutorLoader executorLoader,
       ProjectLoader projectLoader, ClassLoader parentClassLoader)
@@ -258,7 +258,7 @@ public class FlowRunnerManager implements EventListener,
     return allProjects;
   }
 
-  public Date getLastFlowSubmittedTime(){
+  public long getLastFlowSubmittedTime(){
     // Note: this is not thread safe and may result in providing dirty data.
     //       we will provide this data as is for now and will revisit if there
     //       is a string justification for change.
@@ -519,7 +519,7 @@ public class FlowRunnerManager implements EventListener,
       // keep track of this future
       submittedFlows.put(future, runner.getExecutionId());
       // update the last submitted time.
-      this.lastFlowSubmittedDate = new Date();
+      this.lastFlowSubmittedDate = System.currentTimeMillis();
     } catch (RejectedExecutionException re) {
       throw new ExecutorManagerException(
           "Azkaban server can't execute any more flows. "
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
index 2dfae31..3dfe0ae 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
@@ -30,7 +30,7 @@ import javax.servlet.http.HttpServletResponse;
 
 import org.apache.log4j.Logger;
 
-import azkaban.executor.ServerStatistics;
+import azkaban.executor.ExecutorInfo;
 import azkaban.utils.JSONUtils;
 
 public class ServerStatisticsServlet extends HttpServlet  {
@@ -42,7 +42,7 @@ public class ServerStatisticsServlet extends HttpServlet  {
   private static final String noCacheParamName = "nocache";
 
   protected static long lastRefreshedTime = 0;
-  protected static ServerStatistics cachedstats = null;
+  protected static ExecutorInfo cachedstats = null;
 
   /**
    * Handle all get request to Statistics Servlet {@inheritDoc}
@@ -71,7 +71,7 @@ public class ServerStatisticsServlet extends HttpServlet  {
    * a double value will be used to present the remaining memory,
    *         a returning value of '55.6' means 55.6%
    */
-  protected void fillRemainingMemoryPercent(ServerStatistics stats){
+  protected void fillRemainingMemoryPercent(ExecutorInfo stats){
     if (new File("/bin/bash").exists() &&  new File("/usr/bin/free").exists()) {
       java.lang.ProcessBuilder processBuilder =
           new java.lang.ProcessBuilder("/bin/bash", "-c", String.format("/usr/bin/free -m -s 0.1 -c %s | grep Mem:",
@@ -96,7 +96,6 @@ public class ServerStatisticsServlet extends HttpServlet  {
           long totalMemory = 0 ;
           long freeMemory  = 0 ;
           int  sampleCount = 0 ;
-
           for(String line : output){
             String[] splitedresult = line.split("\\s+");
             // expected return format -
@@ -150,7 +149,7 @@ public class ServerStatisticsServlet extends HttpServlet  {
   protected synchronized void populateStatistics(boolean noCache){
     //check again before starting the work.
     if (noCache || System.currentTimeMillis() - lastRefreshedTime  > cacheTimeInMilliseconds){
-      final ServerStatistics stats = new ServerStatistics();
+      final ExecutorInfo stats = new ExecutorInfo();
 
       List<Thread> workerPool = new ArrayList<Thread>();
       workerPool.add(new Thread(new Runnable(){ public void run() {
@@ -188,7 +187,7 @@ public class ServerStatisticsServlet extends HttpServlet  {
    * @param stats reference to the result container which contains all the results, this specific method
    *              will only work on the property "remainingFlowCapacity".
    */
-  protected void fillRemainingFlowCapacityAndLastDispatchedTime(ServerStatistics stats){
+  protected void fillRemainingFlowCapacityAndLastDispatchedTime(ExecutorInfo stats){
 
     AzkabanExecutorServer server = AzkabanExecutorServer.getApp();
     if (server != null){
@@ -209,7 +208,7 @@ public class ServerStatisticsServlet extends HttpServlet  {
    * @param stats reference to the result container which contains all the results, this specific method
    *              will only work on the property "cpuUdage".
    */
-  protected void fillCpuUsage(ServerStatistics stats){
+  protected void fillCpuUsage(ExecutorInfo stats){
     if (new File("/bin/bash").exists() &&  new File("/usr/bin/top").exists()) {
       java.lang.ProcessBuilder processBuilder =
           new java.lang.ProcessBuilder("/bin/bash", "-c", String.format("/usr/bin/top -bn%s -d 0.1 | grep \"Cpu(s)\"",
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
index 781973b..b754158 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
@@ -1,21 +1,16 @@
 package azkaban.execapp;
 
-import java.io.IOException;
-import java.util.Date;
-import java.util.Map;
-
 import org.junit.Assert;
 import org.junit.Test;
 
-import azkaban.executor.ServerStatistics;
-import azkaban.utils.JSONUtils;
+import azkaban.executor.ExecutorInfo;
 
 public class StatisticsServletTest {
   private class MockStatisticsServlet extends ServerStatisticsServlet{
     /** */
     private static final long serialVersionUID = 1L;
 
-    public  ServerStatistics getStastics(){
+    public  ExecutorInfo getStastics(){
       return cachedstats;
     }
 
@@ -27,17 +22,17 @@ public class StatisticsServletTest {
        this.populateStatistics(false);
     }
 
-    public void callFillCpuUsage(ServerStatistics stats){
+    public void callFillCpuUsage(ExecutorInfo stats){
       this.fillCpuUsage(stats);}
 
-    public void callFillRemainingMemoryPercent(ServerStatistics stats){
+    public void callFillRemainingMemoryPercent(ExecutorInfo stats){
         this.fillRemainingMemoryPercent(stats);}
   }
   private MockStatisticsServlet statServlet = new MockStatisticsServlet();
 
   @Test
   public void testFillMemory()  {
-    ServerStatistics stats = new ServerStatistics();
+    ExecutorInfo stats = new ExecutorInfo();
     this.statServlet.callFillRemainingMemoryPercent(stats);
     // assume any machine that runs this test should
     // have bash and top available and at least got some remaining memory.
@@ -47,7 +42,7 @@ public class StatisticsServletTest {
 
   @Test
   public void testFillCpu()  {
-    ServerStatistics stats = new ServerStatistics();
+    ExecutorInfo stats = new ExecutorInfo();
     this.statServlet.callFillCpuUsage(stats);
     Assert.assertTrue(stats.getCpuUsage() > 0);
   }
@@ -78,24 +73,4 @@ public class StatisticsServletTest {
     this.statServlet.callPopulateStatistics();
     Assert.assertNotEquals(updatedTime, this.statServlet.getUpdatedTime());
   }
-
-  @Test
-  public void testStatisticsJsonParser() throws IOException  {
-    ServerStatistics stat = new ServerStatistics(0.1,1,2,new Date(),3,5);
-    String jSonStr = JSONUtils.toJSON(stat);
-    @SuppressWarnings("unchecked")
-    Map<String,Object> jSonObj = (Map<String,Object>)JSONUtils.parseJSONFromString(jSonStr);
-    ServerStatistics stat2 = ServerStatistics.fromJsonObject(jSonObj);
-    Assert.assertTrue(stat.equals(stat2));
-    }
-
-  @Test
-  public void testStatisticsJsonParserWNullDateValue() throws IOException  {
-    ServerStatistics stat = new ServerStatistics(0.1,1,2,null,3,5);
-    String jSonStr = JSONUtils.toJSON(stat);
-    @SuppressWarnings("unchecked")
-    Map<String,Object> jSonObj = (Map<String,Object>)JSONUtils.parseJSONFromString(jSonStr);
-    ServerStatistics stat2 = ServerStatistics.fromJsonObject(jSonObj);
-    Assert.assertTrue(stat.equals(stat2));
-    }
 }