diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
index 978bcb9..1ea7ecb 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
@@ -52,23 +52,20 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
* when a new comparator is added, please do remember to register it here.
* */
static {
- comparatorCreatorRepository = new HashMap<String, ComparatorCreator>();
+ comparatorCreatorRepository = new HashMap<>();
// register the creator for number of assigned flow comparator.
- comparatorCreatorRepository.put(NUMOFASSIGNEDFLOW_COMPARATOR_NAME, new ComparatorCreator(){
- public FactorComparator<Executor> create(int weight) { return getNumberOfAssignedFlowComparator(weight); }});
+ comparatorCreatorRepository.put(NUMOFASSIGNEDFLOW_COMPARATOR_NAME,
+ ExecutorComparator::getNumberOfAssignedFlowComparator);
// register the creator for memory comparator.
- comparatorCreatorRepository.put(MEMORY_COMPARATOR_NAME, new ComparatorCreator(){
- public FactorComparator<Executor> create(int weight) { return getMemoryComparator(weight); }});
+ comparatorCreatorRepository.put(MEMORY_COMPARATOR_NAME, ExecutorComparator::getMemoryComparator);
// register the creator for last dispatched time comparator.
- comparatorCreatorRepository.put(LSTDISPATCHED_COMPARATOR_NAME, new ComparatorCreator(){
- public FactorComparator<Executor> create(int weight) { return getLstDispatchedTimeComparator(weight); }});
+ comparatorCreatorRepository.put(LSTDISPATCHED_COMPARATOR_NAME, ExecutorComparator::getLstDispatchedTimeComparator);
// register the creator for CPU Usage comparator.
- comparatorCreatorRepository.put(CPUUSAGE_COMPARATOR_NAME, new ComparatorCreator(){
- public FactorComparator<Executor> create(int weight) { return getCpuUsageComparator(weight); }});
+ comparatorCreatorRepository.put(CPUUSAGE_COMPARATOR_NAME, ExecutorComparator::getCpuUsageComparator);
}
@@ -113,13 +110,10 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
* @param stat1 the first statistics object to be checked .
* @param stat2 the second statistics object to be checked.
* @param caller the name of the calling function, for logging purpose.
- * @param result result Integer to pass out the result in case the statistics are not both valid.
* @return true if the passed statistics are NOT both valid, a shortcut can be made (caller can consume the result),
* false otherwise.
* */
- private static boolean statisticsObjectCheck(ExecutorInfo statisticsObj1,
- ExecutorInfo statisticsObj2, String caller, Integer result){
- result = 0 ;
+ private static boolean statisticsObjectCheck(ExecutorInfo statisticsObj1, ExecutorInfo statisticsObj2, String caller){
// both doesn't expose the info
if (null == statisticsObj1 && null == statisticsObj2){
logger.debug(String.format("%s : neither of the executors exposed statistics info.",
@@ -131,15 +125,13 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
if (null == statisticsObj2 ){
logger.debug(String.format("%s : choosing left side and the right side executor doesn't expose statistics info",
caller));
- result = 1;
- return true;
+ return true;
}
//left side doesn't expose the info.
if (null == statisticsObj1 ){
logger.debug(String.format("%s : choosing right side and the left side executor doesn't expose statistics info",
caller));
- result = -1;
return true;
}
@@ -160,7 +152,7 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
ExecutorInfo stat2 = o2.getExecutorInfo();
Integer result = 0;
- if (statisticsObjectCheck(stat1,stat2,NUMOFASSIGNEDFLOW_COMPARATOR_NAME,result)){
+ if (statisticsObjectCheck(stat1,stat2,NUMOFASSIGNEDFLOW_COMPARATOR_NAME)){
return result;
}
return ((Integer)stat1.getRemainingFlowCapacity()).compareTo(stat2.getRemainingFlowCapacity());
@@ -181,7 +173,7 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
ExecutorInfo stat2 = o2.getExecutorInfo();
int result = 0;
- if (statisticsObjectCheck(stat1,stat2,CPUUSAGE_COMPARATOR_NAME,result)){
+ if (statisticsObjectCheck(stat1,stat2,CPUUSAGE_COMPARATOR_NAME)){
return result;
}
@@ -205,7 +197,7 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
ExecutorInfo stat2 = o2.getExecutorInfo();
int result = 0;
- if (statisticsObjectCheck(stat1,stat2,LSTDISPATCHED_COMPARATOR_NAME,result)){
+ if (statisticsObjectCheck(stat1,stat2,LSTDISPATCHED_COMPARATOR_NAME)){
return result;
}
// Note: an earlier date time indicates higher weight.
@@ -232,7 +224,7 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
ExecutorInfo stat2 = o2.getExecutorInfo();
int result = 0;
- if (statisticsObjectCheck(stat1,stat2,MEMORY_COMPARATOR_NAME,result)){
+ if (statisticsObjectCheck(stat1,stat2,MEMORY_COMPARATOR_NAME)){
return result;
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
index c341445..ac8c65c 100644
--- a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
@@ -18,7 +18,6 @@ package azkaban.executor;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -121,62 +120,54 @@ public class SelectorTest {
// for test purpose the registration is put in a separated method, in production the work should be done
// in the constructor.
public void registerFilterforTotalMemory(){
- this.registerFactorFilter(FactorFilter.create("requiredTotalMemory",
- new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
- public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
- // REAL LOGIC COMES HERE -
- if (null == itemToCheck || null == sourceObject){
- return false;
- }
-
- // Box has infinite memory.:)
- if (itemToCheck.percentOfRemainingMemory == 0) {
- return true;
- }
-
- // calculate the memory and return.
- return itemToCheck.amountOfRemainingMemory / itemToCheck.percentOfRemainingMemory * 100 >
- sourceObject.requiredTotalMemory;
- }}));
+ this.registerFactorFilter(FactorFilter.create("requiredTotalMemory", (itemToCheck, sourceObject) -> {
+ // REAL LOGIC COMES HERE -
+ if (null == itemToCheck || null == sourceObject){
+ return false;
+ }
+
+ // Box has infinite memory.:)
+ if (itemToCheck.percentOfRemainingMemory == 0) {
+ return true;
+ }
+
+ // calculate the memory and return.
+ return itemToCheck.amountOfRemainingMemory / itemToCheck.percentOfRemainingMemory * 100 >
+ sourceObject.requiredTotalMemory;
+ }));
}
public void registerFilterforRemainingMemory(){
- this.registerFactorFilter(FactorFilter.create("requiredRemainingMemory",
- new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
- public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
- // REAL LOGIC COMES HERE -
- if (null == itemToCheck || null == sourceObject){
- return false;
- }
- return itemToCheck.amountOfRemainingMemory > sourceObject.requiredRemainingMemory;
- }}));
+ this.registerFactorFilter(FactorFilter.create("requiredRemainingMemory", (itemToCheck, sourceObject) -> {
+ // REAL LOGIC COMES HERE -
+ if (null == itemToCheck || null == sourceObject){
+ return false;
+ }
+ return itemToCheck.amountOfRemainingMemory > sourceObject.requiredRemainingMemory;
+ }));
}
public void registerFilterforPriority(){
- this.registerFactorFilter(FactorFilter.create("requiredProprity",
- new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
- public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
- // REAL LOGIC COMES HERE -
- if (null == itemToCheck || null == sourceObject){
- return false;
- }
-
- // priority value, the bigger the lower.
- return itemToCheck.priority >= sourceObject.priority;
- }}));
+ this.registerFactorFilter(FactorFilter.create("requiredProprity", (itemToCheck, sourceObject) -> {
+ // REAL LOGIC COMES HERE -
+ if (null == itemToCheck || null == sourceObject){
+ return false;
+ }
+
+ // priority value, the bigger the lower.
+ return itemToCheck.priority >= sourceObject.priority;
+ }));
}
public void registerFilterforRemainingTmpSpace(){
- this.registerFactorFilter(FactorFilter.create("requiredRemainingTmpSpace",
- new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
- public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
- // REAL LOGIC COMES HERE -
- if (null == itemToCheck || null == sourceObject){
- return false;
- }
-
- return itemToCheck.remainingTmp > sourceObject.requiredRemainingTmpSpace;
- }}));
+ this.registerFactorFilter(FactorFilter.create("requiredRemainingTmpSpace", (itemToCheck, sourceObject) -> {
+ // REAL LOGIC COMES HERE -
+ if (null == itemToCheck || null == sourceObject){
+ return false;
+ }
+
+ return itemToCheck.remainingTmp > sourceObject.requiredRemainingTmpSpace;
+ }));
}
}
@@ -201,45 +192,42 @@ public class SelectorTest {
}
public void registerComparerForMemory(int weight){
- this.registerFactorComparator(FactorComparator.create("Memory", weight, new Comparator<MockExecutorObject>(){
- public int compare(MockExecutorObject o1, MockExecutorObject o2) {
- int result = 0 ;
+ this.registerFactorComparator(FactorComparator.create("Memory", weight, (o1, o2) -> {
+ int result = 0 ;
- // check remaining amount of memory.
- result = o1.amountOfRemainingMemory - o2.amountOfRemainingMemory;
- if (result != 0){
- return result > 0 ? 1 : -1;
- }
+ // check remaining amount of memory.
+ result = o1.amountOfRemainingMemory - o2.amountOfRemainingMemory;
+ if (result != 0){
+ return result > 0 ? 1 : -1;
+ }
- // check remaining % .
- result = (int)(o1.percentOfRemainingMemory - o2.percentOfRemainingMemory);
- return result == 0 ? 0 : result > 0 ? 1 : -1;
+ // check remaining % .
+ result = (int)(o1.percentOfRemainingMemory - o2.percentOfRemainingMemory);
+ return result == 0 ? 0 : result > 0 ? 1 : -1;
- } }));
+ }));
}
public void registerComparerForRemainingSpace(int weight){
- this.registerFactorComparator(FactorComparator.create("RemainingTmp", weight, new Comparator<MockExecutorObject>(){
- public int compare(MockExecutorObject o1, MockExecutorObject o2) {
- int result = 0 ;
+ this.registerFactorComparator(FactorComparator.create("RemainingTmp", weight, (o1, o2) -> {
+ int result = 0 ;
- // check remaining % .
- result = (int)(o1.remainingTmp - o2.remainingTmp);
- return result == 0 ? 0 : result > 0 ? 1 : -1;
+ // check remaining % .
+ result = (int)(o1.remainingTmp - o2.remainingTmp);
+ return result == 0 ? 0 : result > 0 ? 1 : -1;
- } }));
+ }));
}
public void registerComparerForPriority(int weight){
- this.registerFactorComparator(FactorComparator.create("Priority", weight, new Comparator<MockExecutorObject>(){
- public int compare(MockExecutorObject o1, MockExecutorObject o2) {
- int result = 0 ;
+ this.registerFactorComparator(FactorComparator.create("Priority", weight, (o1, o2) -> {
+ int result = 0 ;
- // check priority, bigger the better.
- result = (int)(o1.priority - o2.priority);
- return result == 0 ? 0 : result > 0 ? 1 : -1;
+ // check priority, bigger the better.
+ result = (int)(o1.priority - o2.priority);
+ return result == 0 ? 0 : result > 0 ? 1 : -1;
- } }));
+ }));
}
}
@@ -416,8 +404,7 @@ public class SelectorTest {
comparator.registerComparerForPriority(5);
comparator.registerComparerForRemainingSpace(3);
- CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
- new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+ CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector = new CandidateSelector<>(filter, comparator);
// mock object, remaining memory 11500, total memory 3095, remainingTmpSpace 4200, priority 2.
MockFlowObject dispatchingObj = new MockFlowObject("flow1",3096, 1500,4200,2);
@@ -447,10 +434,9 @@ public class SelectorTest {
comparator.registerComparerForPriority(4);
comparator.registerComparerForRemainingSpace(1);
- CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
- new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+ CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector = new CandidateSelector<>(filter, comparator);
- ArrayList<MockExecutorObject> signleExecutorList = new ArrayList<MockExecutorObject>();
+ ArrayList<MockExecutorObject> signleExecutorList = new ArrayList<>();
MockExecutorObject signleExecutor = new MockExecutorObject("ExecutorX",8080,50.0,2048,3,new Date(), 20, 6400);
signleExecutorList.add(signleExecutor);
@@ -479,10 +465,9 @@ public class SelectorTest {
comparator.registerComparerForPriority(4);
comparator.registerComparerForRemainingSpace(1);
- CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
- new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+ CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector = new CandidateSelector<>(filter, comparator);
- ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+ ArrayList<MockExecutorObject> list = new ArrayList<>();
MockExecutorObject signleExecutor = new MockExecutorObject("ExecutorX",8080,50.0,2048,3,new Date(), 20, 6400);
list.add(signleExecutor);
list.add(signleExecutor);
@@ -505,15 +490,14 @@ public class SelectorTest {
comparator.registerComparerForPriority(4);
comparator.registerComparerForRemainingSpace(1);
- CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
- new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+ CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector = new CandidateSelector<>(filter, comparator);
// note - as the tieBreaker set in the MockComparator uses the name value of the executor to do the
// final diff therefore we need to set the name differently to make a meaningful test, in real
// scenario we may want to use something else (say hash code) to be the bottom line for the tieBreaker
// to make a final decision, the purpose of the test here is to prove that for two candidates with
// exact value (in the case of test, all values except for the name) the decision result is stable.
- ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+ ArrayList<MockExecutorObject> list = new ArrayList<>();
MockExecutorObject executor1 = new MockExecutorObject("ExecutorX", 8080,50.0,2048,3,new Date(), 20, 6400);
MockExecutorObject executor2 = new MockExecutorObject("ExecutorX2",8080,50.0,2048,3,new Date(), 20, 6400);
list.add(executor1);
@@ -543,10 +527,9 @@ public class SelectorTest {
comparator.registerComparerForPriority(4);
comparator.registerComparerForRemainingSpace(1);
- CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
- new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+ CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector = new CandidateSelector<>(filter, comparator);
- ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+ ArrayList<MockExecutorObject> list = new ArrayList<>();
MockFlowObject dispatchingObj = new MockFlowObject("flow1",100, 100,100,5);
@@ -580,10 +563,9 @@ public class SelectorTest {
comparator.registerComparerForPriority(4);
comparator.registerComparerForRemainingSpace(1);
- CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
- new CandidateSelector<MockExecutorObject,MockFlowObject>(null,comparator);
+ CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector = new CandidateSelector<>(null, comparator);
- ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+ ArrayList<MockExecutorObject> list = new ArrayList<>();
MockExecutorObject executor1 = new MockExecutorObject("ExecutorX", 8080,50.0,2048,3,new Date(), 20, 6400);
MockExecutorObject executor2 = new MockExecutorObject("ExecutorX2",8080,50.0,2048,3,new Date(), 20, 6400);
list.add(executor1);
@@ -614,7 +596,7 @@ public class SelectorTest {
@Test
public void testCreatingExectorfilterObject() throws Exception{
- List<String> validList = new ArrayList<String>(ExecutorFilter.getAvailableFilterNames());
+ List<String> validList = new ArrayList<>(ExecutorFilter.getAvailableFilterNames());
try {
new ExecutorFilter(validList);
}catch (Exception ex){
@@ -624,7 +606,7 @@ public class SelectorTest {
@Test
public void testCreatingExectorfilterObjectWInvalidList() throws Exception{
- List<String> invalidList = new ArrayList<String>();
+ List<String> invalidList = new ArrayList<>();
invalidList.add("notExistingFilter");
Exception result = null;
try {
@@ -638,7 +620,7 @@ public class SelectorTest {
@Test
public void testCreatingExectorComparatorObject() throws Exception{
- Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+ Map<String,Integer> comparatorMap = new HashMap<>();
for (String name : ExecutorComparator.getAvailableComparatorNames()){
comparatorMap.put(name, 1);
}
@@ -651,7 +633,7 @@ public class SelectorTest {
@Test
public void testCreatingExectorComparatorObjectWInvalidName() throws Exception{
- Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+ Map<String,Integer> comparatorMap = new HashMap<>();
comparatorMap.put("invalidName", 0);
Exception result = null;
try {
@@ -665,7 +647,7 @@ public class SelectorTest {
@Test
public void testCreatingExectorComparatorObjectWInvalidWeight() throws Exception{
- Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+ Map<String,Integer> comparatorMap = new HashMap<>();
for (String name : ExecutorComparator.getAvailableComparatorNames()){
comparatorMap.put(name, -1);
}
@@ -681,7 +663,7 @@ public class SelectorTest {
@Test
public void testCreatingExecutorSelectorWithEmptyFilterComparatorList() throws Exception{
- List<Executor> executorList = new ArrayList<Executor>();
+ List<Executor> executorList = new ArrayList<>();
executorList.add(new Executor(1, "host1", 80, true));
executorList.add(new Executor(2, "host2", 80, true));
executorList.add(new Executor(3, "host3", 80, true));
@@ -700,9 +682,10 @@ public class SelectorTest {
@Test
public void testExecutorSelectorE2E() throws Exception{
- List<String> filterList = new ArrayList<String>(ExecutorFilter.getAvailableFilterNames());
- Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
- List<Executor> executorList = new ArrayList<Executor>();
+ List<String> filterList = new ArrayList<>(ExecutorFilter.getAvailableFilterNames());
+ Map<String,Integer> comparatorMap;
+ comparatorMap = new HashMap<>();
+ List<Executor> executorList = new ArrayList<>();
executorList.add(new Executor(1, "host1", 80, true));
executorList.add(new Executor(2, "host2", 80, true));
executorList.add(new Executor(3, "host3", 80, true));