Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
new file mode 100644
index 0000000..4d77897
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpResponseException;
+import org.apache.http.util.EntityUtils;
+
+import azkaban.utils.JSONUtils;
+import azkaban.utils.RestfulApiClient;
+
+/** Client class that will be used to handle all Restful API calls between Executor and the host application.
+ * */
+public class ExecutorApiClient extends RestfulApiClient<Map<String, Object>> {
+ private ExecutorApiClient(){}
+ private static ExecutorApiClient instance = new ExecutorApiClient();
+
+ /** Function to return the instance of the ExecutorApiClient class.
+ * */
+ public static ExecutorApiClient getInstance() {
+ return instance;
+ }
+
+ /**Implementing the parseResponse function to return de-serialized Json object.
+ * @param response the returned response from the HttpClient.
+ * @return de-serialized object from Json or empty object if the response doesn't have a body.
+ * */
+ @SuppressWarnings("unchecked")
+ @Override
+ protected Map<String, Object> parseResponse(HttpResponse response)
+ throws HttpResponseException, IOException {
+ final StatusLine statusLine = response.getStatusLine();
+ String responseBody = response.getEntity() != null ?
+ EntityUtils.toString(response.getEntity()) : "";
+
+ if (statusLine.getStatusCode() >= 300) {
+
+ logger.error(String.format("unable to parse response as the response status is %s",
+ statusLine.getStatusCode()));
+
+ throw new HttpResponseException(statusLine.getStatusCode(),responseBody);
+ }
+
+ final HttpEntity entity = response.getEntity();
+ if (null != entity){
+ Object returnVal = JSONUtils.parseJSONFromString(EntityUtils.toString(entity));
+ if (null!= returnVal){
+ return (Map<String, Object>) returnVal;
+ }
+ }
+
+ return new HashMap<String, Object>() ;
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
new file mode 100644
index 0000000..8234f7d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Pair;
+
+/** Abstract class for a candidate comparator.
+ * this class contains implementation of most of the core logics. Implementing classes is expected only to
+ * register factor comparators using the provided register function.
+ *
+ */
+public abstract class CandidateComparator<T> implements Comparator<T> {
+ private static Logger logger = Logger.getLogger(CandidateComparator.class);
+
+ // internal repository of the registered comparators .
+ private Map<String,FactorComparator<T>> factorComparatorList =
+ new ConcurrentHashMap<String,FactorComparator<T>>();
+
+ /** gets the name of the current implementation of the candidate comparator.
+ * @returns : name of the comparator.
+ * */
+ public abstract String getName();
+
+ /** tieBreak method which will kick in when the comparator list generated an equality result for
+ * both sides. the tieBreak method will try best to make sure a stable result is returned.
+ * */
+ protected boolean tieBreak(T object1, T object2){
+ if (null == object2) return true;
+ if (null == object1) return false;
+ return object1.hashCode() >= object2.hashCode();
+ }
+
+ /** function to register a factorComparator to the internal Map for future reference.
+ * @param factorComparator : the comparator object to be registered.
+ * @throws IllegalArgumentException
+ * */
+ protected void registerFactorComparator(FactorComparator<T> comparator){
+ if (null == comparator ||
+ Integer.MAX_VALUE - this.getTotalWeight() < comparator.getWeight() ) {
+ throw new IllegalArgumentException("unable to register comparator."+
+ " The passed comparator is null or has an invalid weight value.");
+ }
+
+ // add or replace the Comparator.
+ this.factorComparatorList.put(comparator.getFactorName(),comparator);
+ logger.info(String.format("Factor comparator added for '%s'. Weight = '%s'",
+ comparator.getFactorName(), comparator.getWeight()));
+ }
+
+ /** function returns the total weight of the registered comparators.
+ * @return the value of total weight.
+ * */
+ public int getTotalWeight(){
+ int totalWeight = 0 ;
+
+ // save out a copy of the values as HashMap.values() takes o(n) to return the value.
+ Collection<FactorComparator<T>> allValues = this.factorComparatorList.values();
+ for (FactorComparator<T> item : allValues){
+ if (item != null){
+ totalWeight += item.getWeight();
+ }
+ }
+
+ return totalWeight;
+ }
+
+ /** function to actually calculate the scores for the two objects that are being compared.
+ * the comparison follows the following logic -
+ * 1. if both objects are equal return 0 score for both.
+ * 2. if one side is null, the other side gets all the score.
+ * 3. if both sides are non-null value, both values will be passed to all the registered FactorComparators
+ * each factor comparator will generate a result based off it sole logic the weight of the comparator will be
+ * added to the wining side, if equal, no value will be added to either side.
+ * 4. final result will be returned in a Pair container.
+ *
+ * */
+ public Pair<Integer,Integer> getComparisonScore(T object1, T object2){
+ logger.info(String.format("start comparing '%s' with '%s', total weight = %s ",
+ object1 == null ? "(null)" : object1.toString(),
+ object2 == null ? "(null)" : object2.toString(),
+ this.getTotalWeight()));
+
+ int result1 = 0 ;
+ int result2 = 0 ;
+
+ // short cut if object equals.
+ if (object1 == object2){
+ logger.info("[Comparator] same object.");
+ } else
+ // left side is null.
+ if (object1 == null){
+ logger.info("[Comparator] left side is null, right side gets total weight.");
+ result2 = this.getTotalWeight();
+ } else
+ // right side is null.
+ if (object2 == null){
+ logger.info("[Comparator] right side is null, left side gets total weight.");
+ result1 = this.getTotalWeight();
+ } else
+ // both side is not null,put them thru the full loop
+ {
+ Collection<FactorComparator<T>> comparatorList = this.factorComparatorList.values();
+ for (FactorComparator<T> comparator :comparatorList){
+ int result = comparator.compare(object1, object2);
+ result1 = result1 + (result > 0 ? comparator.getWeight() : 0);
+ result2 = result2 + (result < 0 ? comparator.getWeight() : 0);
+ logger.info(String.format("[Factor: %s] compare result : %s (current score %s vs %s)",
+ comparator.getFactorName(), result, result1, result2));
+ }
+ }
+ // in case of same score, use tie-breaker to stabilize the result.
+ if (result1 == result2){
+ boolean result = this.tieBreak(object1, object2);
+ logger.info("[TieBreaker] TieBreaker chose " +
+ (result? String.format("left side (%s)", null== object1 ? "null": object1.toString()) :
+ String.format("right side (%s)", null== object2 ? "null": object2.toString()) ));
+ if (result) result1++; else result2++;
+ }
+
+ logger.info(String.format("Result : %s vs %s ",result1,result2));
+ return new Pair<Integer,Integer>(result1,result2);
+ }
+
+ @Override
+ public int compare(T o1, T o2) {
+ Pair<Integer,Integer> result = this.getComparisonScore(o1,o2);
+ return result.getFirst() == result.getSecond() ? 0 :
+ result.getFirst() > result.getSecond() ? 1 : -1;
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
new file mode 100644
index 0000000..154d528
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+
+
+/** Abstract class for a candidate filter.
+ * this class contains implementation of most of the core logics. Implementing classes is expected only to
+ * register filters using the provided register function.
+ */
+public abstract class CandidateFilter<T,V> {
+ private static Logger logger = Logger.getLogger(CandidateFilter.class);
+
+ // internal repository of the registered filters .
+ private Map<String,FactorFilter<T,V>> factorFilterList =
+ new ConcurrentHashMap<String,FactorFilter<T,V>>();
+
+ /** gets the name of the current implementation of the candidate filter.
+ * @returns : name of the filter.
+ * */
+ public abstract String getName();
+
+ /** function to register a factorFilter to the internal Map for future reference.
+ * @param factorfilter : the Filter object to be registered.
+ * @throws IllegalArgumentException
+ * */
+ protected void registerFactorFilter(FactorFilter<T,V> filter){
+ if (null == filter ) {
+ throw new IllegalArgumentException("unable to register factor filter. " +
+ "The passed comaractor is null or has an invalid weight value.");
+ }
+
+ // add or replace the filter.
+ this.factorFilterList.put(filter.getFactorName(),filter);
+ logger.info(String.format("Factor filter added for '%s'.",
+ filter.getFactorName()));
+ }
+
+ /** function to analyze the target item according to the reference object to decide whether the item should be filtered.
+ * @param filteringTarget: object to be checked.
+ * @param referencingObject: object which contains statistics based on which a decision is made whether
+ * the object being checked need to be filtered or not.
+ * @return true if the check passed, false if check failed, which means the item need to be filtered.
+ * */
+ public boolean filterTarget(T filteringTarget, V referencingObject){
+ logger.info(String.format("start filtering '%s' with factor filter for '%s'",
+ filteringTarget == null ? "(null)" : filteringTarget.toString(),
+ this.getName()));
+
+ Collection<FactorFilter<T,V>> filterList = this.factorFilterList.values();
+ boolean result = true;
+ for (FactorFilter<T,V> filter : filterList){
+ result &= filter.filterTarget(filteringTarget,referencingObject);
+ logger.info(String.format("[Factor: %s] filter result : %s ",
+ filter.getFactorName(), result));
+ if (!result){
+ break;
+ }
+ }
+ logger.info(String.format("Final filtering result : %s ",result));
+ return result;
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
new file mode 100644
index 0000000..0598d95
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+/** Implementation of the CandidateSelector.
+ * @param K executor object type.
+ * @param V dispatching object type.
+ * */
+public class CandidateSelector<K,V> implements Selector<K, V> {
+ private static Logger logger = Logger.getLogger(CandidateComparator.class);
+
+ private CandidateFilter<K,V> filter;
+ private CandidateComparator<K> comparator;
+
+ /**constructor of the class.
+ * @param filter CandidateFilter object to be used to perform the candidate filtering.
+ * @param comparator CandidateComparator object to be used to find the best suit candidate from the filtered list.
+ * */
+ public CandidateSelector(CandidateFilter<K,V> filter,
+ CandidateComparator<K> comparator){
+ this.filter = filter;
+ this.comparator = comparator;
+ }
+
+ @Override
+ public K getBest(List<K> candidateList, V dispatchingObject) {
+
+ // shortcut if the candidateList is empty.
+ if ( null == candidateList || candidateList.size() == 0){
+ logger.error("failed to getNext candidate as the passed candidateList is null or empty.");
+ return null;
+ }
+
+ logger.info("start candidate selection logic.");
+ logger.info(String.format("candidate count before filtering: %s", candidateList.size()));
+
+ // to keep the input untouched, we will form up a new list based off the filtering result.
+ List<K> filteredList = new ArrayList<K>();
+
+ if (null != this.filter){
+ for (K candidateInfo : candidateList){
+ if (filter.filterTarget(candidateInfo,dispatchingObject)){
+ filteredList.add(candidateInfo);
+ }
+ }
+ } else{
+ filteredList = candidateList;
+ logger.info("skipping the candidate filtering as the filter object is not specifed.");
+ }
+
+ logger.info(String.format("candidate count after filtering: %s", filteredList.size()));
+ if (filteredList.size() == 0){
+ logger.info("failed to select candidate as the filted candidate list is empty.");
+ return null;
+ }
+
+ // final work - find the best candidate from the filtered list.
+ K executor = Collections.max(filteredList,comparator);
+ logger.info(String.format("candidate selected %s",
+ null == executor ? "(null)" : executor.toString()));
+ return executor;
+ }
+
+ @Override
+ public String getName() {
+ return "CandidateSelector";
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
new file mode 100644
index 0000000..1d9d162
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.Comparator;
+import org.apache.log4j.Logger;
+
+/** wrapper class for a factor comparator .
+ *@param T: the type of the objects to be compared.
+ */
+public final class FactorComparator<T>{
+ private static Logger logger = Logger.getLogger(CandidateComparator.class);
+
+ private String factorName;
+ private int weight;
+ private Comparator<T> comparator;
+
+ /** private constructor of the class. User will create the instance of the class by calling the static
+ * method provided below.
+ * @param factorName : the factor name .
+ * @param weight : the weight of the comparator.
+ * @ comparator : function to be provided by user on how the comparison should be made.
+ * */
+ private FactorComparator(String factorName, int weight, Comparator<T> comparator){
+ this.factorName = factorName;
+ this.weight = weight;
+ this.comparator = comparator;
+ }
+
+ /** static function to generate an instance of the class.
+ * refer to the constructor for the param definitions.
+ * */
+ public static <T> FactorComparator<T> create(String factorName, int weight, Comparator<T> comparator){
+
+ if (null == factorName || factorName.length() == 0 || weight < 0 || null == comparator){
+ logger.error("failed to create instance of FactorComparator, at least one of the input paramters are invalid");
+ return null;
+ }
+
+ return new FactorComparator<T>(factorName,weight,comparator);
+ }
+
+ // function to return the factor name.
+ public String getFactorName(){
+ return this.factorName;
+ }
+
+ // function to return the weight value.
+ public int getWeight(){
+ return this.weight;
+ }
+
+ // function to return the weight value.
+ public void updateWeight(int value){
+ this.weight = value;
+ }
+
+ // the actual compare function, which will leverage the user defined function.
+ public int compare(T object1, T object2){
+ return this.comparator.compare(object1, object2);
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
new file mode 100644
index 0000000..74b9d5d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import org.apache.log4j.Logger;
+
+/** wrapper class for a factor Filter .
+ *@param T: the type of the objects to be compared.
+ *@param V: the type of the object to be used for filtering.
+ */
+public final class FactorFilter<T,V>{
+ private static Logger logger = Logger.getLogger(FactorFilter.class);
+
+ private String factorName;
+ private Filter<T,V> filter;
+
+ /** private constructor of the class. User will create the instance of the class by calling the static
+ * method provided below.
+ * @param factorName : the factor name .
+ * @param filter : user defined function specifying how the filtering should be implemented.
+ * */
+ private FactorFilter(String factorName, Filter<T,V> filter){
+ this.factorName = factorName;
+ this.filter = filter;
+ }
+
+ /** static function to generate an instance of the class.
+ * refer to the constructor for the param definitions.
+ * */
+ public static <T,V> FactorFilter<T,V> create(String factorName, Filter<T,V> filter){
+
+ if (null == factorName || factorName.length() == 0 || null == filter){
+ logger.error("failed to create instance of FactorFilter, at least one of the input paramters are invalid");
+ return null;
+ }
+
+ return new FactorFilter<T,V>(factorName,filter);
+ }
+
+ // function to return the factor name.
+ public String getFactorName(){
+ return this.factorName;
+ }
+
+ // the actual check function, which will leverage the logic defined by user.
+ public boolean filterTarget(T filteringTarget, V referencingObject){
+ return this.filter.filterTarget(filteringTarget, referencingObject);
+ }
+
+ // interface of the filter.
+ public interface Filter<T,V>{
+
+ /**function to analyze the target item according to the reference object to decide whether the item should be filtered.
+ * @param filteringTarget: object to be checked.
+ * @param referencingObject: object which contains statistics based on which a decision is made whether
+ * the object being checked need to be filtered or not.
+ * @return true if the check passed, false if check failed, which means the item need to be filtered.
+ * */
+ public boolean filterTarget(T filteringTarget, V referencingObject);
+ }
+
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java b/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java
new file mode 100644
index 0000000..610f75a
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.List;
+
+
+/** Definition of the selector interface.
+ * an implementation of the selector interface provides the functionality
+ * to return a candidate from the candidateList that suits best for the dispatchingObject.
+ * @param K : type of the candidate.
+ * @param V : type of the dispatching object.
+ */
+public interface Selector <K,V> {
+
+ /** Function returns the next best suit candidate from the candidateList for the dispatching object.
+ * @param candidateList : List of the candidates to select from .
+ * @param dispatchingObject : the object to be dispatched .
+ * @return candidate from the candidate list that suits best for the dispatching object.
+ * */
+ public K getBest(List<K> candidateList, V dispatchingObject);
+
+ /** Function returns the name of the current Dispatcher
+ * @return name of the dispatcher.
+ * */
+ public String getName();
+}
diff --git a/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java b/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java
new file mode 100644
index 0000000..ef0ef46
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java
@@ -0,0 +1,239 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpMessage;
+import org.apache.http.HttpResponse;
+import org.apache.http.NameValuePair;
+import org.apache.http.ParseException;
+import org.apache.http.client.HttpResponseException;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.log4j.Logger;
+
+/** class handles the communication between the application and
+ * a Restful API based web server.
+ * @param T : type of the returning response object.
+ * Note: the idea of this abstract class is to provide a wrapper for the logic around HTTP layer communication so
+ * development work can take this as a black box and focus on processing the result.
+ * With that said the abstract class will be provided as a template, which ideally can support different types
+ * of returning object (Dictionary, xmlDoc , text etc.)
+ * */
+public abstract class RestfulApiClient<T> {
+ protected static Logger logger = Logger.getLogger(RestfulApiClient.class);
+
+ /** Method to transform the response returned by the httpClient into the
+ * type specified.
+ * Note: Method need to handle case such as failed request.
+ * Also method is not supposed to pass the response object out
+ * via the returning value as the response will be closed after the
+ * execution steps out of the method context.
+ * @throws HttpResponseException
+ * @throws IOException
+ * @throws ParseException
+ * **/
+ protected abstract T parseResponse(HttpResponse response)
+ throws HttpResponseException, IOException;
+
+ /** function to perform a Get http request.
+ * @param uri the URI of the request.
+ * @param headerEntries extra entries to be added to request header.
+ * @return the response object type of which is specified by user.
+ * @throws IOException */
+ public T httpGet(URI uri, List<NameValuePair> headerEntries) throws IOException{
+ // shortcut if the passed url is invalid.
+ if (null == uri){
+ logger.error(" unable to perform httpGet as the passed uri is null");
+ return null;
+ }
+
+ HttpGet get = new HttpGet(uri);
+ return this.sendAndReturn((HttpGet)completeRequest(get, headerEntries));
+ }
+
+ /** function to perform a Post http request.
+ * @param uri the URI of the request.
+ * @param headerEntries extra entries to be added to request header.
+ * @param postingBody the content to be posted , optional.
+ * @return the response object type of which is specified by user.
+ * @throws UnsupportedEncodingException, IOException */
+ public T httpPost(URI uri,
+ List<NameValuePair> headerEntries,
+ String postingBody) throws UnsupportedEncodingException, IOException{
+ // shortcut if the passed url is invalid.
+ if (null == uri){
+ logger.error(" unable to perform httpPost as the passed uri is null.");
+ return null;
+ }
+
+ HttpPost post = new HttpPost(uri);
+ return this.sendAndReturn(completeRequest(post,headerEntries,postingBody));
+ }
+
+ /** function to perform a Delete http request.
+ * @param uri the URI of the request.
+ * @param headerEntries extra entries to be added to request header.
+ * @return the response object type of which is specified by user.
+ * @throws IOException */
+ public T httpDelete(URI uri, List<NameValuePair> headerEntries) throws IOException{
+ // shortcut if the passed url is invalid.
+ if (null == uri){
+ logger.error(" unable to perform httpDelete as the passed uri is null.");
+ return null;
+ }
+
+ HttpDelete delete = new HttpDelete(uri);
+ return this.sendAndReturn((HttpDelete)completeRequest(delete, headerEntries));
+ }
+
+ /** function to perform a Put http request.
+ * @param uri the URI of the request.
+ * @param headerEntries extra entries to be added to request header.
+ * @param postingBody the content to be posted , optional.
+ * @return the response object type of which is specified by user.
+ * @throws UnsupportedEncodingException, IOException */
+ public T httpPut(URI uri, List<NameValuePair> headerEntries,
+ String postingBody) throws UnsupportedEncodingException, IOException{
+ // shortcut if the passed url is invalid.
+ if (null == uri){
+ logger.error(" unable to perform httpPut as the passed url is null or empty.");
+ return null;
+ }
+
+ HttpPut put = new HttpPut(uri);
+ return this.sendAndReturn(completeRequest(put, headerEntries, postingBody));
+ }
+
+ /** function to dispatch the request and pass back the response.
+ * */
+ protected T sendAndReturn(HttpUriRequest request) throws IOException{
+ CloseableHttpClient client = HttpClients.createDefault();
+ try {
+ return this.parseResponse(client.execute(request));
+ }finally{
+ client.close();
+ }
+ }
+
+ /** helper function to build a valid URI.
+ * @param host host name.
+ * @param port host port.
+ * @param path extra path after host.
+ * @param isHttp indicates if whether Http or HTTPS should be used.
+ * @param params extra query parameters.
+ * @return the URI built from the inputs.
+ * @throws IOException
+ * */
+ public static URI buildUri(String host, int port, String path,
+ boolean isHttp, Pair<String, String>... params) throws IOException{
+ URIBuilder builder = new URIBuilder();
+ builder.setScheme(isHttp? "http" : "https").setHost(host).setPort(port);
+
+ if (null != path && path.length() > 0){
+ builder.setPath(path);
+ }
+
+ if (params != null) {
+ for (Pair<String, String> pair : params) {
+ builder.setParameter(pair.getFirst(), pair.getSecond());
+ }
+ }
+
+ URI uri = null;
+ try {
+ uri = builder.build();
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+
+ return uri;
+ }
+
+ /** helper function to build a valid URI.
+ * @param uri the URI to start with.
+ * @param params extra query parameters to append.
+ * @return the URI built from the inputs.
+ * @throws IOException
+ * */
+ public static URI BuildUri(URI uri, Pair<String, String>... params) throws IOException{
+ URIBuilder builder = new URIBuilder(uri);
+
+ if (params != null) {
+ for (Pair<String, String> pair : params) {
+ builder.setParameter(pair.getFirst(), pair.getSecond());
+ }
+ }
+
+ URI returningUri = null;
+ try {
+ returningUri = builder.build();
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+
+ return returningUri;
+ }
+
+ /** helper function to fill the request with header entries .
+ * */
+ private static HttpMessage completeRequest(HttpMessage request,
+ List<NameValuePair> headerEntries){
+ if (null == request){
+ logger.error("unable to complete request as the passed request object is null");
+ return request;
+ }
+
+ // dump all the header entries to the request.
+ if (null != headerEntries && headerEntries.size() > 0){
+ for (NameValuePair pair : headerEntries){
+ request.addHeader(pair.getName(), pair.getValue());
+ }
+ }
+ return request;
+ }
+
+ /** helper function to fill the request with header entries and posting body .
+ * */
+ private static HttpEntityEnclosingRequestBase completeRequest(HttpEntityEnclosingRequestBase request,
+ List<NameValuePair> headerEntries,
+ String postingBody) throws UnsupportedEncodingException{
+ if (null != completeRequest(request, headerEntries)){
+ // dump the post body UTF-8 will be used as the default encoding type.
+ if (null != postingBody && postingBody.length() > 0){
+ HttpEntity entity = new ByteArrayEntity(postingBody.getBytes("UTF-8"));
+ request.setHeader("Content-Length", Long.toString(entity.getContentLength()));
+ request.setEntity(entity);
+ }
+ }
+ return request;
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
new file mode 100644
index 0000000..b5b4509
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
@@ -0,0 +1,605 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+
+import org.apache.log4j.BasicConfigurator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import azkaban.executor.selector.*;
+
+public class SelectorTest {
+ // mock executor object.
+ protected class MockExecutorObject{
+ public String name;
+ public int port;
+ public double percentOfRemainingMemory;
+ public int amountOfRemainingMemory;
+ public int priority;
+ public Date lastAssigned;
+ public double percentOfRemainingFlowcapacity;
+ public int remainingTmp;
+
+ public MockExecutorObject(String name,
+ int port,
+ double percentOfRemainingMemory,
+ int amountOfRemainingMemory,
+ int priority,
+ Date lastAssigned,
+ double percentOfRemainingFlowcapacity,
+ int remainingTmp)
+ {
+ this.name = name;
+ this.port = port;
+ this.percentOfRemainingMemory = percentOfRemainingMemory;
+ this.amountOfRemainingMemory =amountOfRemainingMemory;
+ this.priority = priority;
+ this.lastAssigned = lastAssigned;
+ this.percentOfRemainingFlowcapacity = percentOfRemainingFlowcapacity;
+ this.remainingTmp = remainingTmp;
+ }
+
+ @Override
+ public String toString()
+ {
+ return this.name;
+ }
+ }
+
+ // Mock flow object.
+ protected class MockFlowObject{
+ public String name;
+ public int requiredRemainingMemory;
+ public int requiredTotalMemory;
+ public int requiredRemainingTmpSpace;
+ public int priority;
+
+ public MockFlowObject(String name,
+ int requiredTotalMemory,
+ int requiredRemainingMemory,
+ int requiredRemainingTmpSpace,
+ int priority)
+ {
+ this.name = name;
+ this.requiredTotalMemory = requiredTotalMemory;
+ this.requiredRemainingMemory = requiredRemainingMemory;
+ this.requiredRemainingTmpSpace = requiredRemainingTmpSpace;
+ this.priority = priority;
+ }
+
+ @Override
+ public String toString()
+ {
+ return this.name;
+ }
+ }
+
+ // mock Filter class.
+ protected class MockFilter
+ extends CandidateFilter<MockExecutorObject,MockFlowObject>{
+
+ @Override
+ public String getName() {
+ return "Mockfilter";
+ }
+
+ public MockFilter(){
+ }
+
+ // function to register the remainingMemory filter.
+ // for test purpose the registration is put in a separated method, in production the work should be done
+ // in the constructor.
+ public void registerFilterforTotalMemory(){
+ this.registerFactorFilter(FactorFilter.create("requiredTotalMemory",
+ new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+ public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+ // REAL LOGIC COMES HERE -
+ if (null == itemToCheck || null == sourceObject){
+ return false;
+ }
+
+ // Box has infinite memory.:)
+ if (itemToCheck.percentOfRemainingMemory == 0) {
+ return true;
+ }
+
+ // calculate the memory and return.
+ return itemToCheck.amountOfRemainingMemory / itemToCheck.percentOfRemainingMemory * 100 >
+ sourceObject.requiredTotalMemory;
+ }}));
+ }
+
+ public void registerFilterforRemainingMemory(){
+ this.registerFactorFilter(FactorFilter.create("requiredRemainingMemory",
+ new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+ public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+ // REAL LOGIC COMES HERE -
+ if (null == itemToCheck || null == sourceObject){
+ return false;
+ }
+ return itemToCheck.amountOfRemainingMemory > sourceObject.requiredRemainingMemory;
+ }}));
+ }
+
+ public void registerFilterforPriority(){
+ this.registerFactorFilter(FactorFilter.create("requiredProprity",
+ new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+ public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+ // REAL LOGIC COMES HERE -
+ if (null == itemToCheck || null == sourceObject){
+ return false;
+ }
+
+ // priority value, the bigger the lower.
+ return itemToCheck.priority >= sourceObject.priority;
+ }}));
+ }
+
+ public void registerFilterforRemainingTmpSpace(){
+ this.registerFactorFilter(FactorFilter.create("requiredRemainingTmpSpace",
+ new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+ public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+ // REAL LOGIC COMES HERE -
+ if (null == itemToCheck || null == sourceObject){
+ return false;
+ }
+
+ return itemToCheck.remainingTmp > sourceObject.requiredRemainingTmpSpace;
+ }}));
+ }
+
+ }
+
+ // mock comparator class.
+ protected class MockComparator
+ extends CandidateComparator<MockExecutorObject>{
+
+ @Override
+ public String getName() {
+ return "MockComparator";
+ }
+
+ @Override
+ protected boolean tieBreak(MockExecutorObject object1, MockExecutorObject object2){
+ if (null == object2) return true;
+ if (null == object1) return false;
+ return object1.name.compareTo(object2.name) >= 0;
+ }
+
+ public MockComparator(){
+ }
+
+ public void registerComparerForMemory(int weight){
+ this.registerFactorComparator(FactorComparator.create("Memory", weight, new Comparator<MockExecutorObject>(){
+ public int compare(MockExecutorObject o1, MockExecutorObject o2) {
+ int result = 0 ;
+
+ // check remaining amount of memory.
+ result = o1.amountOfRemainingMemory - o2.amountOfRemainingMemory;
+ if (result != 0){
+ return result > 0 ? 1 : -1;
+ }
+
+ // check remaining % .
+ result = (int)(o1.percentOfRemainingMemory - o2.percentOfRemainingMemory);
+ return result == 0 ? 0 : result > 0 ? 1 : -1;
+
+ } }));
+ }
+
+ public void registerComparerForRemainingSpace(int weight){
+ this.registerFactorComparator(FactorComparator.create("RemainingTmp", weight, new Comparator<MockExecutorObject>(){
+ public int compare(MockExecutorObject o1, MockExecutorObject o2) {
+ int result = 0 ;
+
+ // check remaining % .
+ result = (int)(o1.remainingTmp - o2.remainingTmp);
+ return result == 0 ? 0 : result > 0 ? 1 : -1;
+
+ } }));
+ }
+
+ public void registerComparerForPriority(int weight){
+ this.registerFactorComparator(FactorComparator.create("Priority", weight, new Comparator<MockExecutorObject>(){
+ public int compare(MockExecutorObject o1, MockExecutorObject o2) {
+ int result = 0 ;
+
+ // check priority, bigger the better.
+ result = (int)(o1.priority - o2.priority);
+ return result == 0 ? 0 : result > 0 ? 1 : -1;
+
+ } }));
+ }
+ }
+
+ // test samples.
+ protected ArrayList<MockExecutorObject> executorList = new ArrayList<MockExecutorObject>();
+
+ @BeforeClass public static void onlyOnce() {
+ BasicConfigurator.configure();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ executorList.clear();
+ executorList.add(new MockExecutorObject("Executor1",8080,50.0,2048,5,new Date(), 20, 6400));
+ executorList.add(new MockExecutorObject("Executor2",8080,50.0,2048,4,new Date(), 20, 6400));
+ executorList.add(new MockExecutorObject("Executor3",8080,40.0,2048,1,new Date(), 20, 6400));
+ executorList.add(new MockExecutorObject("Executor4",8080,50.0,2048,4,new Date(), 20, 6400));
+ executorList.add(new MockExecutorObject("Executor5",8080,50.0,1024,5,new Date(), 90, 6400));
+ executorList.add(new MockExecutorObject("Executor6",8080,50.0,1024,5,new Date(), 90, 3200));
+ executorList.add(new MockExecutorObject("Executor7",8080,50.0,1024,5,new Date(), 90, 3200));
+ executorList.add(new MockExecutorObject("Executor8",8080,50.0,2048,1,new Date(), 90, 3200));
+ executorList.add(new MockExecutorObject("Executor9",8080,50.0,2050,5,new Date(), 90, 4200));
+ executorList.add(new MockExecutorObject("Executor10",8080,00.0,1024,1,new Date(), 90, 3200));
+ executorList.add(new MockExecutorObject("Executor11",8080,20.0,2096,3,new Date(), 90, 2400));
+ executorList.add(new MockExecutorObject("Executor12",8080,90.0,2050,5,new Date(), 60, 2500));
+
+
+ // make sure each time the order is different.
+ Collections.shuffle(this.executorList);
+ }
+
+ private MockExecutorObject getExecutorByName(String name){
+ MockExecutorObject returnVal = null;
+ for (MockExecutorObject item : this.executorList){
+ if (item.name.equals(name)){
+ returnVal = item;
+ break;
+ }
+ }
+ return returnVal;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testExecutorFilter() throws Exception {
+
+ // mock object, remaining memory 11500, total memory 3095, remainingTmpSpace 4200, priority 2.
+ MockFlowObject dispatchingObj = new MockFlowObject("flow1",3096, 1500,4200,2);
+
+ MockFilter mFilter = new MockFilter();
+ mFilter.registerFilterforRemainingMemory();
+
+ // expect true.
+ boolean result = mFilter.filterTarget(this.getExecutorByName("Executor1"), dispatchingObj);
+ Assert.assertTrue(result);
+
+ //expect true.
+ result = mFilter.filterTarget(this.getExecutorByName("Executor3"), dispatchingObj);
+ /*
+ 1 [main] INFO azkaban.executor.Selector.CandidateFilter - start checking 'Executor3' with factor filter for 'Mockfilter'
+ 2 [main] INFO azkaban.executor.Selector.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
+ 2 [main] INFO azkaban.executor.Selector.CandidateFilter - Final checking result : true
+ */
+ Assert.assertTrue(result);
+
+ // add the priority filter.
+ mFilter.registerFilterforPriority();
+ result = mFilter.filterTarget(this.getExecutorByName("Executor3"), dispatchingObj);
+ // expect false, for priority.
+ /*
+ 2 [main] INFO azkaban.executor.Selector.CandidateFilter - start checking 'Executor3' with factor filter for 'Mockfilter'
+ 2 [main] INFO azkaban.executor.Selector.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
+ 2 [main] INFO azkaban.executor.Selector.CandidateFilter - [Factor: requiredProprity] filter result : false
+ 2 [main] INFO azkaban.executor.Selector.CandidateFilter - Final checking result : false
+ */
+ Assert.assertFalse(result);
+
+ // add the remaining space filter.
+ mFilter.registerFilterforRemainingTmpSpace();
+
+ // expect pass.
+ result = mFilter.filterTarget(this.getExecutorByName("Executor2"), dispatchingObj);
+ /*
+ 3 [main] INFO azkaban.executor.Selector.CandidateFilter - start checking 'Executor2' with factor filter for 'Mockfilter'
+ 3 [main] INFO azkaban.executor.Selector.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
+ 3 [main] INFO azkaban.executor.Selector.CandidateFilter - [Factor: requiredRemainingTmpSpace] filter result : true
+ 3 [main] INFO azkaban.executor.Selector.CandidateFilter - [Factor: requiredProprity] filter result : true
+ 3 [main] INFO azkaban.executor.Selector.CandidateFilter - Final checking result : true
+ */
+ Assert.assertTrue(result);
+
+ // expect false, remaining tmp, priority will also fail but the logic shortcuts when the Tmp size check Fails.
+ result = mFilter.filterTarget(this.getExecutorByName("Executor8"), dispatchingObj);
+ /*
+ 4 [main] INFO azkaban.executor.Selector.CandidateFilter - start checking 'Executor8' with factor filter for 'Mockfilter'
+ 4 [main] INFO azkaban.executor.Selector.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
+ 4 [main] INFO azkaban.executor.Selector.CandidateFilter - [Factor: requiredRemainingTmpSpace] filter result : false
+ 4 [main] INFO azkaban.executor.Selector.CandidateFilter - Final checking result : false
+ */
+ Assert.assertFalse(result);
+
+ }
+
+ @Test
+ public void testExecutorFilterWithNullInputs() throws Exception {
+ MockFilter filter = new MockFilter();
+ filter.registerFilterforPriority();
+ filter.registerFilterforRemainingMemory();
+ filter.registerFilterforRemainingTmpSpace();
+ filter.registerFilterforTotalMemory();
+ boolean result = false;
+ try {
+ result = filter.filterTarget(this.getExecutorByName("Executor1"), null);
+ } catch (Exception ex){
+ Assert.fail("no exception should be thrown when null value is passed to the filter.");
+ }
+ // note : the FactorFilter logic will decide whether true or false should be returned when null value
+ // is passed, for the Mock class it returns false.
+ Assert.assertFalse(result);
+
+ try {
+ result = filter.filterTarget(null, null);
+ } catch (Exception ex){
+ Assert.fail("no exception should be thrown when null value is passed to the filter.");
+ }
+ // note : the FactorFilter logic will decide whether true or false should be returned when null value
+ // is passed, for the Mock class it returns false.
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void testExecutorComparer() throws Exception {
+ MockComparator comparator = new MockComparator();
+ comparator.registerComparerForMemory(5);
+
+ MockExecutorObject nextExecutor = Collections.max(this.executorList, comparator);
+
+ // expect the first item to be selected, memory wise it is the max.
+ Assert.assertEquals(this.getExecutorByName("Executor11"),nextExecutor);
+
+ // add the priority factor.
+ // expect again the #9 item to be selected.
+ comparator.registerComparerForPriority(6);
+ nextExecutor = Collections.max(this.executorList, comparator);
+ Assert.assertEquals(this.getExecutorByName("Executor12"),nextExecutor);
+
+ // add the remaining space factor.
+ // expect the #12 item to be returned.
+ comparator.registerComparerForRemainingSpace(3);
+ nextExecutor = Collections.max(this.executorList, comparator);
+ Assert.assertEquals(this.getExecutorByName("Executor12"),nextExecutor);
+ }
+
+ @Test
+ public void testExecutorComparerResisterComparerWInvalidWeight() throws Exception {
+ MockComparator comparator = new MockComparator();
+ comparator.registerComparerForMemory(0);
+ }
+
+ @Test
+ public void testSelector() throws Exception {
+ MockFilter filter = new MockFilter();
+ MockComparator comparator = new MockComparator();
+
+ filter.registerFilterforPriority();
+ filter.registerFilterforRemainingMemory();
+ filter.registerFilterforRemainingTmpSpace();
+ filter.registerFilterforTotalMemory();
+
+ comparator.registerComparerForMemory(3);
+ comparator.registerComparerForPriority(5);
+ comparator.registerComparerForRemainingSpace(3);
+
+ CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+ new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+ // mock object, remaining memory 11500, total memory 3095, remainingTmpSpace 4200, priority 2.
+ MockFlowObject dispatchingObj = new MockFlowObject("flow1",3096, 1500,4200,2);
+
+ // expected selection = #12
+ MockExecutorObject nextExecutor = morkSelector.getBest(this.executorList, dispatchingObj);
+ Assert.assertEquals(this.getExecutorByName("Executor1"),nextExecutor);
+
+ // remaining memory 11500, total memory 3095, remainingTmpSpace 14200, priority 2.
+ dispatchingObj = new MockFlowObject("flow1",3096, 1500,14200,2);
+ // all candidates should be filtered by the remaining memory.
+ nextExecutor = morkSelector.getBest(this.executorList, dispatchingObj);
+ Assert.assertEquals(null,nextExecutor);
+ }
+
+ @Test
+ public void testSelectorsignleCandidate() throws Exception {
+ MockFilter filter = new MockFilter();
+ MockComparator comparator = new MockComparator();
+
+ filter.registerFilterforPriority();
+ filter.registerFilterforRemainingMemory();
+ filter.registerFilterforRemainingTmpSpace();
+ filter.registerFilterforTotalMemory();
+
+ comparator.registerComparerForMemory(3);
+ comparator.registerComparerForPriority(4);
+ comparator.registerComparerForRemainingSpace(1);
+
+ CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+ new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+ ArrayList<MockExecutorObject> signleExecutorList = new ArrayList<MockExecutorObject>();
+ MockExecutorObject signleExecutor = new MockExecutorObject("ExecutorX",8080,50.0,2048,3,new Date(), 20, 6400);
+ signleExecutorList.add(signleExecutor);
+
+ MockFlowObject dispatchingObj = new MockFlowObject("flow1",100, 100,100,5);
+ MockExecutorObject executor = morkSelector.getBest(signleExecutorList, dispatchingObj);
+ // expected to see null result, as the only executor is filtered out .
+ Assert.assertTrue(null == executor);
+
+ // adjust the priority to let the executor pass the filter.
+ dispatchingObj.priority = 3;
+ executor = morkSelector.getBest(signleExecutorList, dispatchingObj);
+ Assert.assertEquals(signleExecutor, executor);
+ }
+
+ @Test
+ public void testSelectorListWithItemsThatAreReferenceEqual() throws Exception {
+ MockFilter filter = new MockFilter();
+ MockComparator comparator = new MockComparator();
+
+ filter.registerFilterforPriority();
+ filter.registerFilterforRemainingMemory();
+ filter.registerFilterforRemainingTmpSpace();
+ filter.registerFilterforTotalMemory();
+
+ comparator.registerComparerForMemory(3);
+ comparator.registerComparerForPriority(4);
+ comparator.registerComparerForRemainingSpace(1);
+
+ CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+ new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+ ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+ MockExecutorObject signleExecutor = new MockExecutorObject("ExecutorX",8080,50.0,2048,3,new Date(), 20, 6400);
+ list.add(signleExecutor);
+ list.add(signleExecutor);
+ MockFlowObject dispatchingObj = new MockFlowObject("flow1",100, 100,100,3);
+ MockExecutorObject executor = morkSelector.getBest(list, dispatchingObj);
+ Assert.assertTrue(signleExecutor == executor);
+ }
+
+ @Test
+ public void testSelectorListWithItemsThatAreEqualInValue() throws Exception {
+ MockFilter filter = new MockFilter();
+ MockComparator comparator = new MockComparator();
+
+ filter.registerFilterforPriority();
+ filter.registerFilterforRemainingMemory();
+ filter.registerFilterforRemainingTmpSpace();
+ filter.registerFilterforTotalMemory();
+
+ comparator.registerComparerForMemory(3);
+ comparator.registerComparerForPriority(4);
+ comparator.registerComparerForRemainingSpace(1);
+
+ CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+ new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+ // note - as the tieBreaker set in the MockComparator uses the name value of the executor to do the
+ // final diff therefore we need to set the name differently to make a meaningful test, in real
+ // scenario we may want to use something else (say hash code) to be the bottom line for the tieBreaker
+ // to make a final decision, the purpose of the test here is to prove that for two candidates with
+ // exact value (in the case of test, all values except for the name) the decision result is stable.
+ ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+ MockExecutorObject executor1 = new MockExecutorObject("ExecutorX", 8080,50.0,2048,3,new Date(), 20, 6400);
+ MockExecutorObject executor2 = new MockExecutorObject("ExecutorX2",8080,50.0,2048,3,new Date(), 20, 6400);
+ list.add(executor1);
+ list.add(executor2);
+ MockFlowObject dispatchingObj = new MockFlowObject("flow1",100, 100,100,3);
+ MockExecutorObject executor = morkSelector.getBest(list, dispatchingObj);
+ Assert.assertTrue(executor2 == executor);
+
+ // shuffle and test again.
+ list.remove(0);
+ list.add(executor1);
+ executor = morkSelector.getBest(list, dispatchingObj);
+ Assert.assertTrue(executor2 == executor);
+ }
+
+ @Test
+ public void testSelectorEmptyList() throws Exception {
+ MockFilter filter = new MockFilter();
+ MockComparator comparator = new MockComparator();
+
+ filter.registerFilterforPriority();
+ filter.registerFilterforRemainingMemory();
+ filter.registerFilterforRemainingTmpSpace();
+ filter.registerFilterforTotalMemory();
+
+ comparator.registerComparerForMemory(3);
+ comparator.registerComparerForPriority(4);
+ comparator.registerComparerForRemainingSpace(1);
+
+ CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+ new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+ ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+
+ MockFlowObject dispatchingObj = new MockFlowObject("flow1",100, 100,100,5);
+
+ MockExecutorObject executor = null;
+
+ try {
+ executor = morkSelector.getBest(list, dispatchingObj);
+ } catch (Exception ex){
+ Assert.fail("no exception should be thrown when an empty list is passed to the Selector.");
+ }
+
+ // expected to see null result.
+ Assert.assertTrue(null == executor);
+
+ try {
+ executor = morkSelector.getBest(list, dispatchingObj);
+ } catch (Exception ex){
+ Assert.fail("no exception should be thrown when null is passed to the Selector as the candidate list.");
+ }
+
+ // expected to see null result, as the only executor is filtered out .
+ Assert.assertTrue(null == executor);
+
+ }
+
+ @Test
+ public void testSelectorListWithNullValue() throws Exception {
+ MockComparator comparator = new MockComparator();
+
+ comparator.registerComparerForMemory(3);
+ comparator.registerComparerForPriority(4);
+ comparator.registerComparerForRemainingSpace(1);
+
+ CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+ new CandidateSelector<MockExecutorObject,MockFlowObject>(null,comparator);
+
+ ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+ MockExecutorObject executor1 = new MockExecutorObject("ExecutorX", 8080,50.0,2048,3,new Date(), 20, 6400);
+ MockExecutorObject executor2 = new MockExecutorObject("ExecutorX2",8080,50.0,2048,3,new Date(), 20, 6400);
+ list.add(executor1);
+ list.add(executor2);
+ list.add(null);
+
+ MockFlowObject dispatchingObj = new MockFlowObject("flow1",100, 100,100,3);
+ MockExecutorObject executor = null;
+ try {
+ executor = morkSelector.getBest(list, dispatchingObj);
+ } catch (Exception ex){
+ Assert.fail("no exception should be thrown when an List contains null value.");
+ }
+ Assert.assertTrue(executor2 == executor);
+
+ // try to compare null vs null, no exception is expected.
+ list.clear();
+ list.add(null);
+ list.add(null);
+ try {
+ executor = morkSelector.getBest(list, dispatchingObj);
+ } catch (Exception ex){
+ Assert.fail("no exception should be thrown when an List contains multiple null values.");
+ }
+ Assert.assertTrue(null == executor);
+
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/utils/RestfulApiClientTest.java b/azkaban-common/src/test/java/azkaban/utils/RestfulApiClientTest.java
new file mode 100644
index 0000000..85b2666
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/RestfulApiClientTest.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpResponseFactory;
+import org.apache.http.HttpStatus;
+import org.apache.http.HttpVersion;
+import org.apache.http.NameValuePair;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpResponseException;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.DefaultHttpResponseFactory;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.message.BasicStatusLine;
+import org.apache.http.util.EntityUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class RestfulApiClientTest {
+
+ protected class MockRestfulApiClient extends RestfulApiClient<String> {
+ private int status = HttpStatus.SC_OK;
+
+ @Override
+ protected String parseResponse(HttpResponse response) throws IOException {
+ final StatusLine statusLine = response.getStatusLine();
+ if (statusLine.getStatusCode() >= 300) {
+ throw new HttpResponseException(statusLine.getStatusCode(),
+ statusLine.getReasonPhrase());
+ }
+ final HttpEntity entity = response.getEntity();
+ return entity == null ? null : EntityUtils.toString(entity);
+ }
+
+ public void setReturnStatus(int newStatus){
+ this.status = newStatus;
+ }
+
+ public void resetReturnStatus(){
+ this.status = HttpStatus.SC_OK;
+ }
+
+ @Override
+ protected String sendAndReturn(HttpUriRequest request) throws IOException{
+ HttpResponseFactory factory = new DefaultHttpResponseFactory();
+
+ HttpResponse response = factory.newHttpResponse(
+ new BasicStatusLine(HttpVersion.HTTP_1_1, this.status, null),null);
+
+ StringBuilder sb = new StringBuilder();
+ sb.append(String.format("%s = %s;", "METHOD", request.getMethod()));
+ sb.append(String.format("%s = %s;", "URI", request.getURI()));
+
+ if (request.getAllHeaders().length > 0){
+ sb.append("HEADER_EXISTS");
+ }
+
+ for (Header h : request.getAllHeaders()){
+ sb.append(String.format("%s = %s;", h.getName(), h.getValue()));
+ }
+
+ if (request instanceof HttpEntityEnclosingRequestBase){
+ HttpEntity entity = ((HttpEntityEnclosingRequestBase)request).getEntity();
+ if (entity != null){
+ sb.append("BODY_EXISTS");
+ sb.append(String.format("%s = %s;", "BODY", EntityUtils.toString(entity)));
+ }
+ }
+
+ response.setEntity(new StringEntity(sb.toString()));
+ return parseResponse(response);
+ }
+
+ }
+
+ @Test
+ public void testHttpGet() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ String result = mockClient.httpGet(uri, null);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertTrue(result.contains("METHOD = GET"));
+ }
+
+ @Test
+ public void testHttpGetWithHeaderItems() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+ headerItems.add(new BasicNameValuePair("h1","v1"));
+ headerItems.add(new BasicNameValuePair("h2","v2"));
+
+ String result = mockClient.httpGet(uri, headerItems);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertTrue(result.contains("METHOD = GET"));
+ Assert.assertTrue(result.contains("h1 = v1"));
+ Assert.assertTrue(result.contains("h2 = v2"));
+ }
+
+ @Test
+ public void testHttpPost() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+ headerItems.add(new BasicNameValuePair("h1","v1"));
+ headerItems.add(new BasicNameValuePair("h2","v2"));
+
+ String content = "123456789";
+
+ String result = mockClient.httpPost(uri, headerItems,content);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertTrue(result.contains("METHOD = POST"));
+ Assert.assertTrue(result.contains("h1 = v1"));
+ Assert.assertTrue(result.contains("h2 = v2"));
+ Assert.assertTrue(result.contains(String.format("%s = %s;", "BODY", content)));
+ }
+
+ @Test
+ public void testHttpPostWOBody() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ String result = mockClient.httpPost(uri, null,null);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertTrue(result.contains("METHOD = POST"));
+ Assert.assertFalse(result.contains("BODY_EXISTS"));
+ Assert.assertFalse(result.contains("HEADER_EXISTS"));
+ }
+
+ @Test
+ public void testHttpPut() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+ headerItems.add(new BasicNameValuePair("h1","v1"));
+ headerItems.add(new BasicNameValuePair("h2","v2"));
+
+ String content = "123456789";
+
+ String result = mockClient.httpPut(uri, headerItems,content);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertTrue(result.contains("METHOD = PUT"));
+ Assert.assertTrue(result.contains("h1 = v1"));
+ Assert.assertTrue(result.contains("h2 = v2"));
+ Assert.assertTrue(result.contains(String.format("%s = %s;", "BODY", content)));
+ }
+
+ @Test
+ public void testContentLength() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ String content = "123456789";
+
+ String result = mockClient.httpPut(uri, null,content);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertTrue(result.contains("Content-Length = " + Integer.toString(content.length())));
+ }
+
+ @Test
+ public void testContentLengthOverride() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+ headerItems.add(new BasicNameValuePair("Content-Length","0"));
+
+ String content = "123456789";
+
+ String result = mockClient.httpPut(uri, headerItems,content);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertEquals(result.lastIndexOf("Content-Length"),result.indexOf("Content-Length"));
+ Assert.assertTrue(result.contains("Content-Length = " + Integer.toString(content.length())));
+ }
+
+ @Test
+ public void testHttpDelete() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+ headerItems.add(new BasicNameValuePair("h1","v1"));
+ headerItems.add(new BasicNameValuePair("h2","v2"));
+
+ String result = mockClient.httpDelete(uri, headerItems);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertTrue(result.contains("METHOD = DELETE"));
+ Assert.assertTrue(result.contains("h1 = v1"));
+ Assert.assertTrue(result.contains("h2 = v2"));
+ }
+}