diff --git a/azkaban-common/src/test/java/azkaban/executor/DispatcherTest.java b/azkaban-common/src/test/java/azkaban/executor/DispatcherTest.java
index dc1af73..59ebb72 100644
--- a/azkaban-common/src/test/java/azkaban/executor/DispatcherTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/DispatcherTest.java
@@ -20,10 +20,12 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
+
import org.apache.log4j.BasicConfigurator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import azkaban.executor.dispatcher.*;
@@ -112,7 +114,7 @@ public class DispatcherTest {
public void registerFilterforTotalMemory(){
this.registerFactorFilter(FactorFilter.create("requiredTotalMemory",
new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
- public boolean analyzeTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+ public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
// REAL LOGIC COMES HERE -
if (null == itemToCheck || null == sourceObject){
return false;
@@ -124,14 +126,15 @@ public class DispatcherTest {
}
// calculate the memory and return.
- return itemToCheck.amountOfRemainingMemory / itemToCheck.percentOfRemainingMemory * 100 > sourceObject.requiredTotalMemory;
+ return itemToCheck.amountOfRemainingMemory / itemToCheck.percentOfRemainingMemory * 100 >
+ sourceObject.requiredTotalMemory;
}}));
}
public void registerFilterforRemainingMemory(){
this.registerFactorFilter(FactorFilter.create("requiredRemainingMemory",
new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
- public boolean analyzeTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+ public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
// REAL LOGIC COMES HERE -
if (null == itemToCheck || null == sourceObject){
return false;
@@ -143,7 +146,7 @@ public class DispatcherTest {
public void registerFilterforPriority(){
this.registerFactorFilter(FactorFilter.create("requiredProprity",
new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
- public boolean analyzeTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+ public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
// REAL LOGIC COMES HERE -
if (null == itemToCheck || null == sourceObject){
return false;
@@ -157,7 +160,7 @@ public class DispatcherTest {
public void registerFilterforRemainingTmpSpace(){
this.registerFactorFilter(FactorFilter.create("requiredRemainingTmpSpace",
new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
- public boolean analyzeTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+ public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
// REAL LOGIC COMES HERE -
if (null == itemToCheck || null == sourceObject){
return false;
@@ -180,6 +183,8 @@ public class DispatcherTest {
@Override
protected boolean tieBreak(MockExecutorObject object1, MockExecutorObject object2){
+ if (null == object2) return true;
+ if (null == object1) return false;
return object1.name.compareTo(object2.name) >= 0;
}
@@ -232,10 +237,12 @@ public class DispatcherTest {
// test samples.
protected ArrayList<MockExecutorObject> executorList = new ArrayList<MockExecutorObject>();
- @Before
- public void setUp() throws Exception {
+ @BeforeClass public static void onlyOnce() {
BasicConfigurator.configure();
+ }
+ @Before
+ public void setUp() throws Exception {
executorList.clear();
executorList.add(new MockExecutorObject("Executor1",8080,50.0,2048,5,new Date(), 20, 6400));
executorList.add(new MockExecutorObject("Executor2",8080,50.0,2048,4,new Date(), 20, 6400));
@@ -280,11 +287,11 @@ public class DispatcherTest {
mFilter.registerFilterforRemainingMemory();
// expect true.
- boolean result = mFilter.analyzeTarget(this.getExecutorByName("Executor1"), dispatchingObj);
+ boolean result = mFilter.filterTarget(this.getExecutorByName("Executor1"), dispatchingObj);
Assert.assertTrue(result);
//expect true.
- result = mFilter.analyzeTarget(this.getExecutorByName("Executor3"), dispatchingObj);
+ result = mFilter.filterTarget(this.getExecutorByName("Executor3"), dispatchingObj);
/*
1 [main] INFO azkaban.executor.dispatcher.CandidateFilter - start checking 'Executor3' with factor filter for 'Mockfilter'
2 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
@@ -294,7 +301,7 @@ public class DispatcherTest {
// add the priority filter.
mFilter.registerFilterforPriority();
- result = mFilter.analyzeTarget(this.getExecutorByName("Executor3"), dispatchingObj);
+ result = mFilter.filterTarget(this.getExecutorByName("Executor3"), dispatchingObj);
// expect false, for priority.
/*
2 [main] INFO azkaban.executor.dispatcher.CandidateFilter - start checking 'Executor3' with factor filter for 'Mockfilter'
@@ -308,7 +315,7 @@ public class DispatcherTest {
mFilter.registerFilterforRemainingTmpSpace();
// expect pass.
- result = mFilter.analyzeTarget(this.getExecutorByName("Executor2"), dispatchingObj);
+ result = mFilter.filterTarget(this.getExecutorByName("Executor2"), dispatchingObj);
/*
3 [main] INFO azkaban.executor.dispatcher.CandidateFilter - start checking 'Executor2' with factor filter for 'Mockfilter'
3 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
@@ -319,7 +326,7 @@ public class DispatcherTest {
Assert.assertTrue(result);
// expect false, remaining tmp, priority will also fail but the logic shortcuts when the Tmp size check Fails.
- result = mFilter.analyzeTarget(this.getExecutorByName("Executor8"), dispatchingObj);
+ result = mFilter.filterTarget(this.getExecutorByName("Executor8"), dispatchingObj);
/*
4 [main] INFO azkaban.executor.dispatcher.CandidateFilter - start checking 'Executor8' with factor filter for 'Mockfilter'
4 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
@@ -331,6 +338,33 @@ public class DispatcherTest {
}
@Test
+ public void testExecutorFilterWithNullInputs() throws Exception {
+ MockFilter filter = new MockFilter();
+ filter.registerFilterforPriority();
+ filter.registerFilterforRemainingMemory();
+ filter.registerFilterforRemainingTmpSpace();
+ filter.registerFilterforTotalMemory();
+ boolean result = false;
+ try {
+ result = filter.filterTarget(this.getExecutorByName("Executor1"), null);
+ } catch (Exception ex){
+ Assert.fail("no exception should be thrown when null value is passed to the filter.");
+ }
+ // note : the FactorFilter logic will decide whether true or false should be returned when null value
+ // is passed, for the Mock class it returns false.
+ Assert.assertFalse(result);
+
+ try {
+ result = filter.filterTarget(null, null);
+ } catch (Exception ex){
+ Assert.fail("no exception should be thrown when null value is passed to the filter.");
+ }
+ // note : the FactorFilter logic will decide whether true or false should be returned when null value
+ // is passed, for the Mock class it returns false.
+ Assert.assertFalse(result);
+ }
+
+ @Test
public void testExecutorComparer() throws Exception {
MockComparator comparator = new MockComparator();
comparator.registerComparerForMemory(5);
@@ -354,6 +388,12 @@ public class DispatcherTest {
}
@Test
+ public void testExecutorComparerResisterComparerWInvalidWeight() throws Exception {
+ MockComparator comparator = new MockComparator();
+ comparator.registerComparerForMemory(0);
+ }
+
+ @Test
public void testDispatcher() throws Exception {
MockFilter filter = new MockFilter();
MockComparator comparator = new MockComparator();
@@ -374,13 +414,13 @@ public class DispatcherTest {
MockFlowObject dispatchingObj = new MockFlowObject("flow1",3096, 1500,4200,2);
// expected selection = #12
- MockExecutorObject nextExecutor = morkDispatcher.getNext(this.executorList, dispatchingObj);
+ MockExecutorObject nextExecutor = morkDispatcher.getBest(this.executorList, dispatchingObj);
Assert.assertEquals(this.getExecutorByName("Executor1"),nextExecutor);
// remaining memory 11500, total memory 3095, remainingTmpSpace 14200, priority 2.
dispatchingObj = new MockFlowObject("flow1",3096, 1500,14200,2);
// all candidates should be filtered by the remaining memory.
- nextExecutor = morkDispatcher.getNext(this.executorList, dispatchingObj);
+ nextExecutor = morkDispatcher.getBest(this.executorList, dispatchingObj);
Assert.assertEquals(null,nextExecutor);
}
@@ -402,18 +442,197 @@ public class DispatcherTest {
new ExecutorDispatcher<MockExecutorObject,MockFlowObject>(filter,comparator);
MockFlowObject dispatchingObj = new MockFlowObject("flow1",100, 100,100,3);
- MockExecutorObject executor = morkDispatcher.getNext(this.executorList, dispatchingObj);
+ MockExecutorObject executor = morkDispatcher.getBest(this.executorList, dispatchingObj);
Assert.assertEquals(this.getExecutorByName("Executor9"), executor);
// adjusted the weight for memory to 10, therefore item #11 should be returned.
morkDispatcher.getComparator().adjustFactorWeight("Memory", 10);
- executor = morkDispatcher.getNext(this.executorList, dispatchingObj);
+ executor = morkDispatcher.getBest(this.executorList, dispatchingObj);
Assert.assertEquals(this.getExecutorByName("Executor11"), executor);
// adjusted the weight for memory back to 1, therefore item #12 should be returned.
morkDispatcher.getComparator().adjustFactorWeight("Memory", 1);
- executor = morkDispatcher.getNext(this.executorList, dispatchingObj);
+ executor = morkDispatcher.getBest(this.executorList, dispatchingObj);
Assert.assertEquals(this.getExecutorByName("Executor9"), executor);
}
+
+ @Test
+ public void testDispatchersignleCandidate() throws Exception {
+ MockFilter filter = new MockFilter();
+ MockComparator comparator = new MockComparator();
+
+ filter.registerFilterforPriority();
+ filter.registerFilterforRemainingMemory();
+ filter.registerFilterforRemainingTmpSpace();
+ filter.registerFilterforTotalMemory();
+
+ comparator.registerComparerForMemory(3);
+ comparator.registerComparerForPriority(4);
+ comparator.registerComparerForRemainingSpace(1);
+
+ ExecutorDispatcher<MockExecutorObject,MockFlowObject> morkDispatcher =
+ new ExecutorDispatcher<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+ ArrayList<MockExecutorObject> signleExecutorList = new ArrayList<MockExecutorObject>();
+ MockExecutorObject signleExecutor = new MockExecutorObject("ExecutorX",8080,50.0,2048,3,new Date(), 20, 6400);
+ signleExecutorList.add(signleExecutor);
+
+ MockFlowObject dispatchingObj = new MockFlowObject("flow1",100, 100,100,5);
+ MockExecutorObject executor = morkDispatcher.getBest(signleExecutorList, dispatchingObj);
+ // expected to see null result, as the only executor is filtered out .
+ Assert.assertTrue(null == executor);
+
+ // adjust the priority to let the executor pass the filter.
+ dispatchingObj.priority = 3;
+ executor = morkDispatcher.getBest(signleExecutorList, dispatchingObj);
+ Assert.assertEquals(signleExecutor, executor);
+ }
+
+ @Test
+ public void testDispatcherListWithItemsThatAreReferenceEqual() throws Exception {
+ MockFilter filter = new MockFilter();
+ MockComparator comparator = new MockComparator();
+
+ filter.registerFilterforPriority();
+ filter.registerFilterforRemainingMemory();
+ filter.registerFilterforRemainingTmpSpace();
+ filter.registerFilterforTotalMemory();
+
+ comparator.registerComparerForMemory(3);
+ comparator.registerComparerForPriority(4);
+ comparator.registerComparerForRemainingSpace(1);
+
+ ExecutorDispatcher<MockExecutorObject,MockFlowObject> morkDispatcher =
+ new ExecutorDispatcher<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+ ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+ MockExecutorObject signleExecutor = new MockExecutorObject("ExecutorX",8080,50.0,2048,3,new Date(), 20, 6400);
+ list.add(signleExecutor);
+ list.add(signleExecutor);
+ MockFlowObject dispatchingObj = new MockFlowObject("flow1",100, 100,100,3);
+ MockExecutorObject executor = morkDispatcher.getBest(list, dispatchingObj);
+ Assert.assertTrue(signleExecutor == executor);
+ }
+
+ @Test
+ public void testDispatcherListWithItemsThatAreEqualInValue() throws Exception {
+ MockFilter filter = new MockFilter();
+ MockComparator comparator = new MockComparator();
+
+ filter.registerFilterforPriority();
+ filter.registerFilterforRemainingMemory();
+ filter.registerFilterforRemainingTmpSpace();
+ filter.registerFilterforTotalMemory();
+
+ comparator.registerComparerForMemory(3);
+ comparator.registerComparerForPriority(4);
+ comparator.registerComparerForRemainingSpace(1);
+
+ ExecutorDispatcher<MockExecutorObject,MockFlowObject> morkDispatcher =
+ new ExecutorDispatcher<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+ // note - as the tieBreaker set in the MockComparator uses the name value of the executor to do the
+ // final diff therefore we need to set the name differently to make a meaningful test, in real
+ // scenario we may want to use something else (say hash code) to be the bottom line for the tieBreaker
+ // to make a final decision, the purpose of the test here is to prove that for two candidates with
+ // exact value (in the case of test, all values except for the name) the decision result is stable.
+ ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+ MockExecutorObject executor1 = new MockExecutorObject("ExecutorX", 8080,50.0,2048,3,new Date(), 20, 6400);
+ MockExecutorObject executor2 = new MockExecutorObject("ExecutorX2",8080,50.0,2048,3,new Date(), 20, 6400);
+ list.add(executor1);
+ list.add(executor2);
+ MockFlowObject dispatchingObj = new MockFlowObject("flow1",100, 100,100,3);
+ MockExecutorObject executor = morkDispatcher.getBest(list, dispatchingObj);
+ Assert.assertTrue(executor2 == executor);
+
+ // shuffle and test again.
+ list.remove(0);
+ list.add(executor1);
+ executor = morkDispatcher.getBest(list, dispatchingObj);
+ Assert.assertTrue(executor2 == executor);
+ }
+
+ @Test
+ public void testDispatcherEmptyList() throws Exception {
+ MockFilter filter = new MockFilter();
+ MockComparator comparator = new MockComparator();
+
+ filter.registerFilterforPriority();
+ filter.registerFilterforRemainingMemory();
+ filter.registerFilterforRemainingTmpSpace();
+ filter.registerFilterforTotalMemory();
+
+ comparator.registerComparerForMemory(3);
+ comparator.registerComparerForPriority(4);
+ comparator.registerComparerForRemainingSpace(1);
+
+ ExecutorDispatcher<MockExecutorObject,MockFlowObject> morkDispatcher =
+ new ExecutorDispatcher<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+ ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+
+ MockFlowObject dispatchingObj = new MockFlowObject("flow1",100, 100,100,5);
+
+ MockExecutorObject executor = null;
+
+ try {
+ executor = morkDispatcher.getBest(list, dispatchingObj);
+ } catch (Exception ex){
+ Assert.fail("no exception should be thrown when an empty list is passed to the dispatcher.");
+ }
+
+ // expected to see null result.
+ Assert.assertTrue(null == executor);
+
+ try {
+ executor = morkDispatcher.getBest(list, dispatchingObj);
+ } catch (Exception ex){
+ Assert.fail("no exception should be thrown when null is passed to the dispatcher as the candidate list.");
+ }
+
+ // expected to see null result, as the only executor is filtered out .
+ Assert.assertTrue(null == executor);
+
+ }
+
+ @Test
+ public void testDispatcherListWithNullValue() throws Exception {
+ MockComparator comparator = new MockComparator();
+
+ comparator.registerComparerForMemory(3);
+ comparator.registerComparerForPriority(4);
+ comparator.registerComparerForRemainingSpace(1);
+
+ ExecutorDispatcher<MockExecutorObject,MockFlowObject> morkDispatcher =
+ new ExecutorDispatcher<MockExecutorObject,MockFlowObject>(null,comparator);
+
+ ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+ MockExecutorObject executor1 = new MockExecutorObject("ExecutorX", 8080,50.0,2048,3,new Date(), 20, 6400);
+ MockExecutorObject executor2 = new MockExecutorObject("ExecutorX2",8080,50.0,2048,3,new Date(), 20, 6400);
+ list.add(executor1);
+ list.add(executor2);
+ list.add(null);
+
+ MockFlowObject dispatchingObj = new MockFlowObject("flow1",100, 100,100,3);
+ MockExecutorObject executor = null;
+ try {
+ executor = morkDispatcher.getBest(list, dispatchingObj);
+ } catch (Exception ex){
+ Assert.fail("no exception should be thrown when an List contains null value.");
+ }
+ Assert.assertTrue(executor2 == executor);
+
+ // try to compare null vs null, no exception is expected.
+ list.clear();
+ list.add(null);
+ list.add(null);
+ try {
+ executor = morkDispatcher.getBest(list, dispatchingObj);
+ } catch (Exception ex){
+ Assert.fail("no exception should be thrown when an List contains multiple null values.");
+ }
+ Assert.assertTrue(null == executor);
+
+ }
}