azkaban-aplcache

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/dispatcher/CandidateComparator.java b/azkaban-common/src/main/java/azkaban/executor/dispatcher/CandidateComparator.java
index 9239bf5..6bf1620 100644
--- a/azkaban-common/src/main/java/azkaban/executor/dispatcher/CandidateComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/dispatcher/CandidateComparator.java
@@ -46,6 +46,8 @@ public abstract class CandidateComparator<T> implements Comparator<T> {
    *  both sides. the tieBreak method will try best to make sure a stable result is returned.
    * */
   protected boolean tieBreak(T object1, T object2){
+    if (null == object2) return true;
+    if (null == object1) return false;
     return object1.hashCode() >= object2.hashCode();
   }
 
@@ -128,43 +130,40 @@ public abstract class CandidateComparator<T> implements Comparator<T> {
         object2 == null ? "(null)" : object2.toString(),
         this.getTotalWeight()));
 
+    int result1 = 0 ;
+    int result2 = 0 ;
+
     // short cut if object equals.
-    // Note if:  objects are reference equals, there is no need to call
-    //           TieBreaker as returning either side actually doesn't matter,
-    //           because they are the same.
     if (object1 ==  object2){
-      logger.info("Result : 0 vs 0 (equal)");
-      return new Pair<Integer,Integer>(0,0);
-    }
-
+      logger.info("[Comparator] same object.");
+    } else
     // left side is null.
     if (object1 == null){
-      logger.info(String.format("Result : 0 vs %s (left is null)",this.getTotalWeight()));
-      return new Pair<Integer, Integer>(0,this.getTotalWeight());
-    }
-
+      logger.info("[Comparator] left side is null, right side gets total weight.");
+      result2 = this.getTotalWeight();
+    } else
     // right side is null.
     if (object2 == null){
-      logger.info(String.format("Result : %s vs 0 (right is null)",this.getTotalWeight()));
-      return new Pair<Integer, Integer>(this.getTotalWeight(),0);
-    }
-
+      logger.info("[Comparator] right side is null, left side gets total weight");
+      result1 = this.getTotalWeight();
+    } else
     // both side is not null,put them thru the full loop
