Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
index 6fc8f22..ce4e41a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
@@ -38,9 +38,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
}
// factor comparator names
- private static final String REMAININGFLOWSIZE_COMPARATOR_NAME = "RemainingFlowSize";
+ private static final String NUMOFASSIGNEDFLOW_COMPARATOR_NAME = "NumberOfAssignedFlowComparator";
private static final String MEMORY_COMPARATOR_NAME = "Memory";
- private static final String REMAININGTMPSIZE_COMPARATOR_NAME = "RemainingTmpSize";
private static final String PRIORITY_COMPARATOR_NAME = "Priority";
private static final String LSTDISPATCHED_COMPARATOR_NAME = "LastDispatched";
private static final String CPUUSAGE_COMPARATOR_NAME = "CpuUsage";
@@ -53,29 +52,25 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
static {
comparatorCreatorRepository = new HashMap<String, ComparatorCreator>();
- // register the creator for remaining flow size comparator.
- comparatorCreatorRepository.put(REMAININGFLOWSIZE_COMPARATOR_NAME, new ComparatorCreator(){
- @Override public FactorComparator<Executor> create(int weight) { return getRemainingFlowSizeComparator(weight); }});
+ // register the creator for number of assigned flow comparator.
+ comparatorCreatorRepository.put(NUMOFASSIGNEDFLOW_COMPARATOR_NAME, new ComparatorCreator(){
+ public FactorComparator<Executor> create(int weight) { return getNumberOfAssignedFlowComparator(weight); }});
// register the creator for memory comparator.
comparatorCreatorRepository.put(MEMORY_COMPARATOR_NAME, new ComparatorCreator(){
- @Override public FactorComparator<Executor> create(int weight) { return getMemoryComparator(weight); }});
+ public FactorComparator<Executor> create(int weight) { return getMemoryComparator(weight); }});
// register the creator for priority comparator.
comparatorCreatorRepository.put(PRIORITY_COMPARATOR_NAME, new ComparatorCreator(){
- @Override public FactorComparator<Executor> create(int weight) { return getPriorityComparator(weight); }});
-
- // register the creator for remaining TMP size comparator.
- comparatorCreatorRepository.put(PRIORITY_COMPARATOR_NAME, new ComparatorCreator(){
- @Override public FactorComparator<Executor> create(int weight) { return getRemainingTmpSizeComparator(weight); }});
+ public FactorComparator<Executor> create(int weight) { return getPriorityComparator(weight); }});
// register the creator for last dispatched time comparator.
comparatorCreatorRepository.put(LSTDISPATCHED_COMPARATOR_NAME, new ComparatorCreator(){
- @Override public FactorComparator<Executor> create(int weight) { return getLstDispatchedTimeComparator(weight); }});
+ public FactorComparator<Executor> create(int weight) { return getLstDispatchedTimeComparator(weight); }});
// register the creator for CPU Usage comparator.
comparatorCreatorRepository.put(CPUUSAGE_COMPARATOR_NAME, new ComparatorCreator(){
- @Override public FactorComparator<Executor> create(int weight) { return getCpuUsageComparator(weight); }});
+ public FactorComparator<Executor> create(int weight) { return getCpuUsageComparator(weight); }});
}
@@ -97,8 +92,9 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
get(entry.getKey()).
create(entry.getValue()));
} else {
- logger.error(String.format("failed to initialize executor comparator as the comparator implementation for requested factor '%s' doesn't exist.",
- entry.getKey()));
+ logger.error(String.format("failed to initialize executor comparator "+
+ "as the comparator implementation for requested factor '%s' doesn't exist.",
+ entry.getKey()));
throw new IllegalArgumentException("comparatorList");
}
}
@@ -123,7 +119,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
* @return true if the passed statistics are NOT both valid, a shortcut can be made (caller can consume the result),
* false otherwise.
* */
- private static boolean statisticsObjectCheck(Statistics statisticsObj1, Statistics statisticsObj2, String caller, Integer result){
+ private static boolean statisticsObjectCheck(Statistics statisticsObj1,
+ Statistics statisticsObj2, String caller, Integer result){
result = 0 ;
// both doesn't expose the info
if (null == statisticsObj1 && null == statisticsObj2){
@@ -153,11 +150,11 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
}
/**
- * function defines the remaining flow size comparator.
+ * function defines the number of assigned flow comparator.
* @param weight weight of the comparator.
* */
- private static FactorComparator<Executor> getRemainingFlowSizeComparator(int weight){
- return FactorComparator.create(REMAININGFLOWSIZE_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+ private static FactorComparator<Executor> getNumberOfAssignedFlowComparator(int weight){
+ return FactorComparator.create(NUMOFASSIGNEDFLOW_COMPARATOR_NAME, weight, new Comparator<Executor>(){
@Override
public int compare(Executor o1, Executor o2) {
@@ -165,7 +162,7 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
Statistics stat2 = o2.getExecutorStats();
Integer result = 0;
- if (statisticsObjectCheck(stat1,stat2,REMAININGFLOWSIZE_COMPARATOR_NAME,result)){
+ if (statisticsObjectCheck(stat1,stat2,NUMOFASSIGNEDFLOW_COMPARATOR_NAME,result)){
return result;
}
return ((Integer)stat1.getRemainingFlowCapacity()).compareTo(stat2.getRemainingFlowCapacity());
@@ -173,26 +170,6 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
}
/**
- * function defines the remaining folder size comparator.
- * @param weight weight of the comparator.
- * */
- private static FactorComparator<Executor> getRemainingTmpSizeComparator(int weight){
- return FactorComparator.create(REMAININGTMPSIZE_COMPARATOR_NAME, weight, new Comparator<Executor>(){
-
- @Override
- public int compare(Executor o1, Executor o2) {
- Statistics stat1 = o1.getExecutorStats();
- Statistics stat2 = o2.getExecutorStats();
-
- Integer result = 0;
- if (statisticsObjectCheck(stat1,stat2,REMAININGTMPSIZE_COMPARATOR_NAME,result)){
- return result;
- }
- return ((Long)stat1.getRemainingStorage()).compareTo(stat2.getRemainingStorage());
- }});
- }
-
- /**
* function defines the priority comparator.
* @param weight weight of the comparator.
* @return
@@ -230,7 +207,9 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
if (statisticsObjectCheck(stat1,stat2,CPUUSAGE_COMPARATOR_NAME,result)){
return result;
}
- return ((Double)stat1.getCpuUsage()).compareTo(stat2.getCpuUsage());
+
+ // CPU usage , the lesser the value is, the better.
+ return ((Double)stat2.getCpuUsage()).compareTo(stat1.getCpuUsage());
}});
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
index f7e2c34..c27b673 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
@@ -40,6 +40,8 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
// factor filter names.
private static final String STATICREMAININGFLOWSIZE_FILTER_NAME = "StaticRemainingFlowSize";
+ private static final String MINIMUMFREEMEMORY_FILTER_NAME = "MinimunFreeMemory";
+ private static final String CPUSTATUS_FILTER_NAME = "CpuStatus";
/**
* static initializer of the class.
@@ -49,6 +51,8 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
static {
filterRepository = new HashMap<String, FactorFilter<Executor, ExecutableFlow>>();
filterRepository.put(STATICREMAININGFLOWSIZE_FILTER_NAME, getStaticRemainingFlowSizeFilter());
+ filterRepository.put(MINIMUMFREEMEMORY_FILTER_NAME, getMinimumReservedMemoryFilter());
+ filterRepository.put(CPUSTATUS_FILTER_NAME, getCpuStatusFilter());
}
/**
@@ -67,8 +71,9 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
if (filterRepository.containsKey(filterName)){
this.registerFactorFilter(filterRepository.get(filterName));
} else {
- logger.error(String.format("failed to initialize executor filter as the filter implementation for requested factor '%s' doesn't exist.",
- filterName));
+ logger.error(String.format("failed to initialize executor filter "+
+ "as the filter implementation for requested factor '%s' doesn't exist.",
+ filterName));
throw new IllegalArgumentException("filterList");
}
}
@@ -83,12 +88,10 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
* function to register the static remaining flow size filter.
* NOTE : this is a static filter which means the filter will be filtering based on the system standard which is not
* Coming for the passed flow.
- * Ideally this filter will make sure only the executor has remaining
+ * Ideally this filter will make sure only the executor hasn't reached the Max allowed # of executing flows.
* */
private static FactorFilter<Executor, ExecutableFlow> getStaticRemainingFlowSizeFilter(){
return FactorFilter.create(STATICREMAININGFLOWSIZE_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
-
- @Override
public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
if (null == filteringTarget){
logger.info(String.format("%s : filtering out the target as it is null.", STATICREMAININGFLOWSIZE_FILTER_NAME));
@@ -107,6 +110,58 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
});
}
- // TO-DO
- // Add more Filter definitions .
+ /**
+ * function to register the static Minimum Reserved Memory filter.
+ * NOTE : this is a static filter which means the filter will be filtering based on the system standard which is not
+ * Coming for the passed flow.
+ * This filter will filter out any executors that has the remaining memory below 6G
+ * */
+ private static FactorFilter<Executor, ExecutableFlow> getMinimumReservedMemoryFilter(){
+ return FactorFilter.create(MINIMUMFREEMEMORY_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
+ private static final int MINIMUM_FREE_MEMORY = 6 * 1024;
+ public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
+ if (null == filteringTarget){
+ logger.info(String.format("%s : filtering out the target as it is null.", MINIMUMFREEMEMORY_FILTER_NAME));
+ return false;
+ }
+
+ Statistics stats = filteringTarget.getExecutorStats();
+ if (null == stats) {
+ logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
+ MINIMUMFREEMEMORY_FILTER_NAME,
+ filteringTarget.toString()));
+ return false;
+ }
+ return stats.getRemainingMemory() > MINIMUM_FREE_MEMORY ;
+ }
+ });
+ }
+
+
+ /**
+ * function to register the static Minimum Reserved Memory filter.
+ * NOTE : this is a static filter which means the filter will be filtering based on the system standard which is not
+ * Coming for the passed flow.
+ * This filter will filter out any executors that the current CPU usage exceed 95%
+ * */
+ private static FactorFilter<Executor, ExecutableFlow> getCpuStatusFilter(){
+ return FactorFilter.create(CPUSTATUS_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
+ private static final int MAX_CPU_CURRENT_USAGE = 95;
+ public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
+ if (null == filteringTarget){
+ logger.info(String.format("%s : filtering out the target as it is null.", CPUSTATUS_FILTER_NAME));
+ return false;
+ }
+
+ Statistics stats = filteringTarget.getExecutorStats();
+ if (null == stats) {
+ logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
+ MINIMUMFREEMEMORY_FILTER_NAME,
+ filteringTarget.toString()));
+ return false;
+ }
+ return stats.getCpuUsage() < MAX_CPU_CURRENT_USAGE ;
+ }
+ });
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/Statistics.java b/azkaban-common/src/main/java/azkaban/executor/Statistics.java
index 7c80f88..ba85154 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Statistics.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Statistics.java
@@ -23,6 +23,7 @@ import java.util.Map;
private double remainingMemoryPercent;
private long remainingMemory;
private int remainingFlowCapacity;
+ private int numberOfAssignedFlows;
private Date lastDispatchedTime;
private long remainingStorage;
private double cpuUsage;
@@ -84,6 +85,14 @@ import java.util.Map;
this.priority = value;
}
+ public int getNumberOfAssignedFlows () {
+ return this.numberOfAssignedFlows;
+ }
+
+ public void setNumberOfAssignedFlows (int value) {
+ this.numberOfAssignedFlows = value;
+ }
+
public Statistics(){}
public Statistics (double remainingMemoryPercent,
@@ -92,7 +101,8 @@ import java.util.Map;
Date lastDispatched,
long remainingStorage,
double cpuUsage,
- int priority ){
+ int priority,
+ int numberOfAssignedFlows){
this.remainingMemory = remainingMemory;
this.cpuUsage = cpuUsage;
this.remainingFlowCapacity = remainingFlowCapacity;
@@ -100,6 +110,7 @@ import java.util.Map;
this.remainingMemoryPercent = remainingMemoryPercent;
this.remainingStorage = remainingStorage;
this.lastDispatchedTime = lastDispatched;
+ this.numberOfAssignedFlows = numberOfAssignedFlows;
}
@Override
@@ -116,6 +127,7 @@ import java.util.Map;
result &=this.priority == stat.priority;
result &=this.remainingMemoryPercent == stat.remainingMemoryPercent;
result &=this.remainingStorage == stat.remainingStorage;
+ result &=this.numberOfAssignedFlows == stat.numberOfAssignedFlows;
result &= null == this.lastDispatchedTime ? stat.lastDispatchedTime == null :
this.lastDispatchedTime.equals(stat.lastDispatchedTime);
return result;
@@ -123,7 +135,7 @@ import java.util.Map;
return false;
}
-
+
// really ugly to have it home-made here for object-binding as base on the
// current code base there is no any better ways to do that.
public static Statistics fromJsonObject(Map<String,Object> mapObj){
@@ -150,6 +162,11 @@ import java.util.Map;
stats.setPriority(Integer.parseInt(mapObj.get(priority).toString()));
}
+ final String numberOfAssignedFlows = "numberOfAssignedFlows";
+ if (mapObj.containsKey(numberOfAssignedFlows) && null != mapObj.get(numberOfAssignedFlows)){
+ stats.setNumberOfAssignedFlows(Integer.parseInt(mapObj.get(numberOfAssignedFlows).toString()));
+ }
+
final String remainingMemoryPercent = "remainingMemoryPercent";
if (mapObj.containsKey(remainingMemoryPercent) && null != mapObj.get(remainingMemoryPercent)){
stats.setRemainingMemoryPercent(Double.parseDouble(mapObj.get(remainingMemoryPercent).toString()));
diff --git a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
index 4177b70..97b460a 100644
--- a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
@@ -682,9 +682,9 @@ public class SelectorTest {
executorList.add(new Executor(2, "host2", 80, true));
executorList.add(new Executor(3, "host3", 80, true));
- executorList.get(0).setExecutorStats(new Statistics(99.9, 4095, 50, new Date(), 4095,99, 0));
- executorList.get(1).setExecutorStats(new Statistics(50, 4095, 50, new Date(), 4095,99, 0));
- executorList.get(2).setExecutorStats(new Statistics(99.9, 4095, 50, new Date(), 2048,99, 0));
+ executorList.get(0).setExecutorStats(new Statistics(99.9, 14095, 50, new Date(), 4095,89, 0, 0));
+ executorList.get(1).setExecutorStats(new Statistics(50, 14095, 50, new Date(), 4095,90, 0, 0));
+ executorList.get(2).setExecutorStats(new Statistics(99.9, 14095, 50, new Date(), 2048,90, 0, 0));
ExecutableFlow flow = new ExecutableFlow();
@@ -697,7 +697,7 @@ public class SelectorTest {
// simulate that once the flow is assigned, executor1's remaining TMP storage dropped to 2048
// now we do the getBest again executor3 is expected to be selected as it has a earlier last dispatched time.
- executorList.get(0).setExecutorStats(new Statistics(99.9, 4095, 50, new Date(), 2048,99, 0));
+ executorList.get(0).setExecutorStats(new Statistics(99.9, 4095, 50, new Date(), 2048,90, 0, 1));
executor = selector.getBest(executorList, flow);
Assert.assertEquals(executorList.get(2), executor);
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatisticsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatisticsServlet.java
index 3e260b9..2298b88 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/StatisticsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatisticsServlet.java
@@ -38,6 +38,7 @@ public class StatisticsServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
private static final int cacheTimeInMilliseconds = 1000;
private static final Logger logger = Logger.getLogger(StatisticsServlet.class);
+ private static final String noCacheParamName = "nocache";
protected static Date lastRefreshedTime = null;
protected static Statistics cachedstats = null;
@@ -51,9 +52,11 @@ public class StatisticsServlet extends HttpServlet {
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
- if (null == lastRefreshedTime ||
+ boolean noCache = null!= req && Boolean.getBoolean(req.getParameter(noCacheParamName));
+
+ if (noCache || null == lastRefreshedTime ||
new Date().getTime() - lastRefreshedTime.getTime() > cacheTimeInMilliseconds){
- this.populateStatistics();
+ this.populateStatistics(noCache);
}
JSONUtils.toJSON(cachedstats, resp.getOutputStream(), true);
@@ -144,9 +147,9 @@ public class StatisticsServlet extends HttpServlet {
* call the data providers to fill the returning data container for statistics data.
* This function refreshes the static cached copy of data in case if necessary.
* */
- protected synchronized void populateStatistics(){
+ protected synchronized void populateStatistics(boolean noCache){
//check again before starting the work.
- if (null == lastRefreshedTime ||
+ if (noCache || null == lastRefreshedTime ||
new Date().getTime() - lastRefreshedTime.getTime() > cacheTimeInMilliseconds){
final Statistics stats = new Statistics();
@@ -191,9 +194,9 @@ public class StatisticsServlet extends HttpServlet {
AzkabanExecutorServer server = AzkabanExecutorServer.getApp();
if (server != null){
FlowRunnerManager runnerMgr = AzkabanExecutorServer.getApp().getFlowRunnerManager();
- stats.setRemainingFlowCapacity(runnerMgr.getMaxNumRunningFlows() -
- runnerMgr.getNumRunningFlows() -
- runnerMgr.getNumQueuedFlows());
+ int assignedFlows = runnerMgr.getNumRunningFlows() + runnerMgr.getNumQueuedFlows();
+ stats.setRemainingFlowCapacity(runnerMgr.getMaxNumRunningFlows() - assignedFlows);
+ stats.setNumberOfAssignedFlows(assignedFlows);
stats.setLastDispatchedTime(runnerMgr.getLastFlowSubmittedTime());
}else {
logger.error("failed to get data for remaining flow capacity or LastDispatchedTime" +
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
index 85d38e6..faf6abb 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
@@ -24,7 +24,7 @@ public class StatisticsServletTest {
}
public void callPopulateStatistics(){
- this.populateStatistics();
+ this.populateStatistics(false);
}
public void callFillCpuUsage(Statistics stats){
@@ -81,7 +81,17 @@ public class StatisticsServletTest {
@Test
public void testStatisticsJsonParser() throws IOException {
- Statistics stat = new Statistics(0.1,1,2,new Date(),3,4,5);
+ Statistics stat = new Statistics(0.1,1,2,new Date(),3,4,5,5);
+ String jSonStr = JSONUtils.toJSON(stat);
+ @SuppressWarnings("unchecked")
+ Map<String,Object> jSonObj = (Map<String,Object>)JSONUtils.parseJSONFromString(jSonStr);
+ Statistics stat2 = Statistics.fromJsonObject(jSonObj);
+ Assert.assertTrue(stat.equals(stat2));
+ }
+
+ @Test
+ public void testStatisticsJsonParserWNullDateValue() throws IOException {
+ Statistics stat = new Statistics(0.1,1,2,null,3,4,5,5);
String jSonStr = JSONUtils.toJSON(stat);
@SuppressWarnings("unchecked")
Map<String,Object> jSonObj = (Map<String,Object>)JSONUtils.parseJSONFromString(jSonStr);