Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
index efe00f9..216b45e 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Executor.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -30,7 +30,7 @@ public class Executor implements Comparable<Executor> {
private final int port;
private boolean isActive;
// cached copy of the latest statistics from the executor.
- private ServerStatistics cachedExecutorStats;
+ private ExecutorInfo cachedExecutorStats;
private Date lastStatsUpdatedTime;
/**
@@ -113,11 +113,11 @@ public class Executor implements Comparable<Executor> {
return id;
}
- public ServerStatistics getExecutorStats() {
+ public ExecutorInfo getExecutorStats() {
return this.cachedExecutorStats;
}
- public void setExecutorStats(ServerStatistics stats) {
+ public void setExecutorStats(ExecutorInfo stats) {
this.cachedExecutorStats = stats;
this.lastStatsUpdatedTime = new Date();
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
index 4d77897..47086c8 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
@@ -17,37 +17,37 @@
package azkaban.executor;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
import org.apache.http.client.HttpResponseException;
import org.apache.http.util.EntityUtils;
+import org.codehaus.jackson.map.ObjectMapper;
-import azkaban.utils.JSONUtils;
import azkaban.utils.RestfulApiClient;
/** Client class that will be used to handle all Restful API calls between Executor and the host application.
* */
-public class ExecutorApiClient extends RestfulApiClient<Map<String, Object>> {
- private ExecutorApiClient(){}
- private static ExecutorApiClient instance = new ExecutorApiClient();
+public class ExecutorApiClient<T extends java.io.Serializable> extends RestfulApiClient<T> {
+ private final Class<T> typeOfclass;
- /** Function to return the instance of the ExecutorApiClient class.
+ /**
+ * Constructor of the class.
+ * @param typeOfclass the type of class that the T represents. Must provide.
* */
- public static ExecutorApiClient getInstance() {
- return instance;
+ public ExecutorApiClient(Class<T> typeOfclass){
+ if (null == typeOfclass){
+ throw new IllegalArgumentException("Class type of the returning object must be specified.");
+ }
+ this.typeOfclass = typeOfclass;
}
/**Implementing the parseResponse function to return de-serialized Json object.
* @param response the returned response from the HttpClient.
- * @return de-serialized object from Json or empty object if the response doesn't have a body.
+ * @return de-serialized object from Json or null if the response doesn't have a body.
* */
- @SuppressWarnings("unchecked")
@Override
- protected Map<String, Object> parseResponse(HttpResponse response)
+ protected T parseResponse(HttpResponse response)
throws HttpResponseException, IOException {
final StatusLine statusLine = response.getStatusLine();
String responseBody = response.getEntity() != null ?
@@ -62,13 +62,10 @@ public class ExecutorApiClient extends RestfulApiClient<Map<String, Object>> {
}
final HttpEntity entity = response.getEntity();
- if (null != entity){
- Object returnVal = JSONUtils.parseJSONFromString(EntityUtils.toString(entity));
- if (null!= returnVal){
- return (Map<String, Object>) returnVal;
- }
+ if (null == entity || entity.getContentLength() >= Integer.MAX_VALUE){
+ logger.error("unable to parse the response as the response is null or with an invlaid length");
+ return null;
}
-
- return new HashMap<String, Object>() ;
+ return new ObjectMapper().readValue(EntityUtils.toString(entity), this.typeOfclass);
}
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
index 1d48685..c1346e6 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
@@ -17,14 +17,13 @@
package azkaban.executor.selector;
import java.util.Comparator;
-import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import azkaban.executor.Executor;
-import azkaban.executor.ServerStatistics;
+import azkaban.executor.ExecutorInfo;
/**
@@ -118,8 +117,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
* @return true if the passed statistics are NOT both valid, a shortcut can be made (caller can consume the result),
* false otherwise.
* */
- private static boolean statisticsObjectCheck(ServerStatistics statisticsObj1,
- ServerStatistics statisticsObj2, String caller, Integer result){
+ private static boolean statisticsObjectCheck(ExecutorInfo statisticsObj1,
+ ExecutorInfo statisticsObj2, String caller, Integer result){
result = 0 ;
// both doesn't expose the info
if (null == statisticsObj1 && null == statisticsObj2){
@@ -157,8 +156,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
@Override
public int compare(Executor o1, Executor o2) {
- ServerStatistics stat1 = o1.getExecutorStats();
- ServerStatistics stat2 = o2.getExecutorStats();
+ ExecutorInfo stat1 = o1.getExecutorStats();
+ ExecutorInfo stat2 = o2.getExecutorStats();
Integer result = 0;
if (statisticsObjectCheck(stat1,stat2,NUMOFASSIGNEDFLOW_COMPARATOR_NAME,result)){
@@ -178,8 +177,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
@Override
public int compare(Executor o1, Executor o2) {
- ServerStatistics stat1 = o1.getExecutorStats();
- ServerStatistics stat2 = o2.getExecutorStats();
+ ExecutorInfo stat1 = o1.getExecutorStats();
+ ExecutorInfo stat2 = o2.getExecutorStats();
int result = 0;
if (statisticsObjectCheck(stat1,stat2,CPUUSAGE_COMPARATOR_NAME,result)){
@@ -202,34 +201,15 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
@Override
public int compare(Executor o1, Executor o2) {
- ServerStatistics stat1 = o1.getExecutorStats();
- ServerStatistics stat2 = o2.getExecutorStats();
+ ExecutorInfo stat1 = o1.getExecutorStats();
+ ExecutorInfo stat2 = o2.getExecutorStats();
int result = 0;
if (statisticsObjectCheck(stat1,stat2,LSTDISPATCHED_COMPARATOR_NAME,result)){
return result;
}
-
- if (null == stat1.getLastDispatchedTime() && null == stat1.getLastDispatchedTime()){
- logger.info(String.format("%s : stats from both side doesn't contain last dispatched time info.",
- LSTDISPATCHED_COMPARATOR_NAME));
- return 0;
- }
-
- if (null == stat2.getLastDispatchedTime()){
- logger.info(String.format("%s : choosing left side as right doesn't contain last dispatched time info.",
- LSTDISPATCHED_COMPARATOR_NAME));
- return 1;
- }
-
- if (null == stat1.getLastDispatchedTime()){
- logger.info(String.format("%s : choosing right side as left doesn't contain last dispatched time info.",
- LSTDISPATCHED_COMPARATOR_NAME));
- return -1;
- }
-
// Note: an earlier date time indicates higher weight.
- return ((Date)stat2.getLastDispatchedTime()).compareTo(stat1.getLastDispatchedTime());
+ return ((Long)stat2.getLastDispatchedTime()).compareTo(stat1.getLastDispatchedTime());
}});
}
@@ -248,8 +228,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
@Override
public int compare(Executor o1, Executor o2) {
- ServerStatistics stat1 = o1.getExecutorStats();
- ServerStatistics stat2 = o2.getExecutorStats();
+ ExecutorInfo stat1 = o1.getExecutorStats();
+ ExecutorInfo stat2 = o2.getExecutorStats();
int result = 0;
if (statisticsObjectCheck(stat1,stat2,MEMORY_COMPARATOR_NAME,result)){
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
index 9adc0dc..5a3d33e 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
@@ -23,7 +23,7 @@ import java.util.Set;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.Executor;
-import azkaban.executor.ServerStatistics;
+import azkaban.executor.ExecutorInfo;
/**
* De-normalized version of the candidateFilter, which also contains the implementation of the factor filters.
@@ -101,7 +101,7 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
return false;
}
- ServerStatistics stats = filteringTarget.getExecutorStats();
+ ExecutorInfo stats = filteringTarget.getExecutorStats();
if (null == stats) {
logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
STATICREMAININGFLOWSIZE_FILTER_NAME,
@@ -128,7 +128,7 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
return false;
}
- ServerStatistics stats = filteringTarget.getExecutorStats();
+ ExecutorInfo stats = filteringTarget.getExecutorStats();
if (null == stats) {
logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
MINIMUMFREEMEMORY_FILTER_NAME,
@@ -157,7 +157,7 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
return false;
}
- ServerStatistics stats = filteringTarget.getExecutorStats();
+ ExecutorInfo stats = filteringTarget.getExecutorStats();
if (null == stats) {
logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
MINIMUMFREEMEMORY_FILTER_NAME,
diff --git a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
index ca43678..fd8768b 100644
--- a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
@@ -685,9 +685,9 @@ public class SelectorTest {
executorList.add(new Executor(2, "host2", 80, true));
executorList.add(new Executor(3, "host3", 80, true));
- executorList.get(0).setExecutorStats(new ServerStatistics(99.9, 14095, 50, new Date(), 89, 0));
- executorList.get(1).setExecutorStats(new ServerStatistics(50, 14095, 50, new Date(), 90, 0));
- executorList.get(2).setExecutorStats(new ServerStatistics(99.9, 14095, 50, new Date(), 90, 0));
+ executorList.get(0).setExecutorStats(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89, 0));
+ executorList.get(1).setExecutorStats(new ExecutorInfo(50, 14095, 50, System.currentTimeMillis(), 90, 0));
+ executorList.get(2).setExecutorStats(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 90, 0));
ExecutableFlow flow = new ExecutableFlow();
@@ -706,9 +706,9 @@ public class SelectorTest {
executorList.add(new Executor(2, "host2", 80, true));
executorList.add(new Executor(3, "host3", 80, true));
- executorList.get(0).setExecutorStats(new ServerStatistics(99.9, 14095, 50, new Date(), 89, 0));
- executorList.get(1).setExecutorStats(new ServerStatistics(50, 14095, 50, new Date(), 90, 0));
- executorList.get(2).setExecutorStats(new ServerStatistics(99.9, 14095, 50, new Date(), 90, 0));
+ executorList.get(0).setExecutorStats(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89, 0));
+ executorList.get(1).setExecutorStats(new ExecutorInfo(50, 14095, 50, System.currentTimeMillis(), 90, 0));
+ executorList.get(2).setExecutorStats(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 90, 0));
ExecutableFlow flow = new ExecutableFlow();
@@ -721,7 +721,7 @@ public class SelectorTest {
// simulate that once the flow is assigned, executor1's remaining TMP storage dropped to 2048
// now we do the getBest again executor3 is expected to be selected as it has a earlier last dispatched time.
- executorList.get(0).setExecutorStats(new ServerStatistics(99.9, 4095, 50, new Date(), 90, 1));
+ executorList.get(0).setExecutorStats(new ExecutorInfo(99.9, 4095, 50, System.currentTimeMillis(), 90, 1));
executor = selector.getBest(executorList, flow);
Assert.assertEquals(executorList.get(2), executor);
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 71caaff..361db00 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -144,7 +144,7 @@ public class FlowRunnerManager implements EventListener,
private Object executionDirDeletionSync = new Object();
// date time of the the last flow submitted.
- private Date lastFlowSubmittedDate = null;
+ private long lastFlowSubmittedDate = 0;
public FlowRunnerManager(Props props, ExecutorLoader executorLoader,
ProjectLoader projectLoader, ClassLoader parentClassLoader)
@@ -258,7 +258,7 @@ public class FlowRunnerManager implements EventListener,
return allProjects;
}
- public Date getLastFlowSubmittedTime(){
+ public long getLastFlowSubmittedTime(){
// Note: this is not thread safe and may result in providing dirty data.
// we will provide this data as is for now and will revisit if there
// is a string justification for change.
@@ -519,7 +519,7 @@ public class FlowRunnerManager implements EventListener,
// keep track of this future
submittedFlows.put(future, runner.getExecutionId());
// update the last submitted time.
- this.lastFlowSubmittedDate = new Date();
+ this.lastFlowSubmittedDate = System.currentTimeMillis();
} catch (RejectedExecutionException re) {
throw new ExecutorManagerException(
"Azkaban server can't execute any more flows. "
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
index 2dfae31..3dfe0ae 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
@@ -30,7 +30,7 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.log4j.Logger;
-import azkaban.executor.ServerStatistics;
+import azkaban.executor.ExecutorInfo;
import azkaban.utils.JSONUtils;
public class ServerStatisticsServlet extends HttpServlet {
@@ -42,7 +42,7 @@ public class ServerStatisticsServlet extends HttpServlet {
private static final String noCacheParamName = "nocache";
protected static long lastRefreshedTime = 0;
- protected static ServerStatistics cachedstats = null;
+ protected static ExecutorInfo cachedstats = null;
/**
* Handle all get request to Statistics Servlet {@inheritDoc}
@@ -71,7 +71,7 @@ public class ServerStatisticsServlet extends HttpServlet {
* a double value will be used to present the remaining memory,
* a returning value of '55.6' means 55.6%
*/
- protected void fillRemainingMemoryPercent(ServerStatistics stats){
+ protected void fillRemainingMemoryPercent(ExecutorInfo stats){
if (new File("/bin/bash").exists() && new File("/usr/bin/free").exists()) {
java.lang.ProcessBuilder processBuilder =
new java.lang.ProcessBuilder("/bin/bash", "-c", String.format("/usr/bin/free -m -s 0.1 -c %s | grep Mem:",
@@ -96,7 +96,6 @@ public class ServerStatisticsServlet extends HttpServlet {
long totalMemory = 0 ;
long freeMemory = 0 ;
int sampleCount = 0 ;
-
for(String line : output){
String[] splitedresult = line.split("\\s+");
// expected return format -
@@ -150,7 +149,7 @@ public class ServerStatisticsServlet extends HttpServlet {
protected synchronized void populateStatistics(boolean noCache){
//check again before starting the work.
if (noCache || System.currentTimeMillis() - lastRefreshedTime > cacheTimeInMilliseconds){
- final ServerStatistics stats = new ServerStatistics();
+ final ExecutorInfo stats = new ExecutorInfo();
List<Thread> workerPool = new ArrayList<Thread>();
workerPool.add(new Thread(new Runnable(){ public void run() {
@@ -188,7 +187,7 @@ public class ServerStatisticsServlet extends HttpServlet {
* @param stats reference to the result container which contains all the results, this specific method
* will only work on the property "remainingFlowCapacity".
*/
- protected void fillRemainingFlowCapacityAndLastDispatchedTime(ServerStatistics stats){
+ protected void fillRemainingFlowCapacityAndLastDispatchedTime(ExecutorInfo stats){
AzkabanExecutorServer server = AzkabanExecutorServer.getApp();
if (server != null){
@@ -209,7 +208,7 @@ public class ServerStatisticsServlet extends HttpServlet {
* @param stats reference to the result container which contains all the results, this specific method
* will only work on the property "cpuUdage".
*/
- protected void fillCpuUsage(ServerStatistics stats){
+ protected void fillCpuUsage(ExecutorInfo stats){
if (new File("/bin/bash").exists() && new File("/usr/bin/top").exists()) {
java.lang.ProcessBuilder processBuilder =
new java.lang.ProcessBuilder("/bin/bash", "-c", String.format("/usr/bin/top -bn%s -d 0.1 | grep \"Cpu(s)\"",
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
index 781973b..b754158 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
@@ -1,21 +1,16 @@
package azkaban.execapp;
-import java.io.IOException;
-import java.util.Date;
-import java.util.Map;
-
import org.junit.Assert;
import org.junit.Test;
-import azkaban.executor.ServerStatistics;
-import azkaban.utils.JSONUtils;
+import azkaban.executor.ExecutorInfo;
public class StatisticsServletTest {
private class MockStatisticsServlet extends ServerStatisticsServlet{
/** */
private static final long serialVersionUID = 1L;
- public ServerStatistics getStastics(){
+ public ExecutorInfo getStastics(){
return cachedstats;
}
@@ -27,17 +22,17 @@ public class StatisticsServletTest {
this.populateStatistics(false);
}
- public void callFillCpuUsage(ServerStatistics stats){
+ public void callFillCpuUsage(ExecutorInfo stats){
this.fillCpuUsage(stats);}
- public void callFillRemainingMemoryPercent(ServerStatistics stats){
+ public void callFillRemainingMemoryPercent(ExecutorInfo stats){
this.fillRemainingMemoryPercent(stats);}
}
private MockStatisticsServlet statServlet = new MockStatisticsServlet();
@Test
public void testFillMemory() {
- ServerStatistics stats = new ServerStatistics();
+ ExecutorInfo stats = new ExecutorInfo();
this.statServlet.callFillRemainingMemoryPercent(stats);
// assume any machine that runs this test should
// have bash and top available and at least got some remaining memory.
@@ -47,7 +42,7 @@ public class StatisticsServletTest {
@Test
public void testFillCpu() {
- ServerStatistics stats = new ServerStatistics();
+ ExecutorInfo stats = new ExecutorInfo();
this.statServlet.callFillCpuUsage(stats);
Assert.assertTrue(stats.getCpuUsage() > 0);
}
@@ -78,24 +73,4 @@ public class StatisticsServletTest {
this.statServlet.callPopulateStatistics();
Assert.assertNotEquals(updatedTime, this.statServlet.getUpdatedTime());
}
-
- @Test
- public void testStatisticsJsonParser() throws IOException {
- ServerStatistics stat = new ServerStatistics(0.1,1,2,new Date(),3,5);
- String jSonStr = JSONUtils.toJSON(stat);
- @SuppressWarnings("unchecked")
- Map<String,Object> jSonObj = (Map<String,Object>)JSONUtils.parseJSONFromString(jSonStr);
- ServerStatistics stat2 = ServerStatistics.fromJsonObject(jSonObj);
- Assert.assertTrue(stat.equals(stat2));
- }
-
- @Test
- public void testStatisticsJsonParserWNullDateValue() throws IOException {
- ServerStatistics stat = new ServerStatistics(0.1,1,2,null,3,5);
- String jSonStr = JSONUtils.toJSON(stat);
- @SuppressWarnings("unchecked")
- Map<String,Object> jSonObj = (Map<String,Object>)JSONUtils.parseJSONFromString(jSonStr);
- ServerStatistics stat2 = ServerStatistics.fromJsonObject(jSonObj);
- Assert.assertTrue(stat.equals(stat2));
- }
}