azkaban-developers

DSP- 7976/7977 implement dispatcher / interface of statistics

8/24/2015 9:26:32 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/dispatcher/CandidateComparator.java b/azkaban-common/src/main/java/azkaban/executor/dispatcher/CandidateComparator.java
new file mode 100644
index 0000000..1b6db3d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/dispatcher/CandidateComparator.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.dispatcher;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Pair;
+
+/** Abstract class for a candidate comparator.
+ *  this class contains implementation of most of the core logics. Implementing classes is expected only to
+ *  register factor comparators using the provided register function.
+ *
+ */
+public abstract class CandidateComparator<T> implements Comparator<T> {
+  private static Logger logger = Logger.getLogger(CandidateComparator.class);
+
+  // internal repository of the registered comparators .
+  private Map<String,FactorComparator<T>> factorComparatorList =
+      new ConcurrentHashMap<String,FactorComparator<T>>();
+
+  /** gets the name of the current implementation of the candidate comparator.
+   * @returns : name of the comparator.
+   * */
+  public abstract String getName();
+
+  /** function to register a factorComparator to the internal Map for future reference.
+   * @param factorComparator : the comparator object to be registered.
+   * */
+  protected void registerFactorComparator(FactorComparator<T> comparator){
+      if (null == comparator ||
+          Integer.MAX_VALUE - this.getTotalWeight() < comparator.getWeight() ) {
+        logger.info("skipping registerFactorComparator as the comaractor is null or has an invalid weight value.");
+        return;
+      }
+
+      // add or replace the Comparator.
+      this.factorComparatorList.put(comparator.getFactorName(),comparator);
+      logger.info(String.format("Factor comparator added for '%s'. Weight = '%s'",
+          comparator.getFactorName(), comparator.getWeight()));
+  }
+
+
+  /** function update the weight of a specific registered factorCompartor.
+   * @param factorName : the name of the registered factorComparator to adjust.
+   * @param weight:      the new weight value to be adjusted to.
+   * @return -1 if the factor doesn't exist or the weight value specified is invalid,
+   *          the original value before update otherwise.
+   * */
+  public int adjustFactorWeight(String factorName, int weight){
+    // shortcut if the input is invalid.
+    if (factorName == null ||
+        factorName == "" ||
+        weight < 0 ||
+        Integer.MAX_VALUE - this.getTotalWeight() < weight){
+      logger.info("skipping adjustFactorWeight as one or more of the input parameters are invalid");
+      return -1;
+    }
+
+    FactorComparator<T> value = this.factorComparatorList.get(factorName);
+
+    // shortcut if the key doesn't exist.
+    if (null == value){
+      logger.info(String.format("unable to udpate weight as the specified factorName %s doesn't exist",factorName));
+      return -1;
+    }
+
+    int returnVal = value.getWeight();
+    value.updateWeight(weight);
+    return returnVal;
+  }
+
+  /** function returns the total weight of the registered comparators.
+   * @return the value of total weight.
+   * */
+  public int getTotalWeight(){
+    int totalWeight = 0 ;
+
+    // save out a copy of the values as HashMap.values() takes o(n) to return the value.
+    Collection<FactorComparator<T>> allValues = this.factorComparatorList.values();
+    for (FactorComparator<T> item : allValues){
+      if (item != null){
+        totalWeight += item.getWeight();
+      }
+    }
+
+    return totalWeight;
+  }
+
+  /** function to actually calculate the scores for the two objects that are being compared.
+   *  the comparison follows the following logic -
+   *  1. if both objects are equal return 0 score for both.
+   *  2. if one side is null, the other side gets all the score.
+   *  3. if both sides are non-null value, both values will be passed to all the registered FactorComparators
+   *     each factor comparator will generate a result based off it sole logic the weight of the comparator will be
+   *     added to the wining side, if equal, no value will be added to either side.
+   *  4. final result will be returned in a Pair container.
+   *
+   * */
+  public Pair<Integer,Integer> getReult(T object1, T object2){
+    logger.info(String.format("start comparing '%s' with '%s',  total weight = %s ",
+        object1 == null ? "(null)" : object1.toString(),
+        object2 == null ? "(null)" : object2.toString(),
+        this.getTotalWeight()));
+
+    // short cut if object equals.
+    if (object1 ==  object2){
+      logger.info("Result : 0 vs 0 (equal)");
+      return new Pair<Integer,Integer>(0,0);
+    }
+
+    // left side is null.
+    if (object1 == null){
+      logger.info(String.format("Result : 0 vs %s (left is null)",this.getTotalWeight()));
+      return new Pair<Integer, Integer>(0,this.getTotalWeight());
+    }
+
+    // right side is null.
+    if (object2 == null){
+      logger.info(String.format("Result : %s vs 0 (right is null)",this.getTotalWeight()));
+      return new Pair<Integer, Integer>(this.getTotalWeight(),0);
+    }
+
+    // both side is not null,put them thru the full loop
+    int result1 = 0 ;
+    int result2 = 0 ;
+    Collection<FactorComparator<T>> comparatorList = this.factorComparatorList.values();
+    Iterator<FactorComparator<T>> mapItr = comparatorList.iterator();
+    while (mapItr.hasNext()){
+      FactorComparator<T> comparator = (FactorComparator<T>) mapItr.next();
+      int result = comparator.compare(object1, object2);
+      result1  = result1 + (result > 0 ? comparator.getWeight() : 0);
+      result2  = result2 + (result < 0 ? comparator.getWeight() : 0);
+      logger.info(String.format("[Factor: %s] compare result : %s (current score %s vs %s)",
+          comparator.getFactorName(), result, result1, result2));
+    }
+    logger.info(String.format("Result : %s vs %s ",result1,result2));
+    return new Pair<Integer,Integer>(result1,result2);
+  }
+
+  @Override
+  public int compare(T o1, T o2) {
+    Pair<Integer,Integer> result = this.getReult(o1,o2);
+    return result.getFirst() == result.getSecond() ? 0 :
+                                result.getFirst() > result.getSecond() ? 1 : -1;
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/dispatcher/CandidateFilter.java b/azkaban-common/src/main/java/azkaban/executor/dispatcher/CandidateFilter.java
new file mode 100644
index 0000000..dac8fcd
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/dispatcher/CandidateFilter.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.dispatcher;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+
+
+/** Abstract class for a candidate filter.
+ *  this class contains implementation of most of the core logics. Implementing classes is expected only to
+ *  register filters using the provided register function.
+ *
+ */
+public abstract class CandidateFilter<T,V>  {
+  private static Logger logger = Logger.getLogger(CandidateFilter.class);
+
+  // internal repository of the registered filters .
+  private Map<String,FactorFilter<T,V>> factorFilterList =
+      new ConcurrentHashMap<String,FactorFilter<T,V>>();
+
+  /** gets the name of the current implementation of the candidate filter.
+   * @returns : name of the filter.
+   * */
+  public abstract String getName();
+
+  /** function to register a factorFilter to the internal Map for future reference.
+   * @param factorfilter : the Filter object to be registered.
+   * */
+  protected void registerFactorFilter(FactorFilter<T,V> filter){
+      if (null == filter ) {
+        logger.info("skipping registerFactorFilter as the comaractor is null or has an invalid weight value.");
+        return;
+      }
+
+      // add or replace the filter.
+      this.factorFilterList.put(filter.getFactorName(),filter);
+      logger.info(String.format("Factor filter added for '%s'.",
+          filter.getFactorName()));
+  }
+
+  /** function to get the filtering result.
+   * */
+  public boolean check(T item, V object){
+    logger.info(String.format("start checking '%s' with factor filter for '%s'",
+        item == null ? "(null)" : item.toString(),
+        this.getName()));
+
+    Collection<FactorFilter<T,V>> filterList = this.factorFilterList.values();
+    Iterator<FactorFilter<T,V>> mapItr = filterList.iterator();
+    boolean result = true;
+    while (mapItr.hasNext()){
+      FactorFilter<T,V> filter = (FactorFilter<T,V>) mapItr.next();
+      result &= filter.check(item,object);
+      logger.info(String.format("[Factor: %s] filter result : %s ",
+          filter.getFactorName(), result));
+      if (!result){
+        break;
+      }
+    }
+    logger.info(String.format("Final checking result : %s ",result));
+    return result;
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/dispatcher/Dispatcher.java b/azkaban-common/src/main/java/azkaban/executor/dispatcher/Dispatcher.java
new file mode 100644
index 0000000..3d97604
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/dispatcher/Dispatcher.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.dispatcher;
+
+import java.util.List;
+
+
+/** Definition of the dispatcher interface.
+ *  an implementation of the dispatcher interface provides the functionality
+ *  to return a candidate from the candidateList that suits best for the dispatchingObject.
+ *  @param K : type of the candidate.
+ *  @param V : type of the dispatching object.
+ */
+public interface Dispatcher <K,V> {
+
+  /** Function returns the next best suit candidate from the candidateList for the dispatching object.
+   *  @param  candidateList : List of the candidates to select from .
+   *  @param  dispatchingObject : the object to be dispatched .
+   *  @return candidate from the candidate list that suits best for the dispatching object.
+   * */
+  public K getNext(List<K> candidateList, V dispatchingObject);
+
+  /** Function returns the name of the current Dispatcher
+   *  @return name of the dispatcher.
+   * */
+  public String getName();
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/dispatcher/ExecutorDispatcher.java b/azkaban-common/src/main/java/azkaban/executor/dispatcher/ExecutorDispatcher.java
new file mode 100644
index 0000000..9ed3be0
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/dispatcher/ExecutorDispatcher.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.dispatcher;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+/** Implementation of the ExecutorDispatcher.
+ *  @param K executor object type.
+ *  @param V dispatching object type.
+ * */
+public class ExecutorDispatcher<K,V> implements Dispatcher<K, V> {
+  private static Logger logger = Logger.getLogger(CandidateComparator.class);
+
+  private CandidateFilter<K,V> filter;
+  private CandidateComparator<K> comparator;
+
+  // getter of the filter.
+  public CandidateFilter<K,V> getFilter(){
+    return this.filter;
+  }
+
+  // setter of the filer.
+  public CandidateFilter<K,V> setFilter(CandidateFilter<K,V> value){
+    CandidateFilter<K,V> returnVal = this.filter;
+    this.filter = value;
+    return returnVal;
+  }
+
+  // getter of the comparator.
+  public CandidateComparator<K> getComparator(){
+    return this.comparator;
+  }
+
+  // setter of the comparator.
+  public CandidateComparator<K> setComparator(CandidateComparator<K>  value){
+    CandidateComparator<K> returnVal = this.comparator;
+    this.comparator = value;
+    return returnVal;
+  }
+
+  /**constructor of the class.
+   * @param filter CandidateFilter object to be used to perform the candidate filtering.
+   * @param comparator CandidateComparator object to be used to find the best suit candidate from the filtered list.
+   * */
+  public ExecutorDispatcher(CandidateFilter<K,V> filter,
+      CandidateComparator<K> comparator){
+    this.filter = filter;
+    this.comparator = comparator;
+  }
+
+  @Override
+  public K getNext(List<K> candidateList, V dispatchingObject) {
+
+     // shortcut if the candidateList is empty.
+     if ( null == candidateList || candidateList.size() == 0){
+       logger.error("failed to getNext candidate as the passed candidateList is null or empty.");
+       return null;
+     }
+
+     logger.info("start candidate selection logic.");
+     logger.info(String.format("candidate count before filtering: %s", candidateList.size()));
+
+     // to keep the input untouched, we will form up a new list based off the filtering result.
+     List<K> filteredList = new ArrayList<K>();
+
+     if (null != this.filter){
+       for (K candidateInfo : candidateList){
+         if (filter.check(candidateInfo,dispatchingObject)){
+           filteredList.add(candidateInfo);
+         }
+       }
+     } else{
+       filteredList = candidateList;
+       logger.info("skipping the candidate filtering as the filter object is not specifed.");
+     }
+
+     logger.info(String.format("candidate count after filtering: %s", filteredList.size()));
+     if (filteredList.size() == 0){
+       logger.info("failed to select candidate as the filted candidate list is empty.");
+       return null;
+     }
+
+     // final work - find the best candidate from the filtered list.
+     K executor = Collections.max(filteredList,comparator);
+     logger.info(String.format("candidate selected %s", executor.toString()));
+
+     return executor;
+  }
+
+  @Override
+  public String getName() {
+    return "ExecutorDispatcher";
+  }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/dispatcher/FactorComparator.java b/azkaban-common/src/main/java/azkaban/executor/dispatcher/FactorComparator.java
new file mode 100644
index 0000000..cb99f9d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/dispatcher/FactorComparator.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.dispatcher;
+
+import java.util.Comparator;
+import org.apache.log4j.Logger;
+
+/** wrapper class for a factor comparator .
+ *@param T: the type of the objects to be compared.
+ */
+public final class FactorComparator<T>{
+  private static Logger logger = Logger.getLogger(CandidateComparator.class);
+
+  private String factorName;
+  private int weight;
+  private Comparator<T> comparator;
+
+  /** private constructor of the class. User will create the instance of the class by calling the static
+   *  method provided below.
+   * @param factorName : the factor name .
+   * @param weight : the weight of the comparator.
+   * @ comparator : function to be provided by user on how the comparison should be made.
+   * */
+  private FactorComparator(String factorName, int weight, Comparator<T> comparator){
+    this.factorName = factorName;
+    this.weight = weight;
+    this.comparator = comparator;
+  }
+
+  /** static function to generate an instance of the class.
+   *  refer to the constructor for the param definitions.
+   * */
+  public static <T> FactorComparator<T> create(String factorName, int weight, Comparator<T> comparator){
+
+    if (null == factorName || factorName == "" || weight < 0 || null == comparator){
+      logger.error("failed to create instance of FactorComparator, at least one of the input paramters are invalid");
+      return null;
+    }
+
+    return new FactorComparator<T>(factorName,weight,comparator);
+  }
+
+  // function to return the factor name.
+  public String getFactorName(){
+    return this.factorName;
+  }
+
+  // function to return the weight value.
+  public int getWeight(){
+    return this.weight;
+  }
+
+  // function to return the weight value.
+  public void updateWeight(int value){
+    this.weight = value;
+  }
+
+  // the actual compare function, which will leverage the user defined function.
+  public int compare(T object1, T object2){
+    return this.comparator.compare(object1, object2);
+  }
+
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/dispatcher/FactorFilter.java b/azkaban-common/src/main/java/azkaban/executor/dispatcher/FactorFilter.java
new file mode 100644
index 0000000..6d57413
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/dispatcher/FactorFilter.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.dispatcher;
+
+import org.apache.log4j.Logger;
+
+/** wrapper class for a factor Filter .
+ *@param T: the type of the objects to be compared.
+ *@param V: the type of the object to be used for filtering.
+ */
+public final class FactorFilter<T,V>{
+  private static Logger logger = Logger.getLogger(FactorFilter.class);
+
+  private String factorName;
+  private Filter<T,V> filter;
+
+  /** private constructor of the class. User will create the instance of the class by calling the static
+   *  method provided below.
+   * @param factorName : the factor name .
+   * @param filter : user defined function specifying how the filtering should be implemented.
+   * */
+  private FactorFilter(String factorName, Filter<T,V> filter){
+    this.factorName = factorName;
+    this.filter = filter;
+  }
+
+  /** static function to generate an instance of the class.
+   *  refer to the constructor for the param definitions.
+   * */
+  public static <T,V> FactorFilter<T,V> create(String factorName, Filter<T,V> filter){
+
+    if (null == factorName || factorName == "" || null == filter){
+      logger.error("failed to create instance of FactorFilter, at least one of the input paramters are invalid");
+      return null;
+    }
+
+    return new FactorFilter<T,V>(factorName,filter);
+  }
+
+  // function to return the factor name.
+  public String getFactorName(){
+    return this.factorName;
+  }
+
+  // the actual check function, which will leverage the logic defined by user.
+  public boolean check(T itemToCheck, V sourceObject){
+    return this.filter.check(itemToCheck, sourceObject);
+  }
+
+  // interface of the filter.
+  public interface Filter<K,V>{
+
+    /**check function for the filter.
+     * @param itemToCheck:  object to be checked.
+     * @param sourceObject: object which contains statistics based on which a decision is made whether
+     *                      the object being checked need to be filtered or not.
+     * @return true if the check passed, false if check failed, which means the item need to be filtered.
+     * */
+    public boolean check(K itemToCheck, V sourceObject);
+  }
+
+
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/DispatcherTest.java b/azkaban-common/src/test/java/azkaban/executor/DispatcherTest.java
new file mode 100644
index 0000000..d5d5f47
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/DispatcherTest.java
@@ -0,0 +1,585 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import org.apache.log4j.BasicConfigurator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.executor.dispatcher.*;
+
+public class DispatcherTest {
+  // mock executor object.
+  protected class MockExecutorObject{
+    public String name;
+    public int    port;
+    public double percentOfRemainingMemory;
+    public int    amountOfRemainingMemory;
+    public int    priority;
+    public Date   lastAssigned;
+    public double percentOfRemainingFlowcapacity;
+    public int    remainingTmp;
+
+    public MockExecutorObject(String name,
+        int port,
+        double percentOfRemainingMemory,
+        int amountOfRemainingMemory,
+        int priority,
+        Date lastAssigned,
+        double percentOfRemainingFlowcapacity,
+        int remainingTmp)
+    {
+      this.name = name;
+      this.port = port;
+      this.percentOfRemainingMemory = percentOfRemainingMemory;
+      this.amountOfRemainingMemory =amountOfRemainingMemory;
+      this.priority = priority;
+      this.lastAssigned = lastAssigned;
+      this.percentOfRemainingFlowcapacity = percentOfRemainingFlowcapacity;
+      this.remainingTmp = remainingTmp;
+    }
+
+    @Override
+    public String toString()
+    {
+      return this.name;
+    }
+  }
+
+  // Mock flow object.
+  protected class MockFlowObject{
+    public String name;
+    public int    requiredRemainingMemory;
+    public int    requiredTotalMemory;
+    public int    requiredRemainingTmpSpace;
+    public int    priority;
+
+    public MockFlowObject(String name,
+        int requiredTotalMemory,
+        int requiredRemainingMemory,
+        int requiredRemainingTmpSpace,
+        int priority)
+    {
+      this.name = name;
+      this.requiredTotalMemory = requiredTotalMemory;
+      this.requiredRemainingMemory = requiredRemainingMemory;
+      this.requiredRemainingTmpSpace = requiredRemainingTmpSpace;
+      this.priority = priority;
+    }
+
+    @Override
+    public String toString()
+    {
+      return this.name;
+    }
+  }
+
+  // mock Filter class.
+  protected class MockFilter
+  extends CandidateFilter<MockExecutorObject,MockFlowObject>{
+
+    @Override
+    public String getName() {
+      return "Mockfilter";
+    }
+
+    public MockFilter(){
+    }
+
+    // function to register the remainingMemory filter.
+    // for test purpose the registration is put in a separated method, in production the work should be done
+    // in the constructor.
+    public void registerFilterforTotalMemory(){
+      this.registerFactorFilter(FactorFilter.create("requiredTotalMemory",
+          new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+          public boolean check(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+            // REAL LOGIC COMES HERE -
+            if (null == itemToCheck || null == sourceObject){
+              return false;
+            }
+
+            // Box has infinite memory.:)
+            if (itemToCheck.percentOfRemainingMemory == 0) {
+              return true;
+            }
+
+            // calculate the memory and return.
+            return itemToCheck.amountOfRemainingMemory / itemToCheck.percentOfRemainingMemory * 100 > sourceObject.requiredTotalMemory;
+          }}));
+    }
+
+    public void registerFilterforRemainingMemory(){
+      this.registerFactorFilter(FactorFilter.create("requiredRemainingMemory",
+          new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+        public boolean check(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+          // REAL LOGIC COMES HERE -
+          if (null == itemToCheck || null == sourceObject){
+            return false;
+         }
+         return itemToCheck.amountOfRemainingMemory > sourceObject.requiredRemainingMemory;
+        }}));
+    }
+
+    public void registerFilterforPriority(){
+      this.registerFactorFilter(FactorFilter.create("requiredProprity",
+          new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+        public boolean check(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+          // REAL LOGIC COMES HERE -
+          if (null == itemToCheck || null == sourceObject){
+            return false;
+          }
+
+          // priority value, the bigger the lower.
+          return itemToCheck.priority >= sourceObject.priority;
+        }}));
+    }
+
+    public void registerFilterforRemainingTmpSpace(){
+      this.registerFactorFilter(FactorFilter.create("requiredRemainingTmpSpace",
+          new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+        public boolean check(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+          // REAL LOGIC COMES HERE -
+          if (null == itemToCheck || null == sourceObject){
+            return false;
+          }
+
+         return itemToCheck.remainingTmp > sourceObject.requiredRemainingTmpSpace;
+        }}));
+    }
+
+  }
+
+  // mock comparator class.
+  protected class MockComparator
+  extends CandidateComparator<MockExecutorObject>{
+
+    @Override
+    public String getName() {
+      return "MockComparator";
+    }
+
+    public MockComparator(){
+    }
+
+    public void registerComparerForMemory(int weight){
+      this.registerFactorComparator(FactorComparator.create("Memory", weight, new Comparator<MockExecutorObject>(){
+        public int compare(MockExecutorObject o1, MockExecutorObject o2) {
+          int result = 0 ;
+
+          // check remaining amount of memory.
+          result = o1.amountOfRemainingMemory - o2.amountOfRemainingMemory;
+          if (result != 0){
+            return result > 0 ? 1 : -1;
+          }
+
+          // check remaining % .
+          result = (int)(o1.percentOfRemainingMemory - o2.percentOfRemainingMemory);
+          return result == 0 ? 0 : result > 0 ? 1 : -1;
+
+        } }));
+    }
+
+    public void registerComparerForRemainingSpace(int weight){
+      this.registerFactorComparator(FactorComparator.create("RemainingTmp", weight, new Comparator<MockExecutorObject>(){
+        public int compare(MockExecutorObject o1, MockExecutorObject o2) {
+          int result = 0 ;
+
+          // check remaining % .
+          result = (int)(o1.remainingTmp - o2.remainingTmp);
+          return result == 0 ? 0 : result > 0 ? 1 : -1;
+
+        } }));
+    }
+
+    public void registerComparerForPriority(int weight){
+      this.registerFactorComparator(FactorComparator.create("Priority", weight, new Comparator<MockExecutorObject>(){
+        public int compare(MockExecutorObject o1, MockExecutorObject o2) {
+          int result = 0 ;
+
+          // check priority, bigger the better.
+          result = (int)(o1.priority - o2.priority);
+          return result == 0 ? 0 : result > 0 ? 1 : -1;
+
+        } }));
+    }
+  }
+
+  // test samples.
+  protected ArrayList<MockExecutorObject> executorList = new ArrayList<MockExecutorObject>();
+
+  @Before
+  public void setUp() throws Exception {
+    BasicConfigurator.configure();
+
+    executorList.clear();
+    executorList.add(new MockExecutorObject("Executor1",8080,50.0,2048,5,new Date(), 20, 6400));
+    executorList.add(new MockExecutorObject("Executor2",8080,50.0,2048,4,new Date(), 20, 6400));
+    executorList.add(new MockExecutorObject("Executor3",8080,40.0,2048,1,new Date(), 20, 6400));
+    executorList.add(new MockExecutorObject("Executor4",8080,50.0,2048,4,new Date(), 20, 6400));
+    executorList.add(new MockExecutorObject("Executor5",8080,50.0,1024,5,new Date(), 90, 6400));
+    executorList.add(new MockExecutorObject("Executor6",8080,50.0,1024,5,new Date(), 90, 3200));
+    executorList.add(new MockExecutorObject("Executor7",8080,50.0,1024,5,new Date(), 90, 3200));
+    executorList.add(new MockExecutorObject("Executor8",8080,50.0,2048,1,new Date(), 90, 3200));
+    executorList.add(new MockExecutorObject("Executor9",8080,50.0,2050,5,new Date(), 90, 4200));
+    executorList.add(new MockExecutorObject("Executor10",8080,00.0,1024,1,new Date(), 90, 3200));
+    executorList.add(new MockExecutorObject("Executor11",8080,40.0,3096,3,new Date(), 90, 2400));
+    executorList.add(new MockExecutorObject("Executor12",8080,50.0,2050,5,new Date(), 60, 7200));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public void testExecutorFilter() throws Exception {
+
+      // mock object, remaining memory 11500, total memory 3095, remainingTmpSpace 4200, priority 2.
+      MockFlowObject  dispatchingObj = new MockFlowObject("flow1",3096, 1500,4200,2);
+
+      MockFilter mFilter = new MockFilter();
+      mFilter.registerFilterforRemainingMemory();
+
+      // expect true.
+      boolean result = mFilter.check(this.executorList.get(0), dispatchingObj);
+      /*
+       1 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - start checking 'Executor1' with factor filter for 'Mockfilter'
+       1 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
+       1 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - Final checking result : true
+       * */
+      Assert.assertTrue(result);
+
+      //expect true.
+      result = mFilter.check(this.executorList.get(2), dispatchingObj);
+      /*
+      1 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - start checking 'Executor3' with factor filter for 'Mockfilter'
+      2 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
+      2 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - Final checking result : true
+      */
+      Assert.assertTrue(result);
+
+      // add the priority filter.
+      mFilter.registerFilterforPriority();
+      result = mFilter.check(this.executorList.get(2), dispatchingObj);
+      // expect false, for priority.
+      /*
+      2 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - start checking 'Executor3' with factor filter for 'Mockfilter'
+      2 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
+      2 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredProprity] filter result : false
+      2 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - Final checking result : false
+      */
+      Assert.assertFalse(result);
+
+      // add the remaining space filter.
+      mFilter.registerFilterforRemainingTmpSpace();
+
+      // expect pass.
+      result = mFilter.check(this.executorList.get(1), dispatchingObj);
+      /*
+      3 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - start checking 'Executor2' with factor filter for 'Mockfilter'
+      3 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
+      3 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingTmpSpace] filter result : true
+      3 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredProprity] filter result : true
+      3 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - Final checking result : true
+      */
+      Assert.assertTrue(result);
+
+      // expect false, remaining tmp, priority will also fail but the logic shortcuts when the Tmp size check Fails.
+      result = mFilter.check(this.executorList.get(7), dispatchingObj);
+      /*
+      4 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - start checking 'Executor8' with factor filter for 'Mockfilter'
+      4 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
+      4 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingTmpSpace] filter result : false
+      4 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - Final checking result : false
+      */
+      Assert.assertFalse(result);
+
+  }
+
+  @Test
+  public void testExecutorComparer() throws Exception {
+    MockComparator comparator = new MockComparator();
+    comparator.registerComparerForMemory(5);
+
+    MockExecutorObject nextExecutor = Collections.max(this.executorList, comparator);
+
+    // expect the first item to be selected, memory wise it is the max.
+    Assert.assertEquals(this.executorList.get(10),nextExecutor);
+
+    // add the priority factor.
+    // expect again the #9 item to be selected.
+    comparator.registerComparerForPriority(6);
+    nextExecutor = Collections.max(this.executorList, comparator);
+    /*
+      10 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor2' with 'Executor1',  total weight = 11
+      10 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : 0 (current score 0 vs 0)
+      11 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : -1 (current score 0 vs 6)
+      11 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 0 vs 6
+      11 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor3' with 'Executor1',  total weight = 11
+      11 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : -1 (current score 0 vs 5)
+      11 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : -1 (current score 0 vs 11)
+      11 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 0 vs 11
+      12 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor4' with 'Executor1',  total weight = 11
+      12 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : 0 (current score 0 vs 0)
+      12 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : -1 (current score 0 vs 6)
+      12 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 0 vs 6
+      13 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor5' with 'Executor1',  total weight = 11
+      13 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : -1 (current score 0 vs 5)
+      13 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : 0 (current score 0 vs 5)
+      14 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 0 vs 5
+      14 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor6' with 'Executor1',  total weight = 11
+      14 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : -1 (current score 0 vs 5)
+      14 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : 0 (current score 0 vs 5)
+      14 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 0 vs 5
+      14 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor7' with 'Executor1',  total weight = 11
+      15 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : -1 (current score 0 vs 5)
+      15 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : 0 (current score 0 vs 5)
+      15 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 0 vs 5
+      15 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor8' with 'Executor1',  total weight = 11
+      15 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : 0 (current score 0 vs 0)
+      16 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : -1 (current score 0 vs 6)
+      16 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 0 vs 6
+      16 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor9' with 'Executor1',  total weight = 11
+      16 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : 1 (current score 5 vs 0)
+      16 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : 0 (current score 5 vs 0)
+      16 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 5 vs 0
+      17 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor10' with 'Executor9',  total weight = 11
+      17 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : -1 (current score 0 vs 5)
+      17 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : -1 (current score 0 vs 11)
+      17 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 0 vs 11
+      18 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor11' with 'Executor9',  total weight = 11
+      18 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : 1 (current score 5 vs 0)
+      18 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : -1 (current score 5 vs 6)
+      19 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 5 vs 6
+      19 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor12' with 'Executor9',  total weight = 11
+      19 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : 0 (current score 0 vs 0)
+      19 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : 0 (current score 0 vs 0)
+      20 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 0 vs 0
+     * */
+    Assert.assertEquals(this.executorList.get(8),nextExecutor);
+
+    // add the remaining space factor.
+    // expect the #12 item to be returned.
+    comparator.registerComparerForRemainingSpace(3);
+    nextExecutor = Collections.max(this.executorList, comparator);
+    /*
+      21 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor2' with 'Executor1',  total weight = 14
+      22 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : 0 (current score 0 vs 0)
+      22 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : -1 (current score 0 vs 6)
+      22 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: RemainingTmp] compare result : 0 (current score 0 vs 6)
+      22 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 0 vs 6
+      22 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor3' with 'Executor1',  total weight = 14
+      23 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : -1 (current score 0 vs 5)
+      23 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : -1 (current score 0 vs 11)
+      23 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: RemainingTmp] compare result : 0 (current score 0 vs 11)
+      23 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 0 vs 11
+      24 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor4' with 'Executor1',  total weight = 14
+      24 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : 0 (current score 0 vs 0)
+      24 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : -1 (current score 0 vs 6)
+      25 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: RemainingTmp] compare result : 0 (current score 0 vs 6)
+      25 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 0 vs 6
+      25 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor5' with 'Executor1',  total weight = 14
+      25 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : -1 (current score 0 vs 5)
+      25 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : 0 (current score 0 vs 5)
+      26 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: RemainingTmp] compare result : 0 (current score 0 vs 5)
+      26 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 0 vs 5
+      26 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor6' with 'Executor1',  total weight = 14
+      26 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : -1 (current score 0 vs 5)
+      27 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : 0 (current score 0 vs 5)
+      27 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: RemainingTmp] compare result : -1 (current score 0 vs 8)
+      27 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 0 vs 8
+      27 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor7' with 'Executor1',  total weight = 14
+      27 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : -1 (current score 0 vs 5)
+      28 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : 0 (current score 0 vs 5)
+      28 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: RemainingTmp] compare result : -1 (current score 0 vs 8)
+      28 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 0 vs 8
+      28 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor8' with 'Executor1',  total weight = 14
+      29 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : 0 (current score 0 vs 0)
+      29 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : -1 (current score 0 vs 6)
+      29 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: RemainingTmp] compare result : -1 (current score 0 vs 9)
+      29 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 0 vs 9
+      30 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor9' with 'Executor1',  total weight = 14
+      30 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : 1 (current score 5 vs 0)
+      30 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : 0 (current score 5 vs 0)
+      30 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: RemainingTmp] compare result : -1 (current score 5 vs 3)
+      30 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 5 vs 3
+      31 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor10' with 'Executor9',  total weight = 14
+      31 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : -1 (current score 0 vs 5)
+      31 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : -1 (current score 0 vs 11)
+      32 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: RemainingTmp] compare result : -1 (current score 0 vs 14)
+      32 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 0 vs 14
+      32 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor11' with 'Executor9',  total weight = 14
+      32 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : 1 (current score 5 vs 0)
+      32 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : -1 (current score 5 vs 6)
+      33 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: RemainingTmp] compare result : -1 (current score 5 vs 9)
+      33 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 5 vs 9
+      33 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor12' with 'Executor9',  total weight = 14
+      33 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : 0 (current score 0 vs 0)
+      34 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : 0 (current score 0 vs 0)
+      34 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: RemainingTmp] compare result : 1 (current score 3 vs 0)
+      34 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 3 vs 0
+     * */
+    Assert.assertEquals(this.executorList.get(11),nextExecutor);
+  }
+
+  @Test
+  public void testDispatcher() throws Exception {
+    MockFilter filter = new MockFilter();
+    MockComparator comparator = new MockComparator();
+
+    filter.registerFilterforPriority();
+    filter.registerFilterforRemainingMemory();
+    filter.registerFilterforRemainingTmpSpace();
+    filter.registerFilterforTotalMemory();
+
+    comparator.registerComparerForMemory(3);
+    comparator.registerComparerForPriority(5);
+    comparator.registerComparerForRemainingSpace(3);
+
+    ExecutorDispatcher<MockExecutorObject,MockFlowObject> morkDispatcher =
+        new ExecutorDispatcher<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+    // mock object, remaining memory 11500, total memory 3095, remainingTmpSpace 4200, priority 2.
+    MockFlowObject  dispatchingObj = new MockFlowObject("flow1",3096, 1500,4200,2);
+
+    // expected selection = #12
+    MockExecutorObject nextExecutor = morkDispatcher.getNext(this.executorList, dispatchingObj);
+    /*
+     *  9 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start candidate selection logic.
+        9 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - candidate count before filtering: 12
+        10 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - start checking 'Executor1' with factor filter for 'Mockfilter'
+        10 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
+        10 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredTotalMemory] filter result : true
+        10 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingTmpSpace] filter result : true
+        11 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredProprity] filter result : true
+        11 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - Final checking result : true
+        11 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - start checking 'Executor2' with factor filter for 'Mockfilter'
+        11 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
+        11 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredTotalMemory] filter result : true
+        12 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingTmpSpace] filter result : true
+        12 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredProprity] filter result : true
+        12 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - Final checking result : true
+        12 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - start checking 'Executor3' with factor filter for 'Mockfilter'
+        12 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
+        13 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredTotalMemory] filter result : true
+        13 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingTmpSpace] filter result : true
+        13 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredProprity] filter result : false
+        13 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - Final checking result : false
+        13 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - start checking 'Executor4' with factor filter for 'Mockfilter'
+        13 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
+        14 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredTotalMemory] filter result : true
+        14 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingTmpSpace] filter result : true
+        14 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredProprity] filter result : true
+        14 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - Final checking result : true
+        14 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - start checking 'Executor5' with factor filter for 'Mockfilter'
+        15 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : false
+        15 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - Final checking result : false
+        15 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - start checking 'Executor6' with factor filter for 'Mockfilter'
+        15 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : false
+        15 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - Final checking result : false
+        15 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - start checking 'Executor7' with factor filter for 'Mockfilter'
+        16 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : false
+        16 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - Final checking result : false
+        16 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - start checking 'Executor8' with factor filter for 'Mockfilter'
+        16 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
+        16 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredTotalMemory] filter result : true
+        17 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingTmpSpace] filter result : false
+        17 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - Final checking result : false
+        17 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - start checking 'Executor9' with factor filter for 'Mockfilter'
+        17 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
+        17 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredTotalMemory] filter result : true
+        17 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingTmpSpace] filter result : false
+        18 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - Final checking result : false
+        18 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - start checking 'Executor10' with factor filter for 'Mockfilter'
+        18 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : false
+        18 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - Final checking result : false
+        18 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - start checking 'Executor11' with factor filter for 'Mockfilter'
+        18 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
+        18 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredTotalMemory] filter result : true
+        18 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingTmpSpace] filter result : false
+        19 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - Final checking result : false
+        19 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - start checking 'Executor12' with factor filter for 'Mockfilter'
+        19 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
+        19 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredTotalMemory] filter result : true
+        19 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredRemainingTmpSpace] filter result : true
+        19 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - [Factor: requiredProprity] filter result : true
+        19 [main] INFO azkaban.executor.dispatcher.CandidateFilter  - Final checking result : true
+        20 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - candidate count after filtering: 4
+        20 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor2' with 'Executor1',  total weight = 11
+        20 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : 0 (current score 0 vs 0)
+        20 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : -1 (current score 0 vs 5)
+        21 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: RemainingTmp] compare result : 0 (current score 0 vs 5)
+        21 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 0 vs 5
+        21 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor4' with 'Executor1',  total weight = 11
+        22 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : 0 (current score 0 vs 0)
+        22 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : -1 (current score 0 vs 5)
+        22 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: RemainingTmp] compare result : 0 (current score 0 vs 5)
+        22 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 0 vs 5
+        22 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - start comparing 'Executor12' with 'Executor1',  total weight = 11
+        23 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Memory] compare result : 1 (current score 3 vs 0)
+        23 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: Priority] compare result : 0 (current score 3 vs 0)
+        23 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - [Factor: RemainingTmp] compare result : 1 (current score 6 vs 0)
+        23 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - Result : 6 vs 0
+        23 [main] INFO azkaban.executor.dispatcher.CandidateComparator  - candidate selected Executor12
+     */
+    Assert.assertEquals( this.executorList.get(11),nextExecutor);
+
+   // remaining memory 11500, total memory 3095, remainingTmpSpace 14200, priority 2.
+   dispatchingObj = new MockFlowObject("flow1",3096, 1500,14200,2);
+   // all candidates should be filtered by the remaining memory.
+   nextExecutor = morkDispatcher.getNext(this.executorList, dispatchingObj);
+   Assert.assertEquals(null,nextExecutor);
+  }
+
+  @Test
+  public void testDispatcherChangingFactorWeight() throws Exception {
+    MockFilter filter = new MockFilter();
+    MockComparator comparator = new MockComparator();
+
+    filter.registerFilterforPriority();
+    filter.registerFilterforRemainingMemory();
+    filter.registerFilterforRemainingTmpSpace();
+    filter.registerFilterforTotalMemory();
+
+    comparator.registerComparerForMemory(1);
+    comparator.registerComparerForPriority(1);
+    comparator.registerComparerForRemainingSpace(1);
+
+    ExecutorDispatcher<MockExecutorObject,MockFlowObject> morkDispatcher =
+        new ExecutorDispatcher<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+    MockFlowObject  dispatchingObj = new MockFlowObject("flow1",100, 100,100,3);
+    MockExecutorObject executor = morkDispatcher.getNext(this.executorList, dispatchingObj);
+    Assert.assertEquals(this.executorList.get(11), executor);
+
+    // adjusted the weight for memory to 10, therefore item #11 should be returned.
+    morkDispatcher.getComparator().adjustFactorWeight("Memory", 10);
+    executor = morkDispatcher.getNext(this.executorList, dispatchingObj);
+    Assert.assertEquals(this.executorList.get(10), executor);
+    
+    // adjusted the weight for memory back to 1, therefore item #12 should be returned.
+    morkDispatcher.getComparator().adjustFactorWeight("Memory", 1);
+    executor = morkDispatcher.getNext(this.executorList, dispatchingObj);
+    Assert.assertEquals(this.executorList.get(11), executor);
+
+  }
+}