-    int result1 = 0 ;
-    int result2 = 0 ;
-    Collection<FactorComparator<T>> comparatorList = this.factorComparatorList.values();
-    for (FactorComparator<T> comparator :comparatorList){
-      int result = comparator.compare(object1, object2);
-      result1  = result1 + (result > 0 ? comparator.getWeight() : 0);
-      result2  = result2 + (result < 0 ? comparator.getWeight() : 0);
-      logger.info(String.format("[Factor: %s] compare result : %s (current score %s vs %s)",
-          comparator.getFactorName(), result, result1, result2));
+    {
+      Collection<FactorComparator<T>> comparatorList = this.factorComparatorList.values();
+      for (FactorComparator<T> comparator :comparatorList){
+        int result = comparator.compare(object1, object2);
+        result1  = result1 + (result > 0 ? comparator.getWeight() : 0);
+        result2  = result2 + (result < 0 ? comparator.getWeight() : 0);
+        logger.info(String.format("[Factor: %s] compare result : %s (current score %s vs %s)",
+            comparator.getFactorName(), result, result1, result2));
+      }
     }
-
     // in case of same score, use tie-breaker to stabilize the result.
     if (result1 == result2){
       boolean result = this.tieBreak(object1, object2);
-      logger.info("[TieBreaker] TieBreaker chose " + (result?  object1.toString(): object2.toString()));
+      logger.info("[TieBreaker] TieBreaker chose " +
+      (result? String.format("left side (%s)",  null== object1 ? "null": object1.toString()) :
+               String.format("right side (%s)", null== object2 ? "null": object2.toString()) ));
       if (result) result1++; else result2++;
     }
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/dispatcher/CandidateFilter.java b/azkaban-common/src/main/java/azkaban/executor/dispatcher/CandidateFilter.java
index 2823582..b8b647c 100644
--- a/azkaban-common/src/main/java/azkaban/executor/dispatcher/CandidateFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/dispatcher/CandidateFilter.java
@@ -17,7 +17,6 @@
 package azkaban.executor.dispatcher;
 
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -62,24 +61,22 @@ public abstract class CandidateFilter<T,V>  {
    *                      the object being checked need to be filtered or not.
    * @return true if the check passed, false if check failed, which means the item need to be filtered.
    * */
-  public boolean analyzeTarget(T filteringTarget, V referencingObject){
-    logger.info(String.format("start checking '%s' with factor filter for '%s'",
+  public boolean filterTarget(T filteringTarget, V referencingObject){
+    logger.info(String.format("start filtering '%s' with factor filter for '%s'",
         filteringTarget == null ? "(null)" : filteringTarget.toString(),
         this.getName()));
 
     Collection<FactorFilter<T,V>> filterList = this.factorFilterList.values();
-    Iterator<FactorFilter<T,V>> mapItr = filterList.iterator();
     boolean result = true;
-    while (mapItr.hasNext()){
-      FactorFilter<T,V> filter = (FactorFilter<T,V>) mapItr.next();
-      result &= filter.check(filteringTarget,referencingObject);
+    for (FactorFilter<T,V> filter : filterList){
+      result &= filter.filterTarget(filteringTarget,referencingObject);
       logger.info(String.format("[Factor: %s] filter result : %s ",
           filter.getFactorName(), result));
       if (!result){
         break;
       }
     }
-    logger.info(String.format("Final checking result : %s ",result));
+    logger.info(String.format("Final filtering result : %s ",result));
     return result;
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/dispatcher/Dispatcher.java b/azkaban-common/src/main/java/azkaban/executor/dispatcher/Dispatcher.java
index 3d97604..69d3c75 100644
--- a/azkaban-common/src/main/java/azkaban/executor/dispatcher/Dispatcher.java
+++ b/azkaban-common/src/main/java/azkaban/executor/dispatcher/Dispatcher.java
@@ -32,7 +32,7 @@ public interface Dispatcher <K,V> {
    *  @param  dispatchingObject : the object to be dispatched .
    *  @return candidate from the candidate list that suits best for the dispatching object.
    * */
-  public K getNext(List<K> candidateList, V dispatchingObject);
+  public K getBest(List<K> candidateList, V dispatchingObject);
 
   /** Function returns the name of the current Dispatcher
    *  @return name of the dispatcher.
diff --git a/azkaban-common/src/main/java/azkaban/executor/dispatcher/ExecutorDispatcher.java b/azkaban-common/src/main/java/azkaban/executor/dispatcher/ExecutorDispatcher.java
index 1045fc8..4f9366b 100644
--- a/azkaban-common/src/main/java/azkaban/executor/dispatcher/ExecutorDispatcher.java
+++ b/azkaban-common/src/main/java/azkaban/executor/dispatcher/ExecutorDispatcher.java
@@ -67,7 +67,7 @@ public class ExecutorDispatcher<K,V> implements Dispatcher<K, V> {
   }
 
   @Override
-  public K getNext(List<K> candidateList, V dispatchingObject) {
+  public K getBest(List<K> candidateList, V dispatchingObject) {
 
      // shortcut if the candidateList is empty.
      if ( null == candidateList || candidateList.size() == 0){
@@ -83,7 +83,7 @@ public class ExecutorDispatcher<K,V> implements Dispatcher<K, V> {
 
      if (null != this.filter){
        for (K candidateInfo : candidateList){
-         if (filter.analyzeTarget(candidateInfo,dispatchingObject)){
+         if (filter.filterTarget(candidateInfo,dispatchingObject)){
            filteredList.add(candidateInfo);
          }
        }
@@ -100,8 +100,8 @@ public class ExecutorDispatcher<K,V> implements Dispatcher<K, V> {
 
      // final work - find the best candidate from the filtered list.
      K executor = Collections.max(filteredList,comparator);
-     logger.info(String.format("candidate selected %s", executor.toString()));
-
+     logger.info(String.format("candidate selected %s",
+         null == executor ? "(null)" : executor.toString()));
      return executor;
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/dispatcher/FactorFilter.java b/azkaban-common/src/main/java/azkaban/executor/dispatcher/FactorFilter.java
index f944c25..b389c50 100644
--- a/azkaban-common/src/main/java/azkaban/executor/dispatcher/FactorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/dispatcher/FactorFilter.java
@@ -57,12 +57,12 @@ public final class FactorFilter<T,V>{
   }
 
   // the actual check function, which will leverage the logic defined by user.
-  public boolean check(T itemToCheck, V sourceObject){
-    return this.filter.analyzeTarget(itemToCheck, sourceObject);
+  public boolean filterTarget(T filteringTarget, V referencingObject){
+    return this.filter.filterTarget(filteringTarget, referencingObject);
   }
 
   // interface of the filter.
-  public interface Filter<K,V>{
+  public interface Filter<T,V>{
 
     /**function to analyze the target item according to the reference object to decide whether the item should be filtered.
      * @param filteringTarget:   object to be checked.
@@ -70,7 +70,7 @@ public final class FactorFilter<T,V>{
      *                      the object being checked need to be filtered or not.
      * @return true if the check passed, false if check failed, which means the item need to be filtered.
      * */
-    public boolean analyzeTarget(K filteringTarget, V referencingObject);
+    public boolean filterTarget(T filteringTarget, V referencingObject);
   }
 
 
diff --git a/azkaban-common/src/test/java/azkaban/executor/DispatcherTest.java b/azkaban-common/src/test/java/azkaban/executor/DispatcherTest.java
index dc1af73..59ebb72 100644
--- a/azkaban-common/src/test/java/azkaban/executor/DispatcherTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/DispatcherTest.java
@@ -20,10 +20,12 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
+
 import org.apache.log4j.BasicConfigurator;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import azkaban.executor.dispatcher.*;
@@ -112,7 +114,7 @@ public class DispatcherTest {
     public void registerFilterforTotalMemory(){
       this.registerFactorFilter(FactorFilter.create("requiredTotalMemory",
           new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
-          public boolean analyzeTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+          public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
             // REAL LOGIC COMES HERE -
             if (null == itemToCheck || null == sourceObject){
               return false;
@@ -124,14 +126,15 @@ public class DispatcherTest {
             }
 
             // calculate the memory and return.
-            return itemToCheck.amountOfRemainingMemory / itemToCheck.percentOfRemainingMemory * 100 > sourceObject.requiredTotalMemory;
+            return itemToCheck.amountOfRemainingMemory / itemToCheck.percentOfRemainingMemory * 100 >
+                   sourceObject.requiredTotalMemory;
           }}));
     }
 
     public void registerFilterforRemainingMemory(){
       this.registerFactorFilter(FactorFilter.create("requiredRemainingMemory",
           new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
-        public boolean analyzeTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+        public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
           // REAL LOGIC COMES HERE -
           if (null == itemToCheck || null == sourceObject){
             return false;
@@ -143,7 +146,7 @@ public class DispatcherTest {
     public void registerFilterforPriority(){
       this.registerFactorFilter(FactorFilter.create("requiredProprity",
           new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
-        public boolean analyzeTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+        public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
           // REAL LOGIC COMES HERE -
           if (null == itemToCheck || null == sourceObject){
             return false;
@@ -157,7 +160,7 @@ public class DispatcherTest {
     public void registerFilterforRemainingTmpSpace(){
       this.registerFactorFilter(FactorFilter.create("requiredRemainingTmpSpace",
           new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
-        public boolean analyzeTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+        public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
           // REAL LOGIC COMES HERE -
           if (null == itemToCheck || null == sourceObject){
             return false;
@@ -180,6 +183,8 @@ public class DispatcherTest {
 
     @Override
     protected boolean tieBreak(MockExecutorObject object1, MockExecutorObject object2){
+      if (null == object2) return true;
+      if (null == object1) return false;
       return object1.name.compareTo(object2.name) >= 0;
     }
 
@@ -232,10 +237,12 @@ public class DispatcherTest {
   // test samples.
   protected ArrayList<MockExecutorObject> executorList = new ArrayList<MockExecutorObject>();
 
-  @Before
-  public void setUp() throws Exception {
+  @BeforeClass public static void onlyOnce() {
     BasicConfigurator.configure();
+   }
 
+  @Before
+  public void setUp() throws Exception {
     executorList.clear();
     executorList.add(new MockExecutorObject("Executor1",8080,50.0,2048,5,new Date(), 20, 6400));
     executorList.add(new MockExecutorObject("Executor2",8080,50.0,2048,4,new Date(), 20, 6400));
@@ -280,11 +287,11 @@ public class DispatcherTest {
       mFilter.registerFilterforRemainingMemory();
 
       // expect true.
-      boolean result = mFilter.analyzeTarget(this.getExecutorByName("Executor1"), dispatchingObj);
+      boolean result = mFilter.filterTarget(this.getExecutorByName("Executor1"), dispatchingObj);
       Assert.assertTrue(result);
 
       //expect true.
-      result = mFilter.analyzeTarget(this.getExecutorByName("Executor3"), dispatchingObj);
+      result = mFilter.filterTarget(this.getExecutorByName("Executor3"), dispatchingObj);
       /*
       1 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - start checking 'Executor3' with factor filter for 'Mockfilter'
       2 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
@@ -294,7 +301,7 @@ public class DispatcherTest {
 
       // add the priority filter.
       mFilter.registerFilterforPriority();
-      result = mFilter.analyzeTarget(this.getExecutorByName("Executor3"), dispatchingObj);
+      result = mFilter.filterTarget(this.getExecutorByName("Executor3"), dispatchingObj);
       // expect false, for priority.
       /*
       2 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - start checking 'Executor3' with factor filter for 'Mockfilter'
@@ -308,7 +315,7 @@ public class DispatcherTest {
       mFilter.registerFilterforRemainingTmpSpace();
 
       // expect pass.
-      result = mFilter.analyzeTarget(this.getExecutorByName("Executor2"), dispatchingObj);
+      result = mFilter.filterTarget(this.getExecutorByName("Executor2"), dispatchingObj);
       /*
       3 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - start checking 'Executor2' with factor filter for 'Mockfilter'
       3 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
@@ -319,7 +326,7 @@ public class DispatcherTest {
       Assert.assertTrue(result);
 
       // expect false, remaining tmp, priority will also fail but the logic shortcuts when the Tmp size check Fails.
-      result = mFilter.analyzeTarget(this.getExecutorByName("Executor8"), dispatchingObj);
+      result = mFilter.filterTarget(this.getExecutorByName("Executor8"), dispatchingObj);
       /*
       4 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - start checking 'Executor8' with factor filter for 'Mockfilter'
       4 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
@@ -331,6 +338,33 @@ public class DispatcherTest {
   }
 
   @Test
+  public void testExecutorFilterWithNullInputs() throws Exception {
+    MockFilter filter = new MockFilter();
+    filter.registerFilterforPriority();
+    filter.registerFilterforRemainingMemory();
+    filter.registerFilterforRemainingTmpSpace();
+    filter.registerFilterforTotalMemory();
+    boolean result = false;
+    try {
+        result = filter.filterTarget(this.getExecutorByName("Executor1"), null);
+      } catch (Exception ex){
+        Assert.fail("no exception should be thrown when null value is passed to the filter.");
+      }
+    // note : the FactorFilter logic will decide whether true or false should be returned when null value
+    //        is passed, for the Mock class it returns false.
+    Assert.assertFalse(result);
+
+    try {
+        result = filter.filterTarget(null, null);
+      } catch (Exception ex){
+        Assert.fail("no exception should be thrown when null value is passed to the filter.");
+      }
+    // note : the FactorFilter logic will decide whether true or false should be returned when null value
+    //        is passed, for the Mock class it returns false.
+    Assert.assertFalse(result);
+  }
+
+  @Test
   public void testExecutorComparer() throws Exception {
     MockComparator comparator = new MockComparator();
     comparator.registerComparerForMemory(5);
@@ -354,6 +388,12 @@ public class DispatcherTest {
   }
 
   @Test
+  public void testExecutorComparerResisterComparerWInvalidWeight() throws Exception {
+    MockComparator comparator = new MockComparator();
+    comparator.registerComparerForMemory(0);
+  }
+
+  @Test
   public void testDispatcher() throws Exception {
     MockFilter filter = new MockFilter();
     MockComparator comparator = new MockComparator();
@@ -374,13 +414,13 @@ public class DispatcherTest {
     MockFlowObject  dispatchingObj = new MockFlowObject("flow1",3096, 1500,4200,2);
 
     // expected selection = #12
-    MockExecutorObject nextExecutor = morkDispatcher.getNext(this.executorList, dispatchingObj);
+    MockExecutorObject nextExecutor = morkDispatcher.getBest(this.executorList, dispatchingObj);
     Assert.assertEquals(this.getExecutorByName("Executor1"),nextExecutor);
 
    // remaining memory 11500, total memory 3095, remainingTmpSpace 14200, priority 2.
    dispatchingObj = new MockFlowObject("flow1",3096, 1500,14200,2);
    // all candidates should be filtered by the remaining memory.
-   nextExecutor = morkDispatcher.getNext(this.executorList, dispatchingObj);
+   nextExecutor = morkDispatcher.getBest(this.executorList, dispatchingObj);
    Assert.assertEquals(null,nextExecutor);
   }
 
@@ -402,18 +442,197 @@ public class DispatcherTest {
         new ExecutorDispatcher<MockExecutorObject,MockFlowObject>(filter,comparator);
 
     MockFlowObject  dispatchingObj = new MockFlowObject("flow1",100, 100,100,3);
-    MockExecutorObject executor = morkDispatcher.getNext(this.executorList, dispatchingObj);
+    MockExecutorObject executor = morkDispatcher.getBest(this.executorList, dispatchingObj);
     Assert.assertEquals(this.getExecutorByName("Executor9"), executor);
 
     // adjusted the weight for memory to 10, therefore item #11 should be returned.
     morkDispatcher.getComparator().adjustFactorWeight("Memory", 10);
-    executor = morkDispatcher.getNext(this.executorList, dispatchingObj);
+    executor = morkDispatcher.getBest(this.executorList, dispatchingObj);
     Assert.assertEquals(this.getExecutorByName("Executor11"), executor);
 
     // adjusted the weight for memory back to 1, therefore item #12 should be returned.
     morkDispatcher.getComparator().adjustFactorWeight("Memory", 1);
-    executor = morkDispatcher.getNext(this.executorList, dispatchingObj);
+    executor = morkDispatcher.getBest(this.executorList, dispatchingObj);
     Assert.assertEquals(this.getExecutorByName("Executor9"), executor);
 
   }
+
+  @Test
+  public void testDispatchersignleCandidate() throws Exception {
+    MockFilter filter = new MockFilter();
+    MockComparator comparator = new MockComparator();
+
+    filter.registerFilterforPriority();
+    filter.registerFilterforRemainingMemory();
+    filter.registerFilterforRemainingTmpSpace();
+    filter.registerFilterforTotalMemory();
+
+    comparator.registerComparerForMemory(3);
+    comparator.registerComparerForPriority(4);
+    comparator.registerComparerForRemainingSpace(1);
+
+    ExecutorDispatcher<MockExecutorObject,MockFlowObject> morkDispatcher =
+        new ExecutorDispatcher<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+    ArrayList<MockExecutorObject> signleExecutorList = new ArrayList<MockExecutorObject>();
+    MockExecutorObject signleExecutor = new MockExecutorObject("ExecutorX",8080,50.0,2048,3,new Date(), 20, 6400);
+    signleExecutorList.add(signleExecutor);
+
+    MockFlowObject  dispatchingObj = new MockFlowObject("flow1",100, 100,100,5);
+    MockExecutorObject executor = morkDispatcher.getBest(signleExecutorList, dispatchingObj);
+    // expected to see null result, as the only executor is filtered out .
+    Assert.assertTrue(null == executor);
+
+    // adjust the priority to let the executor pass the filter.
+    dispatchingObj.priority = 3;
+    executor = morkDispatcher.getBest(signleExecutorList, dispatchingObj);
+    Assert.assertEquals(signleExecutor, executor);
+  }
+
+  @Test
+  public void testDispatcherListWithItemsThatAreReferenceEqual() throws Exception {
+    MockFilter filter = new MockFilter();
+    MockComparator comparator = new MockComparator();
+
+    filter.registerFilterforPriority();
+    filter.registerFilterforRemainingMemory();
+    filter.registerFilterforRemainingTmpSpace();
+    filter.registerFilterforTotalMemory();
+
+    comparator.registerComparerForMemory(3);
+    comparator.registerComparerForPriority(4);
+    comparator.registerComparerForRemainingSpace(1);
+
+    ExecutorDispatcher<MockExecutorObject,MockFlowObject> morkDispatcher =
+        new ExecutorDispatcher<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+    ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+    MockExecutorObject signleExecutor = new MockExecutorObject("ExecutorX",8080,50.0,2048,3,new Date(), 20, 6400);
+    list.add(signleExecutor);
+    list.add(signleExecutor);
+    MockFlowObject  dispatchingObj = new MockFlowObject("flow1",100, 100,100,3);
+    MockExecutorObject executor = morkDispatcher.getBest(list, dispatchingObj);
+    Assert.assertTrue(signleExecutor == executor);
+  }
+
+  @Test
+  public void testDispatcherListWithItemsThatAreEqualInValue() throws Exception {
+    MockFilter filter = new MockFilter();
+    MockComparator comparator = new MockComparator();
+
+    filter.registerFilterforPriority();
+    filter.registerFilterforRemainingMemory();
+    filter.registerFilterforRemainingTmpSpace();
+    filter.registerFilterforTotalMemory();
+
+    comparator.registerComparerForMemory(3);
+    comparator.registerComparerForPriority(4);
+    comparator.registerComparerForRemainingSpace(1);
+
+    ExecutorDispatcher<MockExecutorObject,MockFlowObject> morkDispatcher =
+        new ExecutorDispatcher<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+    // note - as the tieBreaker set in the MockComparator uses the name value of the executor to do the
+    //        final diff therefore we need to set the name differently to make a meaningful test, in real
+    //        scenario we may want to use something else (say hash code) to be the bottom line for the tieBreaker
+    //        to make a final decision, the purpose of the test here is to prove that for two candidates with
+    //        exact value (in the case of test, all values except for the name) the decision result is stable.
+    ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+    MockExecutorObject executor1 = new MockExecutorObject("ExecutorX", 8080,50.0,2048,3,new Date(), 20, 6400);
+    MockExecutorObject executor2 = new MockExecutorObject("ExecutorX2",8080,50.0,2048,3,new Date(), 20, 6400);
+    list.add(executor1);
+    list.add(executor2);
+    MockFlowObject  dispatchingObj = new MockFlowObject("flow1",100, 100,100,3);
+    MockExecutorObject executor = morkDispatcher.getBest(list, dispatchingObj);
+    Assert.assertTrue(executor2 == executor);
+
+    // shuffle and test again.
+    list.remove(0);
+    list.add(executor1);
+    executor = morkDispatcher.getBest(list, dispatchingObj);
+    Assert.assertTrue(executor2 == executor);
+  }
+
+  @Test
+  public void testDispatcherEmptyList() throws Exception {
+    MockFilter filter = new MockFilter();
+    MockComparator comparator = new MockComparator();
+
+    filter.registerFilterforPriority();
+    filter.registerFilterforRemainingMemory();
+    filter.registerFilterforRemainingTmpSpace();
+    filter.registerFilterforTotalMemory();
+
+    comparator.registerComparerForMemory(3);
+    comparator.registerComparerForPriority(4);
+    comparator.registerComparerForRemainingSpace(1);
+
+    ExecutorDispatcher<MockExecutorObject,MockFlowObject> morkDispatcher =
+        new ExecutorDispatcher<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+    ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+
+    MockFlowObject  dispatchingObj = new MockFlowObject("flow1",100, 100,100,5);
+
+    MockExecutorObject executor  = null;
+
+    try {
+      executor = morkDispatcher.getBest(list, dispatchingObj);
+      } catch (Exception ex){
+        Assert.fail("no exception should be thrown when an empty list is passed to the dispatcher.");
+      }
+
+    // expected to see null result.
+    Assert.assertTrue(null == executor);
+
+    try {
+      executor = morkDispatcher.getBest(list, dispatchingObj);
+      } catch (Exception ex){
+        Assert.fail("no exception should be thrown when null is passed to the dispatcher as the candidate list.");
+      }
+
+      // expected to see null result, as the only executor is filtered out .
+      Assert.assertTrue(null == executor);
+
+  }
+
+  @Test
+  public void testDispatcherListWithNullValue() throws Exception {
+    MockComparator comparator = new MockComparator();
+
+    comparator.registerComparerForMemory(3);
+    comparator.registerComparerForPriority(4);
+    comparator.registerComparerForRemainingSpace(1);
+
+    ExecutorDispatcher<MockExecutorObject,MockFlowObject> morkDispatcher =
+        new ExecutorDispatcher<MockExecutorObject,MockFlowObject>(null,comparator);
+
+    ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+    MockExecutorObject executor1 = new MockExecutorObject("ExecutorX", 8080,50.0,2048,3,new Date(), 20, 6400);
+    MockExecutorObject executor2 = new MockExecutorObject("ExecutorX2",8080,50.0,2048,3,new Date(), 20, 6400);
+    list.add(executor1);
+    list.add(executor2);
+    list.add(null);
+
+    MockFlowObject  dispatchingObj = new MockFlowObject("flow1",100, 100,100,3);
+    MockExecutorObject executor  = null;
+    try {
+      executor = morkDispatcher.getBest(list, dispatchingObj);
+      } catch (Exception ex){
+        Assert.fail("no exception should be thrown when an List contains null value.");
+      }
+    Assert.assertTrue(executor2 == executor);
+
+    // try to compare null vs null, no exception is expected.
+    list.clear();
+    list.add(null);
+    list.add(null);
+    try {
+      executor = morkDispatcher.getBest(list, dispatchingObj);
+      } catch (Exception ex){
+        Assert.fail("no exception should be thrown when an List contains multiple null values.");
+      }
+    Assert.assertTrue(null == executor);
+
+  }
 }