azkaban-aplcache

Merge pull request #471 from evlstyle/multipleexecutor change

8/27/2015 9:56:16 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
new file mode 100644
index 0000000..4d77897
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpResponseException;
+import org.apache.http.util.EntityUtils;
+
+import azkaban.utils.JSONUtils;
+import azkaban.utils.RestfulApiClient;
+
+/** Client class that will be used to handle all Restful API calls between Executor and the host application.
+ * */
+public class ExecutorApiClient extends RestfulApiClient<Map<String, Object>> {
+  private ExecutorApiClient(){}
+  private static ExecutorApiClient instance = new ExecutorApiClient();
+
+  /** Function to return the instance of the ExecutorApiClient class.
+   * */
+  public static ExecutorApiClient getInstance() {
+    return instance;
+  }
+
+  /**Implementing the parseResponse function to return de-serialized Json object.
+   * @param response  the returned response from the HttpClient.
+   * @return de-serialized object from Json or empty object if the response doesn't have a body.
+   * */
+  @SuppressWarnings("unchecked")
+  @Override
+  protected Map<String, Object> parseResponse(HttpResponse response)
+      throws HttpResponseException, IOException {
+    final StatusLine statusLine = response.getStatusLine();
+    String responseBody = response.getEntity() != null ?
+        EntityUtils.toString(response.getEntity()) : "";
+
+    if (statusLine.getStatusCode() >= 300) {
+
+        logger.error(String.format("unable to parse response as the response status is %s",
+            statusLine.getStatusCode()));
+
+        throw new HttpResponseException(statusLine.getStatusCode(),responseBody);
+    }
+
+    final HttpEntity entity = response.getEntity();
+    if (null != entity){
+      Object returnVal = JSONUtils.parseJSONFromString(EntityUtils.toString(entity));
+      if (null!= returnVal){
+        return (Map<String, Object>) returnVal;
+      }
+    }
+
+    return new HashMap<String, Object>() ;
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
new file mode 100644
index 0000000..8234f7d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Pair;
+
+/** Abstract class for a candidate comparator.
+ *  this class contains implementation of most of the core logics. Implementing classes is expected only to
+ *  register factor comparators using the provided register function.
+ *
+ */
+public abstract class CandidateComparator<T> implements Comparator<T> {
+  private static Logger logger = Logger.getLogger(CandidateComparator.class);
+
+  // internal repository of the registered comparators .
+  private Map<String,FactorComparator<T>> factorComparatorList =
+      new ConcurrentHashMap<String,FactorComparator<T>>();
+
+  /** gets the name of the current implementation of the candidate comparator.
+   * @returns : name of the comparator.
+   * */
+  public abstract String getName();
+
+  /** tieBreak method which will kick in when the comparator list generated an equality result for
+   *  both sides. the tieBreak method will try best to make sure a stable result is returned.
+   * */
+  protected boolean tieBreak(T object1, T object2){
+    if (null == object2) return true;
+    if (null == object1) return false;
+    return object1.hashCode() >= object2.hashCode();
+  }
+
+  /** function to register a factorComparator to the internal Map for future reference.
+   * @param factorComparator : the comparator object to be registered.
+   * @throws IllegalArgumentException
+   * */
+  protected void registerFactorComparator(FactorComparator<T> comparator){
+      if (null == comparator ||
+          Integer.MAX_VALUE - this.getTotalWeight() < comparator.getWeight() ) {
+        throw new IllegalArgumentException("unable to register comparator."+
+          " The passed comparator is null or has an invalid weight value.");
+      }
+
+      // add or replace the Comparator.
+      this.factorComparatorList.put(comparator.getFactorName(),comparator);
+      logger.info(String.format("Factor comparator added for '%s'. Weight = '%s'",
+          comparator.getFactorName(), comparator.getWeight()));
+  }
+
+  /** function returns the total weight of the registered comparators.
+   * @return the value of total weight.
+   * */
+  public int getTotalWeight(){
+    int totalWeight = 0 ;
+
+    // save out a copy of the values as HashMap.values() takes o(n) to return the value.
+    Collection<FactorComparator<T>> allValues = this.factorComparatorList.values();
+    for (FactorComparator<T> item : allValues){
+      if (item != null){
+        totalWeight += item.getWeight();
+      }
+    }
+
+    return totalWeight;
+  }
+
+  /** function to actually calculate the scores for the two objects that are being compared.
+   *  the comparison follows the following logic -
+   *  1. if both objects are equal return 0 score for both.
+   *  2. if one side is null, the other side gets all the score.
+   *  3. if both sides are non-null value, both values will be passed to all the registered FactorComparators
+   *     each factor comparator will generate a result based off it sole logic the weight of the comparator will be
+   *     added to the wining side, if equal, no value will be added to either side.
+   *  4. final result will be returned in a Pair container.
+   *
+   * */
+  public Pair<Integer,Integer> getComparisonScore(T object1, T object2){
+    logger.info(String.format("start comparing '%s' with '%s',  total weight = %s ",
+        object1 == null ? "(null)" : object1.toString(),
+        object2 == null ? "(null)" : object2.toString(),
+        this.getTotalWeight()));
+
+    int result1 = 0 ;
+    int result2 = 0 ;
+
+    // short cut if object equals.
+    if (object1 ==  object2){
+      logger.info("[Comparator] same object.");
+    } else
+    // left side is null.
+    if (object1 == null){
+      logger.info("[Comparator] left side is null, right side gets total weight.");
+      result2 = this.getTotalWeight();
+    } else
+    // right side is null.
+    if (object2 == null){
+      logger.info("[Comparator] right side is null, left side gets total weight.");
+      result1 = this.getTotalWeight();
+    } else
+    // both side is not null,put them thru the full loop
+    {
+      Collection<FactorComparator<T>> comparatorList = this.factorComparatorList.values();
+      for (FactorComparator<T> comparator :comparatorList){
+        int result = comparator.compare(object1, object2);
+        result1  = result1 + (result > 0 ? comparator.getWeight() : 0);
+        result2  = result2 + (result < 0 ? comparator.getWeight() : 0);
+        logger.info(String.format("[Factor: %s] compare result : %s (current score %s vs %s)",
+            comparator.getFactorName(), result, result1, result2));
+      }
+    }
+    // in case of same score, use tie-breaker to stabilize the result.
+    if (result1 == result2){
+      boolean result = this.tieBreak(object1, object2);
+      logger.info("[TieBreaker] TieBreaker chose " +
+      (result? String.format("left side (%s)",  null== object1 ? "null": object1.toString()) :
+               String.format("right side (%s)", null== object2 ? "null": object2.toString()) ));
+      if (result) result1++; else result2++;
+    }
+
+    logger.info(String.format("Result : %s vs %s ",result1,result2));
+    return new Pair<Integer,Integer>(result1,result2);
+  }
+
+  @Override
+  public int compare(T o1, T o2) {
+    Pair<Integer,Integer> result = this.getComparisonScore(o1,o2);
+    return result.getFirst() == result.getSecond() ? 0 :
+                                result.getFirst() > result.getSecond() ? 1 : -1;
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
new file mode 100644
index 0000000..154d528
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+
+
+/** Abstract class for a candidate filter.
+ *  this class contains implementation of most of the core logics. Implementing classes is expected only to
+ *  register filters using the provided register function.
+ */
+public abstract class CandidateFilter<T,V>  {
+  private static Logger logger = Logger.getLogger(CandidateFilter.class);
+
+  // internal repository of the registered filters .
+  private Map<String,FactorFilter<T,V>> factorFilterList =
+      new ConcurrentHashMap<String,FactorFilter<T,V>>();
+
+  /** gets the name of the current implementation of the candidate filter.
+   * @returns : name of the filter.
+   * */
+  public abstract String getName();
+
+  /** function to register a factorFilter to the internal Map for future reference.
+   * @param factorfilter : the Filter object to be registered.
+   * @throws IllegalArgumentException
+   * */
+  protected void registerFactorFilter(FactorFilter<T,V> filter){
+      if (null == filter ) {
+        throw new IllegalArgumentException("unable to register factor filter. " +
+                  "The passed comaractor is null or has an invalid weight value.");
+      }
+
+      // add or replace the filter.
+      this.factorFilterList.put(filter.getFactorName(),filter);
+      logger.info(String.format("Factor filter added for '%s'.",
+          filter.getFactorName()));
+  }
+
+  /** function to analyze the target item according to the reference object to decide whether the item should be filtered.
+   * @param filteringTarget:   object to be checked.
+   * @param referencingObject: object which contains statistics based on which a decision is made whether
+   *                      the object being checked need to be filtered or not.
+   * @return true if the check passed, false if check failed, which means the item need to be filtered.
+   * */
+  public boolean filterTarget(T filteringTarget, V referencingObject){
+    logger.info(String.format("start filtering '%s' with factor filter for '%s'",
+        filteringTarget == null ? "(null)" : filteringTarget.toString(),
+        this.getName()));
+
+    Collection<FactorFilter<T,V>> filterList = this.factorFilterList.values();
+    boolean result = true;
+    for (FactorFilter<T,V> filter : filterList){
+      result &= filter.filterTarget(filteringTarget,referencingObject);
+      logger.info(String.format("[Factor: %s] filter result : %s ",
+          filter.getFactorName(), result));
+      if (!result){
+        break;
+      }
+    }
+    logger.info(String.format("Final filtering result : %s ",result));
+    return result;
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
new file mode 100644
index 0000000..0598d95
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+/** Implementation of the CandidateSelector.
+ *  @param K executor object type.
+ *  @param V dispatching object type.
+ * */
+public class CandidateSelector<K,V> implements Selector<K, V> {
+  private static Logger logger = Logger.getLogger(CandidateComparator.class);
+
+  private CandidateFilter<K,V> filter;
+  private CandidateComparator<K> comparator;
+
+  /**constructor of the class.
+   * @param filter CandidateFilter object to be used to perform the candidate filtering.
+   * @param comparator CandidateComparator object to be used to find the best suit candidate from the filtered list.
+   * */
+  public CandidateSelector(CandidateFilter<K,V> filter,
+      CandidateComparator<K> comparator){
+    this.filter = filter;
+    this.comparator = comparator;
+  }
+
+  @Override
+  public K getBest(List<K> candidateList, V dispatchingObject) {
+
+     // shortcut if the candidateList is empty.
+     if ( null == candidateList || candidateList.size() == 0){
+       logger.error("failed to getNext candidate as the passed candidateList is null or empty.");
+       return null;
+     }
+
+     logger.info("start candidate selection logic.");
+     logger.info(String.format("candidate count before filtering: %s", candidateList.size()));
+
+     // to keep the input untouched, we will form up a new list based off the filtering result.
+     List<K> filteredList = new ArrayList<K>();
+
+     if (null != this.filter){
+       for (K candidateInfo : candidateList){
+         if (filter.filterTarget(candidateInfo,dispatchingObject)){
+           filteredList.add(candidateInfo);
+         }
+       }
+     } else{
+       filteredList = candidateList;
+       logger.info("skipping the candidate filtering as the filter object is not specifed.");
+     }
+
+     logger.info(String.format("candidate count after filtering: %s", filteredList.size()));
+     if (filteredList.size() == 0){
+       logger.info("failed to select candidate as the filted candidate list is empty.");
+       return null;
+     }
+
+     // final work - find the best candidate from the filtered list.
+     K executor = Collections.max(filteredList,comparator);
+     logger.info(String.format("candidate selected %s",
+         null == executor ? "(null)" : executor.toString()));
+     return executor;
+  }
+
+  @Override
+  public String getName() {
+    return "CandidateSelector";
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
new file mode 100644
index 0000000..1d9d162
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.Comparator;
+import org.apache.log4j.Logger;
+
+/** wrapper class for a factor comparator .
+ *@param T: the type of the objects to be compared.
+ */
+public final class FactorComparator<T>{
+  private static Logger logger = Logger.getLogger(CandidateComparator.class);
+
+  private String factorName;
+  private int weight;
+  private Comparator<T> comparator;
+
+  /** private constructor of the class. User will create the instance of the class by calling the static
+   *  method provided below.
+   * @param factorName : the factor name .
+   * @param weight : the weight of the comparator.
+   * @ comparator : function to be provided by user on how the comparison should be made.
+   * */
+  private FactorComparator(String factorName, int weight, Comparator<T> comparator){
+    this.factorName = factorName;
+    this.weight = weight;
+    this.comparator = comparator;
+  }
+
+  /** static function to generate an instance of the class.
+   *  refer to the constructor for the param definitions.
+   * */
+  public static <T> FactorComparator<T> create(String factorName, int weight, Comparator<T> comparator){
+
+    if (null == factorName || factorName.length() == 0 || weight < 0 || null == comparator){
+      logger.error("failed to create instance of FactorComparator, at least one of the input paramters are invalid");
+      return null;
+    }
+
+    return new FactorComparator<T>(factorName,weight,comparator);
+  }
+
+  // function to return the factor name.
+  public String getFactorName(){
+    return this.factorName;
+  }
+
+  // function to return the weight value.
+  public int getWeight(){
+    return this.weight;
+  }
+
+  // function to return the weight value.
+  public void updateWeight(int value){
+    this.weight = value;
+  }
+
+  // the actual compare function, which will leverage the user defined function.
+  public int compare(T object1, T object2){
+    return this.comparator.compare(object1, object2);
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
new file mode 100644
index 0000000..74b9d5d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import org.apache.log4j.Logger;
+
+/** wrapper class for a factor Filter .
+ *@param T: the type of the objects to be compared.
+ *@param V: the type of the object to be used for filtering.
+ */
+public final class FactorFilter<T,V>{
+  private static Logger logger = Logger.getLogger(FactorFilter.class);
+
+  private String factorName;
+  private Filter<T,V> filter;
+
+  /** private constructor of the class. User will create the instance of the class by calling the static
+   *  method provided below.
+   * @param factorName : the factor name .
+   * @param filter : user defined function specifying how the filtering should be implemented.
+   * */
+  private FactorFilter(String factorName, Filter<T,V> filter){
+    this.factorName = factorName;
+    this.filter = filter;
+  }
+
+  /** static function to generate an instance of the class.
+   *  refer to the constructor for the param definitions.
+   * */
+  public static <T,V> FactorFilter<T,V> create(String factorName, Filter<T,V> filter){
+
+    if (null == factorName || factorName.length() == 0 || null == filter){
+      logger.error("failed to create instance of FactorFilter, at least one of the input paramters are invalid");
+      return null;
+    }
+
+    return new FactorFilter<T,V>(factorName,filter);
+  }
+
+  // function to return the factor name.
+  public String getFactorName(){
+    return this.factorName;
+  }
+
+  // the actual check function, which will leverage the logic defined by user.
+  public boolean filterTarget(T filteringTarget, V referencingObject){
+    return this.filter.filterTarget(filteringTarget, referencingObject);
+  }
+
+  // interface of the filter.
+  public interface Filter<T,V>{
+
+    /**function to analyze the target item according to the reference object to decide whether the item should be filtered.
+     * @param filteringTarget:   object to be checked.
+     * @param referencingObject: object which contains statistics based on which a decision is made whether
+     *                      the object being checked need to be filtered or not.
+     * @return true if the check passed, false if check failed, which means the item need to be filtered.
+     * */
+    public boolean filterTarget(T filteringTarget, V referencingObject);
+  }
+
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java b/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java
new file mode 100644
index 0000000..610f75a
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.List;
+
+
+/** Definition of the selector interface.
+ *  an implementation of the selector interface provides the functionality
+ *  to return a candidate from the candidateList that suits best for the dispatchingObject.
+ *  @param K : type of the candidate.
+ *  @param V : type of the dispatching object.
+ */
+public interface Selector <K,V> {
+
+  /** Function returns the next best suit candidate from the candidateList for the dispatching object.
+   *  @param  candidateList : List of the candidates to select from .
+   *  @param  dispatchingObject : the object to be dispatched .
+   *  @return candidate from the candidate list that suits best for the dispatching object.
+   * */
+  public K getBest(List<K> candidateList, V dispatchingObject);
+
+  /** Function returns the name of the current Dispatcher
+   *  @return name of the dispatcher.
+   * */
+  public String getName();
+}
diff --git a/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java b/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java
new file mode 100644
index 0000000..ef0ef46
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java
@@ -0,0 +1,239 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpMessage;
+import org.apache.http.HttpResponse;
+import org.apache.http.NameValuePair;
+import org.apache.http.ParseException;
+import org.apache.http.client.HttpResponseException;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.log4j.Logger;
+
+/** class handles the communication between the application and
+ *  a Restful API based web server.
+ *  @param T : type of the returning response object.
+ *  Note: the idea of this abstract class is to provide a wrapper for the logic around HTTP layer communication so
+ *        development work can take this as a black box and focus on processing the result.
+ *        With that said the abstract class will be provided as a template, which ideally can support different types
+ *        of returning object (Dictionary, xmlDoc , text etc.)
+ * */
+public abstract class RestfulApiClient<T> {
+  protected static Logger logger = Logger.getLogger(RestfulApiClient.class);
+
+  /** Method to transform the response returned by the httpClient into the
+   *  type specified.
+   *  Note: Method need to handle case such as failed request.
+   *        Also method is not supposed to pass the response object out
+   *        via the returning value as the response will be closed after the
+   *        execution steps out of the method context.
+   * @throws HttpResponseException
+   * @throws IOException
+   * @throws ParseException
+   * **/
+  protected abstract T parseResponse(HttpResponse  response)
+      throws HttpResponseException, IOException;
+
+  /** function to perform a Get http request.
+   * @param uri   the URI of the request.
+   * @param headerEntries   extra entries to be added to request header.
+   * @return the response object type of which is specified by user.
+   * @throws IOException */
+  public T httpGet(URI uri, List<NameValuePair> headerEntries) throws IOException{
+    // shortcut if the passed url is invalid.
+    if (null == uri){
+      logger.error(" unable to perform httpGet as the passed uri is null");
+      return null;
+    }
+
+    HttpGet get = new HttpGet(uri);
+    return this.sendAndReturn((HttpGet)completeRequest(get, headerEntries));
+  }
+
+  /** function to perform a Post http request.
+   * @param uri   the URI of the request.
+   * @param headerEntries   extra entries to be added to request header.
+   * @param postingBody  the content to be posted , optional.
+   * @return the response object type of which is specified by user.
+   * @throws UnsupportedEncodingException, IOException */
+  public T httpPost(URI uri,
+      List<NameValuePair> headerEntries,
+      String postingBody) throws UnsupportedEncodingException, IOException{
+    // shortcut if the passed url is invalid.
+    if (null == uri){
+      logger.error(" unable to perform httpPost as the passed uri is null.");
+      return null;
+    }
+
+    HttpPost post = new HttpPost(uri);
+    return this.sendAndReturn(completeRequest(post,headerEntries,postingBody));
+  }
+
+  /** function to perform a Delete http request.
+   * @param uri   the URI of the request.
+   * @param headerEntries   extra entries to be added to request header.
+   * @return the response object type of which is specified by user.
+   * @throws IOException */
+  public T httpDelete(URI uri, List<NameValuePair> headerEntries) throws IOException{
+    // shortcut if the passed url is invalid.
+    if (null == uri){
+      logger.error(" unable to perform httpDelete as the passed uri is null.");
+      return null;
+    }
+
+    HttpDelete delete = new HttpDelete(uri);
+    return this.sendAndReturn((HttpDelete)completeRequest(delete, headerEntries));
+  }
+
+  /** function to perform a Put http request.
+   * @param uri   the URI of the request.
+   * @param headerEntries   extra entries to be added to request header.
+   * @param postingBody  the content to be posted , optional.
+   * @return the response object type of which is specified by user.
+   * @throws UnsupportedEncodingException, IOException */
+  public T httpPut(URI uri, List<NameValuePair> headerEntries,
+      String postingBody) throws UnsupportedEncodingException, IOException{
+    // shortcut if the passed url is invalid.
+    if (null == uri){
+      logger.error(" unable to perform httpPut as the passed url is null or empty.");
+      return null;
+    }
+
+    HttpPut put = new HttpPut(uri);
+    return this.sendAndReturn(completeRequest(put, headerEntries, postingBody));
+  }
+
+  /** function to dispatch the request and pass back the response.
+   * */
+  protected T sendAndReturn(HttpUriRequest request) throws IOException{
+    CloseableHttpClient client = HttpClients.createDefault();
+    try {
+      return this.parseResponse(client.execute(request));
+    }finally{
+      client.close();
+    }
+  }
+
+  /** helper function to build a valid URI.
+   *  @param host   host name.
+   *  @param port   host port.
+   *  @param path   extra path after host.
+   *  @param isHttp indicates if whether Http or HTTPS should be used.
+   *  @param params extra query parameters.
+   *  @return the URI built from the inputs.
+   *  @throws IOException
+   * */
+  public static URI buildUri(String host, int port, String path,
+      boolean isHttp, Pair<String, String>... params) throws IOException{
+    URIBuilder builder = new URIBuilder();
+    builder.setScheme(isHttp? "http" : "https").setHost(host).setPort(port);
+
+    if (null != path && path.length() > 0){
+      builder.setPath(path);
+    }
+
+    if (params != null) {
+      for (Pair<String, String> pair : params) {
+        builder.setParameter(pair.getFirst(), pair.getSecond());
+      }
+    }
+
+    URI uri = null;
+    try {
+      uri = builder.build();
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+
+    return uri;
+  }
+
+  /** helper function to build a valid URI.
+   *  @param uri    the URI to start with.
+   *  @param params extra query parameters to append.
+   *  @return the URI built from the inputs.
+   *  @throws IOException
+   * */
+  public static URI BuildUri(URI uri, Pair<String, String>... params) throws IOException{
+    URIBuilder builder = new URIBuilder(uri);
+
+    if (params != null) {
+      for (Pair<String, String> pair : params) {
+        builder.setParameter(pair.getFirst(), pair.getSecond());
+      }
+    }
+
+    URI returningUri = null;
+    try {
+      returningUri = builder.build();
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+
+    return returningUri;
+  }
+
+  /** helper function to fill  the request with header entries .
+   * */
+  private static HttpMessage completeRequest(HttpMessage request,
+      List<NameValuePair> headerEntries){
+    if (null == request){
+      logger.error("unable to complete request as the passed request object is null");
+      return request;
+    }
+
+    // dump all the header entries to the request.
+    if (null != headerEntries && headerEntries.size() > 0){
+      for (NameValuePair pair : headerEntries){
+        request.addHeader(pair.getName(), pair.getValue());
+      }
+    }
+    return request;
+  }
+
+  /** helper function to fill  the request with header entries and posting body .
+   * */
+  private static HttpEntityEnclosingRequestBase completeRequest(HttpEntityEnclosingRequestBase request,
+      List<NameValuePair> headerEntries,
+      String postingBody) throws UnsupportedEncodingException{
+     if (null != completeRequest(request, headerEntries)){
+      // dump the post body UTF-8 will be used as the default encoding type.
+      if (null != postingBody && postingBody.length() > 0){
+        HttpEntity entity = new ByteArrayEntity(postingBody.getBytes("UTF-8"));
+        request.setHeader("Content-Length",  Long.toString(entity.getContentLength()));
+        request.setEntity(entity);
+      }
+    }
+    return request;
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
new file mode 100644
index 0000000..b5b4509
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
@@ -0,0 +1,605 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+
+import org.apache.log4j.BasicConfigurator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import azkaban.executor.selector.*;
+
+public class SelectorTest {
+  // mock executor object.
+  protected class MockExecutorObject{
+    public String name;
+    public int    port;
+    public double percentOfRemainingMemory;
+    public int    amountOfRemainingMemory;
+    public int    priority;
+    public Date   lastAssigned;
+    public double percentOfRemainingFlowcapacity;
+    public int    remainingTmp;
+
+    public MockExecutorObject(String name,
+        int port,
+        double percentOfRemainingMemory,
+        int amountOfRemainingMemory,
+        int priority,
+        Date lastAssigned,
+        double percentOfRemainingFlowcapacity,
+        int remainingTmp)
+    {
+      this.name = name;
+      this.port = port;
+      this.percentOfRemainingMemory = percentOfRemainingMemory;
+      this.amountOfRemainingMemory =amountOfRemainingMemory;
+      this.priority = priority;
+      this.lastAssigned = lastAssigned;
+      this.percentOfRemainingFlowcapacity = percentOfRemainingFlowcapacity;
+      this.remainingTmp = remainingTmp;
+    }
+
+    @Override
+    public String toString()
+    {
+      return this.name;
+    }
+  }
+
+  // Mock flow object.
+  protected class MockFlowObject{
+    public String name;
+    public int    requiredRemainingMemory;
+    public int    requiredTotalMemory;
+    public int    requiredRemainingTmpSpace;
+    public int    priority;
+
+    public MockFlowObject(String name,
+        int requiredTotalMemory,
+        int requiredRemainingMemory,
+        int requiredRemainingTmpSpace,
+        int priority)
+    {
+      this.name = name;
+      this.requiredTotalMemory = requiredTotalMemory;
+      this.requiredRemainingMemory = requiredRemainingMemory;
+      this.requiredRemainingTmpSpace = requiredRemainingTmpSpace;
+      this.priority = priority;
+    }
+
+    @Override
+    public String toString()
+    {
+      return this.name;
+    }
+  }
+
+  // mock Filter class.
+  protected class MockFilter
+  extends CandidateFilter<MockExecutorObject,MockFlowObject>{
+
+    @Override
+    public String getName() {
+      return "Mockfilter";
+    }
+
+    public MockFilter(){
+    }
+
+    // function to register the remainingMemory filter.
+    // for test purpose the registration is put in a separated method, in production the work should be done
+    // in the constructor.
+    public void registerFilterforTotalMemory(){
+      this.registerFactorFilter(FactorFilter.create("requiredTotalMemory",
+          new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+          public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+            // REAL LOGIC COMES HERE -
+            if (null == itemToCheck || null == sourceObject){
+              return false;
+            }
+
+            // Box has infinite memory.:)
+            if (itemToCheck.percentOfRemainingMemory == 0) {
+              return true;
+            }
+
+            // calculate the memory and return.
+            return itemToCheck.amountOfRemainingMemory / itemToCheck.percentOfRemainingMemory * 100 >
+                   sourceObject.requiredTotalMemory;
+          }}));
+    }
+
+    public void registerFilterforRemainingMemory(){
+      this.registerFactorFilter(FactorFilter.create("requiredRemainingMemory",
+          new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+        public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+          // REAL LOGIC COMES HERE -
+          if (null == itemToCheck || null == sourceObject){
+            return false;
+         }
+         return itemToCheck.amountOfRemainingMemory > sourceObject.requiredRemainingMemory;
+        }}));
+    }
+
+    public void registerFilterforPriority(){
+      this.registerFactorFilter(FactorFilter.create("requiredProprity",
+          new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+        public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+          // REAL LOGIC COMES HERE -
+          if (null == itemToCheck || null == sourceObject){
+            return false;
+          }
+
+          // priority value, the bigger the lower.
+          return itemToCheck.priority >= sourceObject.priority;
+        }}));
+    }
+
+    public void registerFilterforRemainingTmpSpace(){
+      this.registerFactorFilter(FactorFilter.create("requiredRemainingTmpSpace",
+          new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+        public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+          // REAL LOGIC COMES HERE -
+          if (null == itemToCheck || null == sourceObject){
+            return false;
+          }
+
+         return itemToCheck.remainingTmp > sourceObject.requiredRemainingTmpSpace;
+        }}));
+    }
+
+  }
+
+  // mock comparator class.
+  protected class MockComparator
+  extends CandidateComparator<MockExecutorObject>{
+
+    @Override
+    public String getName() {
+      return "MockComparator";
+    }
+
+    @Override
+    protected boolean tieBreak(MockExecutorObject object1, MockExecutorObject object2){
+      if (null == object2) return true;
+      if (null == object1) return false;
+      return object1.name.compareTo(object2.name) >= 0;
+    }
+
+    public MockComparator(){
+    }
+
+    public void registerComparerForMemory(int weight){
+      this.registerFactorComparator(FactorComparator.create("Memory", weight, new Comparator<MockExecutorObject>(){
+        public int compare(MockExecutorObject o1, MockExecutorObject o2) {
+          int result = 0 ;
+
+          // check remaining amount of memory.
+          result = o1.amountOfRemainingMemory - o2.amountOfRemainingMemory;
+          if (result != 0){
+            return result > 0 ? 1 : -1;
+          }
+
+          // check remaining % .
+          result = (int)(o1.percentOfRemainingMemory - o2.percentOfRemainingMemory);
+          return result == 0 ? 0 : result > 0 ? 1 : -1;
+
+        } }));
+    }
+
+    public void registerComparerForRemainingSpace(int weight){
+      this.registerFactorComparator(FactorComparator.create("RemainingTmp", weight, new Comparator<MockExecutorObject>(){
+        public int compare(MockExecutorObject o1, MockExecutorObject o2) {
+          int result = 0 ;
+
+          // check remaining % .
+          result = (int)(o1.remainingTmp - o2.remainingTmp);
+          return result == 0 ? 0 : result > 0 ? 1 : -1;
+
+        } }));
+    }
+
+    public void registerComparerForPriority(int weight){
+      this.registerFactorComparator(FactorComparator.create("Priority", weight, new Comparator<MockExecutorObject>(){
+        public int compare(MockExecutorObject o1, MockExecutorObject o2) {
+          int result = 0 ;
+
+          // check priority, bigger the better.
+          result = (int)(o1.priority - o2.priority);
+          return result == 0 ? 0 : result > 0 ? 1 : -1;
+
+        } }));
+    }
+  }
+
+  // test samples.
+  protected ArrayList<MockExecutorObject> executorList = new ArrayList<MockExecutorObject>();
+
+  @BeforeClass public static void onlyOnce() {
+    BasicConfigurator.configure();
+   }
+
+  @Before
+  public void setUp() throws Exception {
+    executorList.clear();
+    executorList.add(new MockExecutorObject("Executor1",8080,50.0,2048,5,new Date(), 20, 6400));
+    executorList.add(new MockExecutorObject("Executor2",8080,50.0,2048,4,new Date(), 20, 6400));
+    executorList.add(new MockExecutorObject("Executor3",8080,40.0,2048,1,new Date(), 20, 6400));
+    executorList.add(new MockExecutorObject("Executor4",8080,50.0,2048,4,new Date(), 20, 6400));
+    executorList.add(new MockExecutorObject("Executor5",8080,50.0,1024,5,new Date(), 90, 6400));
+    executorList.add(new MockExecutorObject("Executor6",8080,50.0,1024,5,new Date(), 90, 3200));
+    executorList.add(new MockExecutorObject("Executor7",8080,50.0,1024,5,new Date(), 90, 3200));
+    executorList.add(new MockExecutorObject("Executor8",8080,50.0,2048,1,new Date(), 90, 3200));
+    executorList.add(new MockExecutorObject("Executor9",8080,50.0,2050,5,new Date(), 90, 4200));
+    executorList.add(new MockExecutorObject("Executor10",8080,00.0,1024,1,new Date(), 90, 3200));
+    executorList.add(new MockExecutorObject("Executor11",8080,20.0,2096,3,new Date(), 90, 2400));
+    executorList.add(new MockExecutorObject("Executor12",8080,90.0,2050,5,new Date(), 60, 2500));
+
+
+    // make sure each time the order is different.
+    Collections.shuffle(this.executorList);
+  }
+
+  private MockExecutorObject  getExecutorByName(String name){
+    MockExecutorObject returnVal = null;
+    for (MockExecutorObject item : this.executorList){
+      if (item.name.equals(name)){
+        returnVal = item;
+        break;
+      }
+    }
+    return returnVal;
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public void testExecutorFilter() throws Exception {
+
+      // mock object, remaining memory 11500, total memory 3095, remainingTmpSpace 4200, priority 2.
+      MockFlowObject  dispatchingObj = new MockFlowObject("flow1",3096, 1500,4200,2);
+
+      MockFilter mFilter = new MockFilter();
+      mFilter.registerFilterforRemainingMemory();
+
+      // expect true.
+      boolean result = mFilter.filterTarget(this.getExecutorByName("Executor1"), dispatchingObj);
+      Assert.assertTrue(result);
+
+      //expect true.
+      result = mFilter.filterTarget(this.getExecutorByName("Executor3"), dispatchingObj);
+      /*
+      1 [main] INFO azkaban.executor.Selector.CandidateFilter  - start checking 'Executor3' with factor filter for 'Mockfilter'
+      2 [main] INFO azkaban.executor.Selector.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
+      2 [main] INFO azkaban.executor.Selector.CandidateFilter  - Final checking result : true
+      */
+      Assert.assertTrue(result);
+
+      // add the priority filter.
+      mFilter.registerFilterforPriority();
+      result = mFilter.filterTarget(this.getExecutorByName("Executor3"), dispatchingObj);
+      // expect false, for priority.
+      /*
+      2 [main] INFO azkaban.executor.Selector.CandidateFilter  - start checking 'Executor3' with factor filter for 'Mockfilter'
+      2 [main] INFO azkaban.executor.Selector.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
+      2 [main] INFO azkaban.executor.Selector.CandidateFilter  - [Factor: requiredProprity] filter result : false
+      2 [main] INFO azkaban.executor.Selector.CandidateFilter  - Final checking result : false
+      */
+      Assert.assertFalse(result);
+
+      // add the remaining space filter.
+      mFilter.registerFilterforRemainingTmpSpace();
+
+      // expect pass.
+      result = mFilter.filterTarget(this.getExecutorByName("Executor2"), dispatchingObj);
+      /*
+      3 [main] INFO azkaban.executor.Selector.CandidateFilter  - start checking 'Executor2' with factor filter for 'Mockfilter'
+      3 [main] INFO azkaban.executor.Selector.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
+      3 [main] INFO azkaban.executor.Selector.CandidateFilter  - [Factor: requiredRemainingTmpSpace] filter result : true
+      3 [main] INFO azkaban.executor.Selector.CandidateFilter  - [Factor: requiredProprity] filter result : true
+      3 [main] INFO azkaban.executor.Selector.CandidateFilter  - Final checking result : true
+      */
+      Assert.assertTrue(result);
+
+      // expect false, remaining tmp, priority will also fail but the logic shortcuts when the Tmp size check Fails.
+      result = mFilter.filterTarget(this.getExecutorByName("Executor8"), dispatchingObj);
+      /*
+      4 [main] INFO azkaban.executor.Selector.CandidateFilter  - start checking 'Executor8' with factor filter for 'Mockfilter'
+      4 [main] INFO azkaban.executor.Selector.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
+      4 [main] INFO azkaban.executor.Selector.CandidateFilter  - [Factor: requiredRemainingTmpSpace] filter result : false
+      4 [main] INFO azkaban.executor.Selector.CandidateFilter  - Final checking result : false
+      */
+      Assert.assertFalse(result);
+
+  }
+
+  @Test
+  public void testExecutorFilterWithNullInputs() throws Exception {
+    MockFilter filter = new MockFilter();
+    filter.registerFilterforPriority();
+    filter.registerFilterforRemainingMemory();
+    filter.registerFilterforRemainingTmpSpace();
+    filter.registerFilterforTotalMemory();
+    boolean result = false;
+    try {
+        result = filter.filterTarget(this.getExecutorByName("Executor1"), null);
+      } catch (Exception ex){
+        Assert.fail("no exception should be thrown when null value is passed to the filter.");
+      }
+    // note : the FactorFilter logic will decide whether true or false should be returned when null value
+    //        is passed, for the Mock class it returns false.
+    Assert.assertFalse(result);
+
+    try {
+        result = filter.filterTarget(null, null);
+      } catch (Exception ex){
+        Assert.fail("no exception should be thrown when null value is passed to the filter.");
+      }
+    // note : the FactorFilter logic will decide whether true or false should be returned when null value
+    //        is passed, for the Mock class it returns false.
+    Assert.assertFalse(result);
+  }
+
+  @Test
+  public void testExecutorComparer() throws Exception {
+    MockComparator comparator = new MockComparator();
+    comparator.registerComparerForMemory(5);
+
+    MockExecutorObject nextExecutor = Collections.max(this.executorList, comparator);
+
+    // expect the first item to be selected, memory wise it is the max.
+    Assert.assertEquals(this.getExecutorByName("Executor11"),nextExecutor);
+
+    // add the priority factor.
+    // expect again the #9 item to be selected.
+    comparator.registerComparerForPriority(6);
+    nextExecutor = Collections.max(this.executorList, comparator);
+    Assert.assertEquals(this.getExecutorByName("Executor12"),nextExecutor);
+
+    // add the remaining space factor.
+    // expect the #12 item to be returned.
+    comparator.registerComparerForRemainingSpace(3);
+    nextExecutor = Collections.max(this.executorList, comparator);
+    Assert.assertEquals(this.getExecutorByName("Executor12"),nextExecutor);
+  }
+
+  @Test
+  public void testExecutorComparerResisterComparerWInvalidWeight() throws Exception {
+    MockComparator comparator = new MockComparator();
+    comparator.registerComparerForMemory(0);
+  }
+
+  @Test
+  public void testSelector() throws Exception {
+    MockFilter filter = new MockFilter();
+    MockComparator comparator = new MockComparator();
+
+    filter.registerFilterforPriority();
+    filter.registerFilterforRemainingMemory();
+    filter.registerFilterforRemainingTmpSpace();
+    filter.registerFilterforTotalMemory();
+
+    comparator.registerComparerForMemory(3);
+    comparator.registerComparerForPriority(5);
+    comparator.registerComparerForRemainingSpace(3);
+
+    CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+        new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+    // mock object, remaining memory 11500, total memory 3095, remainingTmpSpace 4200, priority 2.
+    MockFlowObject  dispatchingObj = new MockFlowObject("flow1",3096, 1500,4200,2);
+
+    // expected selection = #12
+    MockExecutorObject nextExecutor = morkSelector.getBest(this.executorList, dispatchingObj);
+    Assert.assertEquals(this.getExecutorByName("Executor1"),nextExecutor);
+
+   // remaining memory 11500, total memory 3095, remainingTmpSpace 14200, priority 2.
+   dispatchingObj = new MockFlowObject("flow1",3096, 1500,14200,2);
+   // all candidates should be filtered by the remaining memory.
+   nextExecutor = morkSelector.getBest(this.executorList, dispatchingObj);
+   Assert.assertEquals(null,nextExecutor);
+  }
+
+  @Test
+  public void testSelectorsignleCandidate() throws Exception {
+    MockFilter filter = new MockFilter();
+    MockComparator comparator = new MockComparator();
+
+    filter.registerFilterforPriority();
+    filter.registerFilterforRemainingMemory();
+    filter.registerFilterforRemainingTmpSpace();
+    filter.registerFilterforTotalMemory();
+
+    comparator.registerComparerForMemory(3);
+    comparator.registerComparerForPriority(4);
+    comparator.registerComparerForRemainingSpace(1);
+
+    CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+        new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+    ArrayList<MockExecutorObject> signleExecutorList = new ArrayList<MockExecutorObject>();
+    MockExecutorObject signleExecutor = new MockExecutorObject("ExecutorX",8080,50.0,2048,3,new Date(), 20, 6400);
+    signleExecutorList.add(signleExecutor);
+
+    MockFlowObject  dispatchingObj = new MockFlowObject("flow1",100, 100,100,5);
+    MockExecutorObject executor = morkSelector.getBest(signleExecutorList, dispatchingObj);
+    // expected to see null result, as the only executor is filtered out .
+    Assert.assertTrue(null == executor);
+
+    // adjust the priority to let the executor pass the filter.
+    dispatchingObj.priority = 3;
+    executor = morkSelector.getBest(signleExecutorList, dispatchingObj);
+    Assert.assertEquals(signleExecutor, executor);
+  }
+
+  @Test
+  public void testSelectorListWithItemsThatAreReferenceEqual() throws Exception {
+    MockFilter filter = new MockFilter();
+    MockComparator comparator = new MockComparator();
+
+    filter.registerFilterforPriority();
+    filter.registerFilterforRemainingMemory();
+    filter.registerFilterforRemainingTmpSpace();
+    filter.registerFilterforTotalMemory();
+
+    comparator.registerComparerForMemory(3);
+    comparator.registerComparerForPriority(4);
+    comparator.registerComparerForRemainingSpace(1);
+
+    CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+        new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+    ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+    MockExecutorObject signleExecutor = new MockExecutorObject("ExecutorX",8080,50.0,2048,3,new Date(), 20, 6400);
+    list.add(signleExecutor);
+    list.add(signleExecutor);
+    MockFlowObject  dispatchingObj = new MockFlowObject("flow1",100, 100,100,3);
+    MockExecutorObject executor = morkSelector.getBest(list, dispatchingObj);
+    Assert.assertTrue(signleExecutor == executor);
+  }
+
+  @Test
+  public void testSelectorListWithItemsThatAreEqualInValue() throws Exception {
+    MockFilter filter = new MockFilter();
+    MockComparator comparator = new MockComparator();
+
+    filter.registerFilterforPriority();
+    filter.registerFilterforRemainingMemory();
+    filter.registerFilterforRemainingTmpSpace();
+    filter.registerFilterforTotalMemory();
+
+    comparator.registerComparerForMemory(3);
+    comparator.registerComparerForPriority(4);
+    comparator.registerComparerForRemainingSpace(1);
+
+    CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+        new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+    // note - as the tieBreaker set in the MockComparator uses the name value of the executor to do the
+    //        final diff therefore we need to set the name differently to make a meaningful test, in real
+    //        scenario we may want to use something else (say hash code) to be the bottom line for the tieBreaker
+    //        to make a final decision, the purpose of the test here is to prove that for two candidates with
+    //        exact value (in the case of test, all values except for the name) the decision result is stable.
+    ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+    MockExecutorObject executor1 = new MockExecutorObject("ExecutorX", 8080,50.0,2048,3,new Date(), 20, 6400);
+    MockExecutorObject executor2 = new MockExecutorObject("ExecutorX2",8080,50.0,2048,3,new Date(), 20, 6400);
+    list.add(executor1);
+    list.add(executor2);
+    MockFlowObject  dispatchingObj = new MockFlowObject("flow1",100, 100,100,3);
+    MockExecutorObject executor = morkSelector.getBest(list, dispatchingObj);
+    Assert.assertTrue(executor2 == executor);
+
+    // shuffle and test again.
+    list.remove(0);
+    list.add(executor1);
+    executor = morkSelector.getBest(list, dispatchingObj);
+    Assert.assertTrue(executor2 == executor);
+  }
+
+  @Test
+  public void testSelectorEmptyList() throws Exception {
+    MockFilter filter = new MockFilter();
+    MockComparator comparator = new MockComparator();
+
+    filter.registerFilterforPriority();
+    filter.registerFilterforRemainingMemory();
+    filter.registerFilterforRemainingTmpSpace();
+    filter.registerFilterforTotalMemory();
+
+    comparator.registerComparerForMemory(3);
+    comparator.registerComparerForPriority(4);
+    comparator.registerComparerForRemainingSpace(1);
+
+    CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+        new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+    ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+
+    MockFlowObject  dispatchingObj = new MockFlowObject("flow1",100, 100,100,5);
+
+    MockExecutorObject executor  = null;
+
+    try {
+      executor = morkSelector.getBest(list, dispatchingObj);
+      } catch (Exception ex){
+        Assert.fail("no exception should be thrown when an empty list is passed to the Selector.");
+      }
+
+    // expected to see null result.
+    Assert.assertTrue(null == executor);
+
+    try {
+      executor = morkSelector.getBest(list, dispatchingObj);
+      } catch (Exception ex){
+        Assert.fail("no exception should be thrown when null is passed to the Selector as the candidate list.");
+      }
+
+      // expected to see null result, as the only executor is filtered out .
+      Assert.assertTrue(null == executor);
+
+  }
+
+  @Test
+  public void testSelectorListWithNullValue() throws Exception {
+    MockComparator comparator = new MockComparator();
+
+    comparator.registerComparerForMemory(3);
+    comparator.registerComparerForPriority(4);
+    comparator.registerComparerForRemainingSpace(1);
+
+    CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+        new CandidateSelector<MockExecutorObject,MockFlowObject>(null,comparator);
+
+    ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+    MockExecutorObject executor1 = new MockExecutorObject("ExecutorX", 8080,50.0,2048,3,new Date(), 20, 6400);
+    MockExecutorObject executor2 = new MockExecutorObject("ExecutorX2",8080,50.0,2048,3,new Date(), 20, 6400);
+    list.add(executor1);
+    list.add(executor2);
+    list.add(null);
+
+    MockFlowObject  dispatchingObj = new MockFlowObject("flow1",100, 100,100,3);
+    MockExecutorObject executor  = null;
+    try {
+      executor = morkSelector.getBest(list, dispatchingObj);
+      } catch (Exception ex){
+        Assert.fail("no exception should be thrown when an List contains null value.");
+      }
+    Assert.assertTrue(executor2 == executor);
+
+    // try to compare null vs null, no exception is expected.
+    list.clear();
+    list.add(null);
+    list.add(null);
+    try {
+      executor = morkSelector.getBest(list, dispatchingObj);
+      } catch (Exception ex){
+        Assert.fail("no exception should be thrown when an List contains multiple null values.");
+      }
+    Assert.assertTrue(null == executor);
+
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/utils/RestfulApiClientTest.java b/azkaban-common/src/test/java/azkaban/utils/RestfulApiClientTest.java
new file mode 100644
index 0000000..85b2666
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/RestfulApiClientTest.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpResponseFactory;
+import org.apache.http.HttpStatus;
+import org.apache.http.HttpVersion;
+import org.apache.http.NameValuePair;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpResponseException;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.DefaultHttpResponseFactory;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.message.BasicStatusLine;
+import org.apache.http.util.EntityUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class RestfulApiClientTest {
+
+  protected class MockRestfulApiClient extends RestfulApiClient<String> {
+    private int  status = HttpStatus.SC_OK;
+
+    @Override
+    protected String parseResponse(HttpResponse response) throws IOException {
+      final StatusLine statusLine = response.getStatusLine();
+      if (statusLine.getStatusCode() >= 300) {
+          throw new HttpResponseException(statusLine.getStatusCode(),
+                  statusLine.getReasonPhrase());
+      }
+      final HttpEntity entity = response.getEntity();
+      return entity == null ? null : EntityUtils.toString(entity);
+    }
+
+    public void setReturnStatus(int newStatus){
+      this.status = newStatus;
+    }
+
+    public void resetReturnStatus(){
+      this.status = HttpStatus.SC_OK;
+    }
+
+    @Override
+    protected String sendAndReturn(HttpUriRequest request) throws IOException{
+      HttpResponseFactory factory = new DefaultHttpResponseFactory();
+
+      HttpResponse response = factory.newHttpResponse(
+          new BasicStatusLine(HttpVersion.HTTP_1_1, this.status, null),null);
+
+      StringBuilder sb = new StringBuilder();
+      sb.append(String.format("%s = %s;", "METHOD", request.getMethod()));
+      sb.append(String.format("%s = %s;", "URI", request.getURI()));
+
+      if (request.getAllHeaders().length > 0){
+        sb.append("HEADER_EXISTS");
+      }
+
+      for (Header h : request.getAllHeaders()){
+        sb.append(String.format("%s = %s;", h.getName(), h.getValue()));
+      }
+
+      if (request instanceof HttpEntityEnclosingRequestBase){
+        HttpEntity entity = ((HttpEntityEnclosingRequestBase)request).getEntity();
+        if (entity != null){
+          sb.append("BODY_EXISTS");
+          sb.append(String.format("%s = %s;", "BODY", EntityUtils.toString(entity)));
+        }
+      }
+
+      response.setEntity(new StringEntity(sb.toString()));
+      return parseResponse(response);
+    }
+
+  }
+
+  @Test
+  public void testHttpGet() throws Exception {
+    MockRestfulApiClient mockClient = new MockRestfulApiClient();
+    @SuppressWarnings("unchecked")
+    URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+        new Pair <String,String>("Entry1","Value1"));
+
+    String result = mockClient.httpGet(uri, null);
+    Assert.assertTrue(result!= null && result.contains(uri.toString()));
+    Assert.assertTrue(result.contains("METHOD = GET"));
+  }
+
+  @Test
+  public void testHttpGetWithHeaderItems() throws Exception {
+    MockRestfulApiClient mockClient = new MockRestfulApiClient();
+    @SuppressWarnings("unchecked")
+    URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+        new Pair <String,String>("Entry1","Value1"));
+
+    ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+    headerItems.add(new BasicNameValuePair("h1","v1"));
+    headerItems.add(new BasicNameValuePair("h2","v2"));
+
+    String result = mockClient.httpGet(uri, headerItems);
+    Assert.assertTrue(result!= null && result.contains(uri.toString()));
+    Assert.assertTrue(result.contains("METHOD = GET"));
+    Assert.assertTrue(result.contains("h1 = v1"));
+    Assert.assertTrue(result.contains("h2 = v2"));
+  }
+
+  @Test
+  public void testHttpPost() throws Exception {
+    MockRestfulApiClient mockClient = new MockRestfulApiClient();
+    @SuppressWarnings("unchecked")
+    URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+        new Pair <String,String>("Entry1","Value1"));
+
+    ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+    headerItems.add(new BasicNameValuePair("h1","v1"));
+    headerItems.add(new BasicNameValuePair("h2","v2"));
+
+    String content = "123456789";
+
+    String result = mockClient.httpPost(uri, headerItems,content);
+    Assert.assertTrue(result!= null && result.contains(uri.toString()));
+    Assert.assertTrue(result.contains("METHOD = POST"));
+    Assert.assertTrue(result.contains("h1 = v1"));
+    Assert.assertTrue(result.contains("h2 = v2"));
+    Assert.assertTrue(result.contains(String.format("%s = %s;", "BODY", content)));
+  }
+
+  @Test
+  public void testHttpPostWOBody() throws Exception {
+    MockRestfulApiClient mockClient = new MockRestfulApiClient();
+    @SuppressWarnings("unchecked")
+    URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+        new Pair <String,String>("Entry1","Value1"));
+
+    String result = mockClient.httpPost(uri, null,null);
+    Assert.assertTrue(result!= null && result.contains(uri.toString()));
+    Assert.assertTrue(result.contains("METHOD = POST"));
+    Assert.assertFalse(result.contains("BODY_EXISTS"));
+    Assert.assertFalse(result.contains("HEADER_EXISTS"));
+  }
+
+  @Test
+  public void testHttpPut() throws Exception {
+    MockRestfulApiClient mockClient = new MockRestfulApiClient();
+    @SuppressWarnings("unchecked")
+    URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+        new Pair <String,String>("Entry1","Value1"));
+
+    ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+    headerItems.add(new BasicNameValuePair("h1","v1"));
+    headerItems.add(new BasicNameValuePair("h2","v2"));
+
+    String content = "123456789";
+
+    String result = mockClient.httpPut(uri, headerItems,content);
+    Assert.assertTrue(result!= null && result.contains(uri.toString()));
+    Assert.assertTrue(result.contains("METHOD = PUT"));
+    Assert.assertTrue(result.contains("h1 = v1"));
+    Assert.assertTrue(result.contains("h2 = v2"));
+    Assert.assertTrue(result.contains(String.format("%s = %s;", "BODY", content)));
+  }
+
+  @Test
+  public void testContentLength() throws Exception {
+    MockRestfulApiClient mockClient = new MockRestfulApiClient();
+    @SuppressWarnings("unchecked")
+    URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+        new Pair <String,String>("Entry1","Value1"));
+
+    String content = "123456789";
+
+    String result = mockClient.httpPut(uri, null,content);
+    Assert.assertTrue(result!= null && result.contains(uri.toString()));
+    Assert.assertTrue(result.contains("Content-Length = " + Integer.toString(content.length())));
+  }
+
+  @Test
+  public void testContentLengthOverride() throws Exception {
+    MockRestfulApiClient mockClient = new MockRestfulApiClient();
+    @SuppressWarnings("unchecked")
+    URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+        new Pair <String,String>("Entry1","Value1"));
+
+    ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+    headerItems.add(new BasicNameValuePair("Content-Length","0"));
+
+    String content = "123456789";
+
+    String result = mockClient.httpPut(uri, headerItems,content);
+    Assert.assertTrue(result!= null && result.contains(uri.toString()));
+    Assert.assertEquals(result.lastIndexOf("Content-Length"),result.indexOf("Content-Length"));
+    Assert.assertTrue(result.contains("Content-Length = " + Integer.toString(content.length())));
+  }
+
+  @Test
+  public void testHttpDelete() throws Exception {
+    MockRestfulApiClient mockClient = new MockRestfulApiClient();
+    @SuppressWarnings("unchecked")
+    URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+        new Pair <String,String>("Entry1","Value1"));
+
+    ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+    headerItems.add(new BasicNameValuePair("h1","v1"));
+    headerItems.add(new BasicNameValuePair("h2","v2"));
+
+    String result = mockClient.httpDelete(uri, headerItems);
+    Assert.assertTrue(result!= null && result.contains(uri.toString()));
+    Assert.assertTrue(result.contains("METHOD = DELETE"));
+    Assert.assertTrue(result.contains("h1 = v1"));
+    Assert.assertTrue(result.contains("h2 = v2"));
+  }
+}