azkaban-aplcache

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
index 216b45e..e21ae2a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Executor.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -113,19 +113,19 @@ public class Executor implements Comparable<Executor> {
     return id;
   }
 
-  public ExecutorInfo getExecutorStats() {
+  public ExecutorInfo getExecutorInfo() {
     return this.cachedExecutorStats;
   }
 
-  public void setExecutorStats(ExecutorInfo stats) {
-    this.cachedExecutorStats = stats;
+  public void setExecutorInfo(ExecutorInfo info) {
+    this.cachedExecutorStats = info;
     this.lastStatsUpdatedTime = new Date();
   }
 
   /**
-   * Gets the timestamp when the stats are last updated.
-   * @return date object represents the timestamp, null if the stats of this
-   *         specific object is never refreshed.
+   * Gets the timestamp when the executor info is last updated.
+   * @return date object represents the timestamp, null if the executor info of this
+   *         specific executor is never refreshed.
    * */
   public Date getLastStatsUpdatedTime(){
     return this.lastStatsUpdatedTime;
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
index 47086c8..2ab814d 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
@@ -17,29 +17,27 @@
 package azkaban.executor;
 
 import java.io.IOException;
-import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
 import org.apache.http.StatusLine;
 import org.apache.http.client.HttpResponseException;
 import org.apache.http.util.EntityUtils;
-import org.codehaus.jackson.map.ObjectMapper;
-
 import azkaban.utils.RestfulApiClient;
 
 /** Client class that will be used to handle all Restful API calls between Executor and the host application.
  * */
-public class ExecutorApiClient<T extends java.io.Serializable> extends RestfulApiClient<T> {
-  private final Class<T> typeOfclass;
+public class ExecutorApiClient extends RestfulApiClient<String> {
+  private static ExecutorApiClient instance = null;
+  private ExecutorApiClient(){}
 
   /**
-   * Constructor of the class.
-   * @param typeOfclass the type of class that the T represents. Must provide.
+   * Singleton method to return the instance of the current object.
    * */
-  public ExecutorApiClient(Class<T> typeOfclass){
-    if (null == typeOfclass){
-      throw new IllegalArgumentException("Class type of the returning object must be specified.");
+  public static ExecutorApiClient getInstance(){
+    if (null == instance){
+      instance = new ExecutorApiClient();
     }
-    this.typeOfclass = typeOfclass;
+
+    return instance;
   }
 
   /**Implementing the parseResponse function to return de-serialized Json object.
@@ -47,7 +45,7 @@ public class ExecutorApiClient<T extends java.io.Serializable> extends RestfulAp
    * @return de-serialized object from Json or null if the response doesn't have a body.
    * */
   @Override
-  protected T parseResponse(HttpResponse response)
+  protected String parseResponse(HttpResponse response)
       throws HttpResponseException, IOException {
     final StatusLine statusLine = response.getStatusLine();
     String responseBody = response.getEntity() != null ?
@@ -61,11 +59,6 @@ public class ExecutorApiClient<T extends java.io.Serializable> extends RestfulAp
         throw new HttpResponseException(statusLine.getStatusCode(),responseBody);
     }
 
-    final HttpEntity entity = response.getEntity();
-    if (null == entity || entity.getContentLength() >= Integer.MAX_VALUE){
-      logger.error("unable to parse the response as the response is null or with an invlaid length");
-      return null;
-    }
-    return new ObjectMapper().readValue(EntityUtils.toString(entity), this.typeOfclass);
+    return responseBody;
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
index d263231..2820c08 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
@@ -16,6 +16,12 @@
 
 package azkaban.executor;
 
+import java.io.IOException;
+
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
  /** Class that exposes the statistics from the executor server.
   *  List of the statistics -
   *  remainingMemoryPercent;
@@ -117,4 +123,18 @@ package azkaban.executor;
         }
         return false;
     }
+
+    /**
+     * Helper function to get an ExecutorInfo instance from the JSon String serialized from another object.
+     * @param  jsonString the string that will be de-serialized from.
+     * @return instance of the object if the parsing is successful, null other wise.
+     * @throws JsonParseException,JsonMappingException,IOException
+     * */
+    public static ExecutorInfo fromJSONString(String jsonString) throws
+    JsonParseException,
+    JsonMappingException,
+    IOException{
+      if (null == jsonString || jsonString.length() == 0) return null;
+      return new ObjectMapper().readValue(jsonString, ExecutorInfo.class);
+    }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
index c1346e6..c6d096e 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
@@ -156,8 +156,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
 
       @Override
       public int compare(Executor o1, Executor o2) {
-        ExecutorInfo stat1 = o1.getExecutorStats();
-        ExecutorInfo stat2 = o2.getExecutorStats();
+        ExecutorInfo stat1 = o1.getExecutorInfo();
+        ExecutorInfo stat2 = o2.getExecutorInfo();
 
         Integer result = 0;
         if (statisticsObjectCheck(stat1,stat2,NUMOFASSIGNEDFLOW_COMPARATOR_NAME,result)){
@@ -177,8 +177,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
 
       @Override
       public int compare(Executor o1, Executor o2) {
-        ExecutorInfo stat1 = o1.getExecutorStats();
-        ExecutorInfo stat2 = o2.getExecutorStats();
+        ExecutorInfo stat1 = o1.getExecutorInfo();
+        ExecutorInfo stat2 = o2.getExecutorInfo();
 
         int result = 0;
         if (statisticsObjectCheck(stat1,stat2,CPUUSAGE_COMPARATOR_NAME,result)){
@@ -201,8 +201,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
 
       @Override
       public int compare(Executor o1, Executor o2) {
-        ExecutorInfo stat1 = o1.getExecutorStats();
-        ExecutorInfo stat2 = o2.getExecutorStats();
+        ExecutorInfo stat1 = o1.getExecutorInfo();
+        ExecutorInfo stat2 = o2.getExecutorInfo();
 
         int result = 0;
         if (statisticsObjectCheck(stat1,stat2,LSTDISPATCHED_COMPARATOR_NAME,result)){
@@ -228,8 +228,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
 
       @Override
       public int compare(Executor o1, Executor o2) {
-       ExecutorInfo stat1 = o1.getExecutorStats();
-       ExecutorInfo stat2 = o2.getExecutorStats();
+       ExecutorInfo stat1 = o1.getExecutorInfo();
+       ExecutorInfo stat2 = o2.getExecutorInfo();
 
        int result = 0;
        if (statisticsObjectCheck(stat1,stat2,MEMORY_COMPARATOR_NAME,result)){
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
index 5a3d33e..93b4a91 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
@@ -101,7 +101,7 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
           return false;
         }
 
-        ExecutorInfo stats = filteringTarget.getExecutorStats();
+        ExecutorInfo stats = filteringTarget.getExecutorInfo();
         if (null == stats) {
           logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
               STATICREMAININGFLOWSIZE_FILTER_NAME,
@@ -128,7 +128,7 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
           return false;
         }
 
-        ExecutorInfo stats = filteringTarget.getExecutorStats();
+        ExecutorInfo stats = filteringTarget.getExecutorInfo();
         if (null == stats) {
           logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
               MINIMUMFREEMEMORY_FILTER_NAME,
@@ -157,7 +157,7 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
           return false;
         }
 
-        ExecutorInfo stats = filteringTarget.getExecutorStats();
+        ExecutorInfo stats = filteringTarget.getExecutorInfo();
         if (null == stats) {
           logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
               MINIMUMFREEMEMORY_FILTER_NAME,
diff --git a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
index fd8768b..e54b182 100644
--- a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
@@ -32,6 +32,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import azkaban.executor.selector.*;
+import azkaban.utils.JSONUtils;
 
 public class SelectorTest {
   // mock executor object.
@@ -685,9 +686,9 @@ public class SelectorTest {
     executorList.add(new Executor(2, "host2", 80, true));
     executorList.add(new Executor(3, "host3", 80, true));
 
-    executorList.get(0).setExecutorStats(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89, 0));
-    executorList.get(1).setExecutorStats(new ExecutorInfo(50, 14095, 50, System.currentTimeMillis(), 90,  0));
-    executorList.get(2).setExecutorStats(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 90,  0));
+    executorList.get(0).setExecutorInfo(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89, 0));
+    executorList.get(1).setExecutorInfo(new ExecutorInfo(50, 14095, 50, System.currentTimeMillis(), 90,  0));
+    executorList.get(2).setExecutorInfo(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 90,  0));
 
     ExecutableFlow flow = new ExecutableFlow();
 
@@ -706,9 +707,9 @@ public class SelectorTest {
     executorList.add(new Executor(2, "host2", 80, true));
     executorList.add(new Executor(3, "host3", 80, true));
 
-    executorList.get(0).setExecutorStats(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89, 0));
-    executorList.get(1).setExecutorStats(new ExecutorInfo(50, 14095, 50, System.currentTimeMillis(), 90,  0));
-    executorList.get(2).setExecutorStats(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 90,  0));
+    executorList.get(0).setExecutorInfo(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89, 0));
+    executorList.get(1).setExecutorInfo(new ExecutorInfo(50, 14095, 50, System.currentTimeMillis(), 90,  0));
+    executorList.get(2).setExecutorInfo(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 90,  0));
 
     ExecutableFlow flow = new ExecutableFlow();
 
@@ -721,9 +722,17 @@ public class SelectorTest {
 
     // simulate that once the flow is assigned, executor1's remaining TMP storage dropped to 2048
     // now we do the getBest again executor3 is expected to be selected as it has a earlier last dispatched time.
-    executorList.get(0).setExecutorStats(new ExecutorInfo(99.9, 4095, 50, System.currentTimeMillis(), 90, 1));
+    executorList.get(0).setExecutorInfo(new ExecutorInfo(99.9, 4095, 50, System.currentTimeMillis(), 90, 1));
     executor = selector.getBest(executorList, flow);
     Assert.assertEquals(executorList.get(2), executor);
   }
 
+  @Test
+  public void  testExecutorInfoJsonParser() throws Exception{
+    ExecutorInfo exeInfo = new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89, 10);
+    String json = JSONUtils.toJSON(exeInfo);
+    ExecutorInfo exeInfo2 = ExecutorInfo.fromJSONString(json);
+    Assert.assertTrue(exeInfo.equals(exeInfo2));
+  }
+
 }