azkaban-developers

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
index 31070bf..4153270 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Executor.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -16,6 +16,10 @@
 
 package azkaban.executor;
 
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
 import azkaban.utils.Utils;
 
 /**
@@ -28,8 +32,9 @@ public class Executor {
   private final String host;
   private final int port;
   private boolean isActive;
-
-  // TODO: ExecutorStats to be added
+  // cached copy of the latest statistics from  the executor.
+  private Statistics cachedExecutorStats;
+  private Date lastUpdated;
 
   /**
    * <pre>
@@ -88,6 +93,13 @@ public class Executor {
     return true;
   }
 
+  @Override
+  public String toString(){
+    return String.format("%s (id: %s)",
+        null == this.host || this.host.length() == 0 ? "(empty)" : this.host,
+        this.id);
+  }
+
   public String getHost() {
     return host;
   }
@@ -104,6 +116,24 @@ public class Executor {
     return id;
   }
 
+  public Statistics getExecutorStats() {
+    return this.cachedExecutorStats;
+  }
+
+  public void setExecutorStats(Statistics stats) {
+    this.cachedExecutorStats = stats;
+    this.lastUpdated = new Date();
+  }
+
+  /**
+   * Gets the timestamp when the stats are last updated.
+   * @return date object represents the timestamp, null if the stats of this
+   *         specific object is never refreshed.
+   * */
+  public Date getLastStatsUpdatedTime(){
+    return this.lastUpdated;
+  }
+
   public void setActive(boolean isActive) {
     this.isActive = isActive;
   }
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
index 8234f7d..8ef2c76 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
@@ -31,7 +31,7 @@ import azkaban.utils.Pair;
  *
  */
 public abstract class CandidateComparator<T> implements Comparator<T> {
-  private static Logger logger = Logger.getLogger(CandidateComparator.class);
+  protected static Logger logger = Logger.getLogger(CandidateComparator.class);
 
   // internal repository of the registered comparators .
   private Map<String,FactorComparator<T>> factorComparatorList =
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
index 154d528..94b8dfa 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
@@ -28,14 +28,14 @@ import org.apache.log4j.Logger;
  *  register filters using the provided register function.
  */
 public abstract class CandidateFilter<T,V>  {
-  private static Logger logger = Logger.getLogger(CandidateFilter.class);
+  protected static Logger logger = Logger.getLogger(CandidateFilter.class);
 
   // internal repository of the registered filters .
   private Map<String,FactorFilter<T,V>> factorFilterList =
       new ConcurrentHashMap<String,FactorFilter<T,V>>();
 
   /** gets the name of the current implementation of the candidate filter.
-   * @returns : name of the filter.
+   * @return : name of the filter.
    * */
   public abstract String getName();
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
index 0598d95..55c0864 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
@@ -74,6 +74,10 @@ public class CandidateSelector<K,V> implements Selector<K, V> {
        return null;
      }
 
+     if (null == comparator){
+       logger.info("candidate comparator is not specified, default comparator from 'Collections' class will be used");
+     }
+
      // final work - find the best candidate from the filtered list.
      K executor = Collections.max(filteredList,comparator);
      logger.info(String.format("candidate selected %s",
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
new file mode 100644
index 0000000..f0c46cd
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
@@ -0,0 +1,281 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import azkaban.executor.Executor;
+import azkaban.executor.Statistics;
+
+public class ExecutorComparator extends CandidateComparator<Executor> {
+  private static Map<String, ComparatorCreator> comparatorCreatorRepository = null;
+
+  /**
+   * Gets the name list of all available comparators.
+   * @return the list of the names.
+   * */
+  public static Set<String> getAvailableComparatorNames(){
+    return comparatorCreatorRepository.keySet();
+  }
+
+  // factor comparator names
+  private static final String REMAININGFLOWSIZE_COMPARATOR_NAME = "RemainingFlowSize";
+  private static final String MEMORY_COMPARATOR_NAME = "Memory";
+  private static final String REMAININGTMPSIZE_COMPARATOR_NAME = "RemainingTmpSize";
+  private static final String PRIORITY_COMPARATOR_NAME = "Priority";
+  private static final String LSTDISPATCHED_COMPARATOR_NAME = "LastDispatched";
+
+  /**
+   * static initializer of the class.
+   * We will build the filter repository here.
+   * when a new comparator is added, please do remember to register it here.
+   * */
+  static {
+    comparatorCreatorRepository = new HashMap<String, ComparatorCreator>();
+
+    // register the creator for remaining flow size comparator.
+    comparatorCreatorRepository.put(REMAININGFLOWSIZE_COMPARATOR_NAME, new ComparatorCreator(){
+      @Override public FactorComparator<Executor> create(int weight) { return getRemainingFlowSizeComparator(weight); }});
+
+    // register the creator for memory comparator.
+    comparatorCreatorRepository.put(MEMORY_COMPARATOR_NAME, new ComparatorCreator(){
+      @Override public FactorComparator<Executor> create(int weight) { return getMemoryComparator(weight); }});
+
+    // register the creator for priority comparator.
+    comparatorCreatorRepository.put(PRIORITY_COMPARATOR_NAME, new ComparatorCreator(){
+      @Override public FactorComparator<Executor> create(int weight) { return getPriorityComparator(weight); }});
+
+    // register the creator for remaining TMP size comparator.
+    comparatorCreatorRepository.put(PRIORITY_COMPARATOR_NAME, new ComparatorCreator(){
+      @Override public FactorComparator<Executor> create(int weight) { return getRemainingTmpSizeComparator(weight); }});
+
+    // register the creator for last dispatched time comparator.
+    comparatorCreatorRepository.put(LSTDISPATCHED_COMPARATOR_NAME, new ComparatorCreator(){
+      @Override public FactorComparator<Executor> create(int weight) { return getLstDispatchedTimeComparator(weight); }});
+  }
+
+
+  /**
+   * constructor of the ExecutorComparator.
+   * @param comparatorList   the list of comparator, plus its weight information to be registered,
+   *  the parameter must be a not-empty and valid list object.
+   * */
+  public ExecutorComparator(Map<String,Integer> comparatorList) {
+    if (null == comparatorList|| comparatorList.size() == 0){
+      logger.error("failed to initialize executor comparator as the passed comparator list is invalid or empty.");
+      throw new IllegalArgumentException("filterList");
+    }
+
+    // register the comparators, we will now throw here if the weight is invalid, it is handled in the super.
+    for (Entry<String,Integer> entry : comparatorList.entrySet()){
+      if (comparatorCreatorRepository.containsKey(entry.getKey())){
+        this.registerFactorComparator(comparatorCreatorRepository.
+            get(entry.getKey()).
+            create(entry.getValue()));
+      } else {
+        logger.error(String.format("failed to initialize executor comparator as the comparator implementation for requested factor '%s' doesn't exist.",
+            entry.getKey()));
+        throw new IllegalArgumentException("comparatorList");
+      }
+    }
+  }
+
+  @Override
+  public String getName() {
+    return "ExecutorComparator";
+  }
+
+  private interface ComparatorCreator{
+    FactorComparator<Executor> create(int weight);
+  }
+
+  /**
+   * helper function that does the object  on two stats, comparator can leverage this function to provide
+   * shortcuts if   the stat object is missing from one or both sides of the executors.
+   * @param stat1   the first stat object to be checked .
+   * @param stat2   the second stat object to be checked.
+   * @param caller  the name of the calling function, for logging purpose.
+   * @param result  result Integer to pass out the result in case the stats are not both valid.
+   * @return true if the passed stats are NOT both valid, a shortcut can be made (caller can consume the result),
+   *         false otherwise.
+   * */
+  private static boolean statsObjectCheck(Statistics stat1, Statistics stat2, String caller, Integer result){
+    result = 0 ;
+    // both doesn't expose the info
+    if (null == stat1 && null == stat2){
+      logger.info(String.format("%s : neither of the executors exposed stats info.",
+          caller));
+      return true;
+    }
+
+    //right side doesn't expose the info.
+    if (null == stat2 ){
+        logger.info(String.format("%s : choosing left side and the right side executor doesn't expose stat info",
+            caller));
+        result = 1;
+        return true;
+    }
+
+    //left side doesn't expose the info.
+    if (null == stat1 ){
+      logger.info(String.format("%s : choosing right side and the left side executor doesn't expose stat info",
+          caller));
+      result = -1;
+      return true;
+      }
+
+    // both not null
+    return false;
+  }
+
+  /**
+   * function defines the remaining flow size comparator.
+   * @param weight weight of the comparator.
+   * */
+  private static FactorComparator<Executor> getRemainingFlowSizeComparator(int weight){
+    return FactorComparator.create(REMAININGFLOWSIZE_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+
+      @Override
+      public int compare(Executor o1, Executor o2) {
+        Statistics stat1 = o1.getExecutorStats();
+        Statistics stat2 = o2.getExecutorStats();
+
+        Integer result = 0;
+        if (statsObjectCheck(stat1,stat2,REMAININGFLOWSIZE_COMPARATOR_NAME,result)){
+          return result;
+        }
+        return ((Integer)stat1.getRemainingFlowCapacity()).compareTo(stat2.getRemainingFlowCapacity());
+      }});
+  }
+
+  /**
+   * function defines the remaining folder size comparator.
+   * @param weight weight of the comparator.
+   * */
+  private static FactorComparator<Executor> getRemainingTmpSizeComparator(int weight){
+    return FactorComparator.create(REMAININGTMPSIZE_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+
+      @Override
+      public int compare(Executor o1, Executor o2) {
+        Statistics stat1 = o1.getExecutorStats();
+        Statistics stat2 = o2.getExecutorStats();
+
+        Integer result = 0;
+        if (statsObjectCheck(stat1,stat2,REMAININGTMPSIZE_COMPARATOR_NAME,result)){
+          return result;
+        }
+        return ((Long)stat1.getRemainingStorage()).compareTo(stat2.getRemainingStorage());
+      }});
+  }
+
+  /**
+   * function defines the priority comparator.
+   * @param weight weight of the comparator.
+   * @return
+   * */
+  private static FactorComparator<Executor> getPriorityComparator(int weight){
+    return FactorComparator.create(PRIORITY_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+
+      @Override
+      public int compare(Executor o1, Executor o2) {
+        Statistics stat1 = o1.getExecutorStats();
+        Statistics stat2 = o2.getExecutorStats();
+
+        Integer result = 0;
+        if (statsObjectCheck(stat1,stat2,PRIORITY_COMPARATOR_NAME,result)){
+          return result;
+        }
+        return ((Integer)stat1.getPriority()).compareTo(stat2.getPriority());
+      }});
+  }
+
+
+  /**
+   * function defines the last dispatched time comparator.
+   * @param weight weight of the comparator.
+   * @return
+   * */
+  private static FactorComparator<Executor> getLstDispatchedTimeComparator(int weight){
+    return FactorComparator.create(LSTDISPATCHED_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+
+      @Override
+      public int compare(Executor o1, Executor o2) {
+        Statistics stat1 = o1.getExecutorStats();
+        Statistics stat2 = o2.getExecutorStats();
+
+        Integer result = 0;
+        if (statsObjectCheck(stat1,stat2,LSTDISPATCHED_COMPARATOR_NAME,result)){
+          return result;
+        }
+
+        if (null == stat1.getLastDispatchedTime() && null == stat1.getLastDispatchedTime()){
+          logger.info(String.format("%s : stats from both side doesn't contain last dispatched time info.",
+              LSTDISPATCHED_COMPARATOR_NAME));
+          return 0;
+        }
+
+        if (null == stat2.getLastDispatchedTime()){
+          logger.info(String.format("%s : choosing left side as right doesn't contain last dispatched time info.",
+              LSTDISPATCHED_COMPARATOR_NAME));
+          return 1;
+        }
+
+        if (null == stat1.getLastDispatchedTime()){
+          logger.info(String.format("%s : choosing rigth side as left doesn't contain last dispatched time info.",
+              LSTDISPATCHED_COMPARATOR_NAME));
+          return -1;
+        }
+
+        // Note: an earlier date time indicates higher weight.
+        return ((Date)stat2.getLastDispatchedTime()).compareTo(stat1.getLastDispatchedTime());
+      }});
+  }
+
+
+  /**
+   * function defines the Memory comparator.
+   * @param weight weight of the comparator.
+   * Note: comparator firstly take the absolute value of the remaining memory, if both sides have the same value,
+   *       it go further to check the percent of the remaining memory.
+   * @return
+   * */
+  private static FactorComparator<Executor> getMemoryComparator(int weight){
+    return FactorComparator.create(MEMORY_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+
+      @Override
+      public int compare(Executor o1, Executor o2) {
+       Statistics stat1 = o1.getExecutorStats();
+       Statistics stat2 = o2.getExecutorStats();
+
+       Integer result = 0;
+       if (statsObjectCheck(stat1,stat2,MEMORY_COMPARATOR_NAME,result)){
+         return result;
+       }
+
+       if (stat1.getRemainingMemory() != stat2.getRemainingMemory()){
+         return stat1.getRemainingMemory() > stat2.getRemainingMemory() ? 1:-1;
+       }
+
+       return Double.compare(stat1.getRemainingMemoryPercent(), stat2.getRemainingMemoryPercent());
+      }});
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
new file mode 100644
index 0000000..0d61aba
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.Executor;
+import azkaban.executor.Statistics;
+
+public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFlow> {
+  private static Map<String, FactorFilter<Executor, ExecutableFlow>> filterRepository = null;
+
+  /**
+   * Gets the name list of all available filters.
+   * @return the list of the names.
+   * */
+  public static Set<String> getAvailableFilterNames(){
+    return filterRepository.keySet();
+  }
+
+
+  // factor filter names.
+  private static final String STATICREMAININGFLOWSIZE_FILTER_NAME = "StaticRemainingFlowSize";
+
+  /**
+   * static initializer of the class.
+   * We will build the filter repository here.
+   * when a new filter is added, please do remember to register it here.
+   * */
+  static {
+    filterRepository = new HashMap<String, FactorFilter<Executor, ExecutableFlow>>();
+    filterRepository.put(STATICREMAININGFLOWSIZE_FILTER_NAME, getStaticRemainingFlowSizeFilter());
+  }
+
+  /**
+   * constructor of the ExecutorFilter.
+   * @param filterList   the list of filter to be registered, the parameter must be a not-empty and valid list object.
+   * */
+  public ExecutorFilter(List<String> filterList) {
+    // shortcut if the filter list is invalid. A little bit ugly to have to throw in constructor.
+    if (null == filterList || filterList.size() == 0){
+      logger.error("failed to initialize executor filter as the passed filter list is invalid or empty.");
+      throw new IllegalArgumentException("filterList");
+    }
+
+    // register the filters according to the list.
+    for (String filterName : filterList){
+      if (filterRepository.containsKey(filterName)){
+        this.registerFactorFilter(filterRepository.get(filterName));
+      } else {
+        logger.error(String.format("failed to initialize executor filter as the filter implementation for requested factor '%s' doesn't exist.",
+            filterName));
+        throw new IllegalArgumentException("filterList");
+      }
+    }
+  }
+
+  @Override
+  public String getName() {
+    return "ExecutorFilter";
+  }
+
+  /**
+   * function to register the static remaining flow size filter.
+   * NOTE : this is a static filter which means the filter will be filtering based on the system standard which is not
+   *        Coming for the passed flow.
+   *        Ideally this filter will make sure only the executor has remaining
+   * */
+  private static FactorFilter<Executor, ExecutableFlow> getStaticRemainingFlowSizeFilter(){
+    return FactorFilter.create(STATICREMAININGFLOWSIZE_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
+
+      @Override
+      public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
+        if (null == filteringTarget){
+          logger.info(String.format("%s : filtering out the target as it is null.", STATICREMAININGFLOWSIZE_FILTER_NAME));
+          return false;
+        }
+
+        Statistics stats = filteringTarget.getExecutorStats();
+        if (null == stats) {
+          logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
+              STATICREMAININGFLOWSIZE_FILTER_NAME,
+              filteringTarget.toString()));
+          return false;
+        }
+        return stats.getRemainingFlowCapacity() > 0 ;
+       }
+    });
+  }
+
+  // TO-DO
+  // Add more Filter definitions .
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java
new file mode 100644
index 0000000..fd15ad1
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.List;
+import java.util.Map;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.Executor;
+
+/**
+ * Executor selector class implementation.
+ * NOTE: This class is a de-generalized version of the CandidateSelector, which provides a
+ *       clean and convenient constructor to take in filter and comparator name list and build
+ *       the instance from that.
+ * */
+public class ExecutorSelector extends CandidateSelector<Executor, ExecutableFlow> {
+
+  /**
+   * Contractor of the class.
+   * @param filterList      name list of the filters to be registered,
+   *                        filter feature will be disabled if a null value is passed.
+   * @param comparatorList  name/weight pair list of the comparators to be registered ,
+   *                        again comparator feature is disabled if a null value is passed.
+   * */
+  public ExecutorSelector(List<String> filterList, Map<String,Integer> comparatorList) {
+    super(null == filterList || filterList.size() == 0 ?         null : new ExecutorFilter(filterList),
+          null == comparatorList || comparatorList.size() == 0 ? null : new ExecutorComparator(comparatorList));
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/Statistics.java b/azkaban-common/src/main/java/azkaban/executor/Statistics.java
new file mode 100644
index 0000000..85d31f7
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/Statistics.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.util.Date;
+
+  public class Statistics {
+    private double remainingMemoryPercent;
+    private long   remainingMemory;
+    private int    remainingFlowCapacity;
+    private Date   lastDispatched;
+    private long   remainingStorage;
+    private int    priority;
+
+    public double getRemainingMemoryPercent() {
+      return this.remainingMemoryPercent;
+    }
+
+    public long getRemainingMemory(){
+      return this.remainingMemory;
+    }
+
+    public int getRemainingFlowCapacity(){
+      return this.remainingFlowCapacity;
+    }
+
+    public Date getLastDispatchedTime(){
+      return this.lastDispatched;
+    }
+
+    public long getRemainingStorage() {
+      return this.remainingStorage;
+    }
+
+    public int getPriority () {
+      return this.priority;
+    }
+
+    public Statistics (double remainingMemoryPercent,
+        long remainingMemory,
+        int remainingFlowCapacity,
+        Date lastDispatched,
+        long remainingStorage,
+        int  priority ){
+      this.remainingMemory = remainingMemory;
+      this.remainingFlowCapacity = remainingFlowCapacity;
+      this.priority = priority;
+      this.remainingMemoryPercent = remainingMemoryPercent;
+      this.remainingStorage = remainingStorage;
+      this.lastDispatched = lastDispatched;
+    }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
index b5b4509..c193397 100644
--- a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
@@ -20,12 +20,16 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.log4j.BasicConfigurator;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import azkaban.executor.selector.*;
@@ -602,4 +606,101 @@ public class SelectorTest {
     Assert.assertTrue(null == executor);
 
   }
+
+  @Test
+  public void testCreatingExectorfilterObject() throws Exception{
+    List<String> validList = new ArrayList<String>(ExecutorFilter.getAvailableFilterNames());
+    try {
+      new ExecutorFilter(validList);
+    }catch (Exception ex){
+      Assert.fail("creating ExecutorFilter with valid list throws exception . ex -" + ex.getMessage());
+    }
+  }
+
+  @Test
+  public void testCreatingExectorfilterObjectWInvalidList() throws Exception{
+    List<String> invalidList = new ArrayList<String>();
+    invalidList.add("notExistingFilter");
+    Exception result = null;
+    try {
+      new ExecutorFilter(invalidList);
+    }catch (Exception ex){
+      if (ex instanceof IllegalArgumentException)
+      result = ex;
+    }
+    Assert.assertNotNull(result);
+  }
+
+  @Test
+  public void testCreatingExectorComparatorObject() throws Exception{
+   Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+   for (String name : ExecutorComparator.getAvailableComparatorNames()){
+     comparatorMap.put(name, 1);
+   }
+   try {
+      new ExecutorComparator(comparatorMap);
+    }catch (Exception ex){
+      Assert.fail("creating ExecutorComparator with valid list throws exception . ex -" + ex.getMessage());
+    }
+  }
+
+  @Test
+  public void testCreatingExectorComparatorObjectWInvalidName() throws Exception{
+    Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+    comparatorMap.put("invalidName", 0);
+    Exception result = null;
+    try {
+      new ExecutorComparator(comparatorMap);
+    }catch (Exception ex){
+      if (ex instanceof IllegalArgumentException)
+      result = ex;
+    }
+    Assert.assertNotNull(result);
+  }
+
+  @Test
+  public void testCreatingExectorComparatorObjectWInvalidWeight() throws Exception{
+    Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+    for (String name : ExecutorComparator.getAvailableComparatorNames()){
+      comparatorMap.put(name, -1);
+    }
+    Exception result = null;
+    try {
+      new ExecutorComparator(comparatorMap);
+    }catch (Exception ex){
+      if (ex instanceof IllegalArgumentException)
+      result = ex;
+    }
+    Assert.assertNotNull(result);
+  }
+
+  @Test
+  public void testExecutorSelectorE2E() throws Exception{
+    List<String> filterList = new ArrayList<String>(ExecutorFilter.getAvailableFilterNames());
+    Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+    List<Executor> executorList = new ArrayList<Executor>();
+    executorList.add(new Executor(1, "host1", 80, true));
+    executorList.add(new Executor(2, "host2", 80, true));
+    executorList.add(new Executor(3, "host3", 80, true));
+
+    executorList.get(0).setExecutorStats(new Statistics(99.9, 4095, 50, new Date(), 4095, 0));
+    executorList.get(1).setExecutorStats(new Statistics(50, 4095, 50, new Date(), 4095, 0));
+    executorList.get(2).setExecutorStats(new Statistics(99.9, 4095, 50, new Date(), 2048, 0));
+
+    ExecutableFlow flow = new ExecutableFlow();
+
+    for (String name : ExecutorComparator.getAvailableComparatorNames()){
+      comparatorMap.put(name, 1);
+    }
+    ExecutorSelector selector = new ExecutorSelector(filterList,comparatorMap);
+    Executor executor = selector.getBest(executorList, flow);
+    Assert.assertEquals(executorList.get(0), executor);
+    
+    // simulate that once the flow is assigned, executor1's remaining TMP storage dropped to 2048
+    // now we do the getBest again executor3 is expected to be selected as it has a earlier last dispatched time.
+    executorList.get(0).setExecutorStats(new Statistics(99.9, 4095, 50, new Date(), 2048, 0));
+    executor = selector.getBest(executorList, flow);
+    Assert.assertEquals(executorList.get(2), executor);
+  }
+
 }