Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
index 31070bf..4153270 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Executor.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -16,6 +16,10 @@
package azkaban.executor;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
import azkaban.utils.Utils;
/**
@@ -28,8 +32,9 @@ public class Executor {
private final String host;
private final int port;
private boolean isActive;
-
- // TODO: ExecutorStats to be added
+ // cached copy of the latest statistics from the executor.
+ private Statistics cachedExecutorStats;
+ private Date lastUpdated;
/**
* <pre>
@@ -88,6 +93,13 @@ public class Executor {
return true;
}
+ @Override
+ public String toString(){
+ return String.format("%s (id: %s)",
+ null == this.host || this.host.length() == 0 ? "(empty)" : this.host,
+ this.id);
+ }
+
public String getHost() {
return host;
}
@@ -104,6 +116,24 @@ public class Executor {
return id;
}
+ public Statistics getExecutorStats() {
+ return this.cachedExecutorStats;
+ }
+
+ public void setExecutorStats(Statistics stats) {
+ this.cachedExecutorStats = stats;
+ this.lastUpdated = new Date();
+ }
+
+ /**
+ * Gets the timestamp when the stats are last updated.
+ * @return date object represents the timestamp, null if the stats of this
+ * specific object is never refreshed.
+ * */
+ public Date getLastStatsUpdatedTime(){
+ return this.lastUpdated;
+ }
+
public void setActive(boolean isActive) {
this.isActive = isActive;
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
index 8234f7d..8ef2c76 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
@@ -31,7 +31,7 @@ import azkaban.utils.Pair;
*
*/
public abstract class CandidateComparator<T> implements Comparator<T> {
- private static Logger logger = Logger.getLogger(CandidateComparator.class);
+ protected static Logger logger = Logger.getLogger(CandidateComparator.class);
// internal repository of the registered comparators .
private Map<String,FactorComparator<T>> factorComparatorList =
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
index 154d528..94b8dfa 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
@@ -28,14 +28,14 @@ import org.apache.log4j.Logger;
* register filters using the provided register function.
*/
public abstract class CandidateFilter<T,V> {
- private static Logger logger = Logger.getLogger(CandidateFilter.class);
+ protected static Logger logger = Logger.getLogger(CandidateFilter.class);
// internal repository of the registered filters .
private Map<String,FactorFilter<T,V>> factorFilterList =
new ConcurrentHashMap<String,FactorFilter<T,V>>();
/** gets the name of the current implementation of the candidate filter.
- * @returns : name of the filter.
+ * @return : name of the filter.
* */
public abstract String getName();
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
index 0598d95..55c0864 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
@@ -74,6 +74,10 @@ public class CandidateSelector<K,V> implements Selector<K, V> {
return null;
}
+ if (null == comparator){
+ logger.info("candidate comparator is not specified, default comparator from 'Collections' class will be used");
+ }
+
// final work - find the best candidate from the filtered list.
K executor = Collections.max(filteredList,comparator);
logger.info(String.format("candidate selected %s",
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
new file mode 100644
index 0000000..f0c46cd
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
@@ -0,0 +1,281 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import azkaban.executor.Executor;
+import azkaban.executor.Statistics;
+
+public class ExecutorComparator extends CandidateComparator<Executor> {
+ private static Map<String, ComparatorCreator> comparatorCreatorRepository = null;
+
+ /**
+ * Gets the name list of all available comparators.
+ * @return the list of the names.
+ * */
+ public static Set<String> getAvailableComparatorNames(){
+ return comparatorCreatorRepository.keySet();
+ }
+
+ // factor comparator names
+ private static final String REMAININGFLOWSIZE_COMPARATOR_NAME = "RemainingFlowSize";
+ private static final String MEMORY_COMPARATOR_NAME = "Memory";
+ private static final String REMAININGTMPSIZE_COMPARATOR_NAME = "RemainingTmpSize";
+ private static final String PRIORITY_COMPARATOR_NAME = "Priority";
+ private static final String LSTDISPATCHED_COMPARATOR_NAME = "LastDispatched";
+
+ /**
+ * static initializer of the class.
+ * We will build the filter repository here.
+ * when a new comparator is added, please do remember to register it here.
+ * */
+ static {
+ comparatorCreatorRepository = new HashMap<String, ComparatorCreator>();
+
+ // register the creator for remaining flow size comparator.
+ comparatorCreatorRepository.put(REMAININGFLOWSIZE_COMPARATOR_NAME, new ComparatorCreator(){
+ @Override public FactorComparator<Executor> create(int weight) { return getRemainingFlowSizeComparator(weight); }});
+
+ // register the creator for memory comparator.
+ comparatorCreatorRepository.put(MEMORY_COMPARATOR_NAME, new ComparatorCreator(){
+ @Override public FactorComparator<Executor> create(int weight) { return getMemoryComparator(weight); }});
+
+ // register the creator for priority comparator.
+ comparatorCreatorRepository.put(PRIORITY_COMPARATOR_NAME, new ComparatorCreator(){
+ @Override public FactorComparator<Executor> create(int weight) { return getPriorityComparator(weight); }});
+
+ // register the creator for remaining TMP size comparator.
+ comparatorCreatorRepository.put(PRIORITY_COMPARATOR_NAME, new ComparatorCreator(){
+ @Override public FactorComparator<Executor> create(int weight) { return getRemainingTmpSizeComparator(weight); }});
+
+ // register the creator for last dispatched time comparator.
+ comparatorCreatorRepository.put(LSTDISPATCHED_COMPARATOR_NAME, new ComparatorCreator(){
+ @Override public FactorComparator<Executor> create(int weight) { return getLstDispatchedTimeComparator(weight); }});
+ }
+
+
+ /**
+ * constructor of the ExecutorComparator.
+ * @param comparatorList the list of comparator, plus its weight information to be registered,
+ * the parameter must be a not-empty and valid list object.
+ * */
+ public ExecutorComparator(Map<String,Integer> comparatorList) {
+ if (null == comparatorList|| comparatorList.size() == 0){
+ logger.error("failed to initialize executor comparator as the passed comparator list is invalid or empty.");
+ throw new IllegalArgumentException("filterList");
+ }
+
+ // register the comparators, we will now throw here if the weight is invalid, it is handled in the super.
+ for (Entry<String,Integer> entry : comparatorList.entrySet()){
+ if (comparatorCreatorRepository.containsKey(entry.getKey())){
+ this.registerFactorComparator(comparatorCreatorRepository.
+ get(entry.getKey()).
+ create(entry.getValue()));
+ } else {
+ logger.error(String.format("failed to initialize executor comparator as the comparator implementation for requested factor '%s' doesn't exist.",
+ entry.getKey()));
+ throw new IllegalArgumentException("comparatorList");
+ }
+ }
+ }
+
+ @Override
+ public String getName() {
+ return "ExecutorComparator";
+ }
+
+ private interface ComparatorCreator{
+ FactorComparator<Executor> create(int weight);
+ }
+
+ /**
+ * helper function that does the object on two stats, comparator can leverage this function to provide
+ * shortcuts if the stat object is missing from one or both sides of the executors.
+ * @param stat1 the first stat object to be checked .
+ * @param stat2 the second stat object to be checked.
+ * @param caller the name of the calling function, for logging purpose.
+ * @param result result Integer to pass out the result in case the stats are not both valid.
+ * @return true if the passed stats are NOT both valid, a shortcut can be made (caller can consume the result),
+ * false otherwise.
+ * */
+ private static boolean statsObjectCheck(Statistics stat1, Statistics stat2, String caller, Integer result){
+ result = 0 ;
+ // both doesn't expose the info
+ if (null == stat1 && null == stat2){
+ logger.info(String.format("%s : neither of the executors exposed stats info.",
+ caller));
+ return true;
+ }
+
+ //right side doesn't expose the info.
+ if (null == stat2 ){
+ logger.info(String.format("%s : choosing left side and the right side executor doesn't expose stat info",
+ caller));
+ result = 1;
+ return true;
+ }
+
+ //left side doesn't expose the info.
+ if (null == stat1 ){
+ logger.info(String.format("%s : choosing right side and the left side executor doesn't expose stat info",
+ caller));
+ result = -1;
+ return true;
+ }
+
+ // both not null
+ return false;
+ }
+
+ /**
+ * function defines the remaining flow size comparator.
+ * @param weight weight of the comparator.
+ * */
+ private static FactorComparator<Executor> getRemainingFlowSizeComparator(int weight){
+ return FactorComparator.create(REMAININGFLOWSIZE_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+
+ @Override
+ public int compare(Executor o1, Executor o2) {
+ Statistics stat1 = o1.getExecutorStats();
+ Statistics stat2 = o2.getExecutorStats();
+
+ Integer result = 0;
+ if (statsObjectCheck(stat1,stat2,REMAININGFLOWSIZE_COMPARATOR_NAME,result)){
+ return result;
+ }
+ return ((Integer)stat1.getRemainingFlowCapacity()).compareTo(stat2.getRemainingFlowCapacity());
+ }});
+ }
+
+ /**
+ * function defines the remaining folder size comparator.
+ * @param weight weight of the comparator.
+ * */
+ private static FactorComparator<Executor> getRemainingTmpSizeComparator(int weight){
+ return FactorComparator.create(REMAININGTMPSIZE_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+
+ @Override
+ public int compare(Executor o1, Executor o2) {
+ Statistics stat1 = o1.getExecutorStats();
+ Statistics stat2 = o2.getExecutorStats();
+
+ Integer result = 0;
+ if (statsObjectCheck(stat1,stat2,REMAININGTMPSIZE_COMPARATOR_NAME,result)){
+ return result;
+ }
+ return ((Long)stat1.getRemainingStorage()).compareTo(stat2.getRemainingStorage());
+ }});
+ }
+
+ /**
+ * function defines the priority comparator.
+ * @param weight weight of the comparator.
+ * @return
+ * */
+ private static FactorComparator<Executor> getPriorityComparator(int weight){
+ return FactorComparator.create(PRIORITY_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+
+ @Override
+ public int compare(Executor o1, Executor o2) {
+ Statistics stat1 = o1.getExecutorStats();
+ Statistics stat2 = o2.getExecutorStats();
+
+ Integer result = 0;
+ if (statsObjectCheck(stat1,stat2,PRIORITY_COMPARATOR_NAME,result)){
+ return result;
+ }
+ return ((Integer)stat1.getPriority()).compareTo(stat2.getPriority());
+ }});
+ }
+
+
+ /**
+ * function defines the last dispatched time comparator.
+ * @param weight weight of the comparator.
+ * @return
+ * */
+ private static FactorComparator<Executor> getLstDispatchedTimeComparator(int weight){
+ return FactorComparator.create(LSTDISPATCHED_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+
+ @Override
+ public int compare(Executor o1, Executor o2) {
+ Statistics stat1 = o1.getExecutorStats();
+ Statistics stat2 = o2.getExecutorStats();
+
+ Integer result = 0;
+ if (statsObjectCheck(stat1,stat2,LSTDISPATCHED_COMPARATOR_NAME,result)){
+ return result;
+ }
+
+ if (null == stat1.getLastDispatchedTime() && null == stat1.getLastDispatchedTime()){
+ logger.info(String.format("%s : stats from both side doesn't contain last dispatched time info.",
+ LSTDISPATCHED_COMPARATOR_NAME));
+ return 0;
+ }
+
+ if (null == stat2.getLastDispatchedTime()){
+ logger.info(String.format("%s : choosing left side as right doesn't contain last dispatched time info.",
+ LSTDISPATCHED_COMPARATOR_NAME));
+ return 1;
+ }
+
+ if (null == stat1.getLastDispatchedTime()){
+ logger.info(String.format("%s : choosing rigth side as left doesn't contain last dispatched time info.",
+ LSTDISPATCHED_COMPARATOR_NAME));
+ return -1;
+ }
+
+ // Note: an earlier date time indicates higher weight.
+ return ((Date)stat2.getLastDispatchedTime()).compareTo(stat1.getLastDispatchedTime());
+ }});
+ }
+
+
+ /**
+ * function defines the Memory comparator.
+ * @param weight weight of the comparator.
+ * Note: comparator firstly take the absolute value of the remaining memory, if both sides have the same value,
+ * it go further to check the percent of the remaining memory.
+ * @return
+ * */
+ private static FactorComparator<Executor> getMemoryComparator(int weight){
+ return FactorComparator.create(MEMORY_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+
+ @Override
+ public int compare(Executor o1, Executor o2) {
+ Statistics stat1 = o1.getExecutorStats();
+ Statistics stat2 = o2.getExecutorStats();
+
+ Integer result = 0;
+ if (statsObjectCheck(stat1,stat2,MEMORY_COMPARATOR_NAME,result)){
+ return result;
+ }
+
+ if (stat1.getRemainingMemory() != stat2.getRemainingMemory()){
+ return stat1.getRemainingMemory() > stat2.getRemainingMemory() ? 1:-1;
+ }
+
+ return Double.compare(stat1.getRemainingMemoryPercent(), stat2.getRemainingMemoryPercent());
+ }});
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
new file mode 100644
index 0000000..0d61aba
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.Executor;
+import azkaban.executor.Statistics;
+
+public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFlow> {
+ private static Map<String, FactorFilter<Executor, ExecutableFlow>> filterRepository = null;
+
+ /**
+ * Gets the name list of all available filters.
+ * @return the list of the names.
+ * */
+ public static Set<String> getAvailableFilterNames(){
+ return filterRepository.keySet();
+ }
+
+
+ // factor filter names.
+ private static final String STATICREMAININGFLOWSIZE_FILTER_NAME = "StaticRemainingFlowSize";
+
+ /**
+ * static initializer of the class.
+ * We will build the filter repository here.
+ * when a new filter is added, please do remember to register it here.
+ * */
+ static {
+ filterRepository = new HashMap<String, FactorFilter<Executor, ExecutableFlow>>();
+ filterRepository.put(STATICREMAININGFLOWSIZE_FILTER_NAME, getStaticRemainingFlowSizeFilter());
+ }
+
+ /**
+ * constructor of the ExecutorFilter.
+ * @param filterList the list of filter to be registered, the parameter must be a not-empty and valid list object.
+ * */
+ public ExecutorFilter(List<String> filterList) {
+ // shortcut if the filter list is invalid. A little bit ugly to have to throw in constructor.
+ if (null == filterList || filterList.size() == 0){
+ logger.error("failed to initialize executor filter as the passed filter list is invalid or empty.");
+ throw new IllegalArgumentException("filterList");
+ }
+
+ // register the filters according to the list.
+ for (String filterName : filterList){
+ if (filterRepository.containsKey(filterName)){
+ this.registerFactorFilter(filterRepository.get(filterName));
+ } else {
+ logger.error(String.format("failed to initialize executor filter as the filter implementation for requested factor '%s' doesn't exist.",
+ filterName));
+ throw new IllegalArgumentException("filterList");
+ }
+ }
+ }
+
+ @Override
+ public String getName() {
+ return "ExecutorFilter";
+ }
+
+ /**
+ * function to register the static remaining flow size filter.
+ * NOTE : this is a static filter which means the filter will be filtering based on the system standard which is not
+ * Coming for the passed flow.
+ * Ideally this filter will make sure only the executor has remaining
+ * */
+ private static FactorFilter<Executor, ExecutableFlow> getStaticRemainingFlowSizeFilter(){
+ return FactorFilter.create(STATICREMAININGFLOWSIZE_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
+
+ @Override
+ public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
+ if (null == filteringTarget){
+ logger.info(String.format("%s : filtering out the target as it is null.", STATICREMAININGFLOWSIZE_FILTER_NAME));
+ return false;
+ }
+
+ Statistics stats = filteringTarget.getExecutorStats();
+ if (null == stats) {
+ logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
+ STATICREMAININGFLOWSIZE_FILTER_NAME,
+ filteringTarget.toString()));
+ return false;
+ }
+ return stats.getRemainingFlowCapacity() > 0 ;
+ }
+ });
+ }
+
+ // TO-DO
+ // Add more Filter definitions .
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java
new file mode 100644
index 0000000..fd15ad1
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.List;
+import java.util.Map;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.Executor;
+
+/**
+ * Executor selector class implementation.
+ * NOTE: This class is a de-generalized version of the CandidateSelector, which provides a
+ * clean and convenient constructor to take in filter and comparator name list and build
+ * the instance from that.
+ * */
+public class ExecutorSelector extends CandidateSelector<Executor, ExecutableFlow> {
+
+ /**
+ * Contractor of the class.
+ * @param filterList name list of the filters to be registered,
+ * filter feature will be disabled if a null value is passed.
+ * @param comparatorList name/weight pair list of the comparators to be registered ,
+ * again comparator feature is disabled if a null value is passed.
+ * */
+ public ExecutorSelector(List<String> filterList, Map<String,Integer> comparatorList) {
+ super(null == filterList || filterList.size() == 0 ? null : new ExecutorFilter(filterList),
+ null == comparatorList || comparatorList.size() == 0 ? null : new ExecutorComparator(comparatorList));
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/Statistics.java b/azkaban-common/src/main/java/azkaban/executor/Statistics.java
new file mode 100644
index 0000000..85d31f7
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/Statistics.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.util.Date;
+
+ public class Statistics {
+ private double remainingMemoryPercent;
+ private long remainingMemory;
+ private int remainingFlowCapacity;
+ private Date lastDispatched;
+ private long remainingStorage;
+ private int priority;
+
+ public double getRemainingMemoryPercent() {
+ return this.remainingMemoryPercent;
+ }
+
+ public long getRemainingMemory(){
+ return this.remainingMemory;
+ }
+
+ public int getRemainingFlowCapacity(){
+ return this.remainingFlowCapacity;
+ }
+
+ public Date getLastDispatchedTime(){
+ return this.lastDispatched;
+ }
+
+ public long getRemainingStorage() {
+ return this.remainingStorage;
+ }
+
+ public int getPriority () {
+ return this.priority;
+ }
+
+ public Statistics (double remainingMemoryPercent,
+ long remainingMemory,
+ int remainingFlowCapacity,
+ Date lastDispatched,
+ long remainingStorage,
+ int priority ){
+ this.remainingMemory = remainingMemory;
+ this.remainingFlowCapacity = remainingFlowCapacity;
+ this.priority = priority;
+ this.remainingMemoryPercent = remainingMemoryPercent;
+ this.remainingStorage = remainingStorage;
+ this.lastDispatched = lastDispatched;
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
index b5b4509..c193397 100644
--- a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
@@ -20,12 +20,16 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.log4j.BasicConfigurator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import azkaban.executor.selector.*;
@@ -602,4 +606,101 @@ public class SelectorTest {
Assert.assertTrue(null == executor);
}
+
+ @Test
+ public void testCreatingExectorfilterObject() throws Exception{
+ List<String> validList = new ArrayList<String>(ExecutorFilter.getAvailableFilterNames());
+ try {
+ new ExecutorFilter(validList);
+ }catch (Exception ex){
+ Assert.fail("creating ExecutorFilter with valid list throws exception . ex -" + ex.getMessage());
+ }
+ }
+
+ @Test
+ public void testCreatingExectorfilterObjectWInvalidList() throws Exception{
+ List<String> invalidList = new ArrayList<String>();
+ invalidList.add("notExistingFilter");
+ Exception result = null;
+ try {
+ new ExecutorFilter(invalidList);
+ }catch (Exception ex){
+ if (ex instanceof IllegalArgumentException)
+ result = ex;
+ }
+ Assert.assertNotNull(result);
+ }
+
+ @Test
+ public void testCreatingExectorComparatorObject() throws Exception{
+ Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+ for (String name : ExecutorComparator.getAvailableComparatorNames()){
+ comparatorMap.put(name, 1);
+ }
+ try {
+ new ExecutorComparator(comparatorMap);
+ }catch (Exception ex){
+ Assert.fail("creating ExecutorComparator with valid list throws exception . ex -" + ex.getMessage());
+ }
+ }
+
+ @Test
+ public void testCreatingExectorComparatorObjectWInvalidName() throws Exception{
+ Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+ comparatorMap.put("invalidName", 0);
+ Exception result = null;
+ try {
+ new ExecutorComparator(comparatorMap);
+ }catch (Exception ex){
+ if (ex instanceof IllegalArgumentException)
+ result = ex;
+ }
+ Assert.assertNotNull(result);
+ }
+
+ @Test
+ public void testCreatingExectorComparatorObjectWInvalidWeight() throws Exception{
+ Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+ for (String name : ExecutorComparator.getAvailableComparatorNames()){
+ comparatorMap.put(name, -1);
+ }
+ Exception result = null;
+ try {
+ new ExecutorComparator(comparatorMap);
+ }catch (Exception ex){
+ if (ex instanceof IllegalArgumentException)
+ result = ex;
+ }
+ Assert.assertNotNull(result);
+ }
+
+ @Test
+ public void testExecutorSelectorE2E() throws Exception{
+ List<String> filterList = new ArrayList<String>(ExecutorFilter.getAvailableFilterNames());
+ Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+ List<Executor> executorList = new ArrayList<Executor>();
+ executorList.add(new Executor(1, "host1", 80, true));
+ executorList.add(new Executor(2, "host2", 80, true));
+ executorList.add(new Executor(3, "host3", 80, true));
+
+ executorList.get(0).setExecutorStats(new Statistics(99.9, 4095, 50, new Date(), 4095, 0));
+ executorList.get(1).setExecutorStats(new Statistics(50, 4095, 50, new Date(), 4095, 0));
+ executorList.get(2).setExecutorStats(new Statistics(99.9, 4095, 50, new Date(), 2048, 0));
+
+ ExecutableFlow flow = new ExecutableFlow();
+
+ for (String name : ExecutorComparator.getAvailableComparatorNames()){
+ comparatorMap.put(name, 1);
+ }
+ ExecutorSelector selector = new ExecutorSelector(filterList,comparatorMap);
+ Executor executor = selector.getBest(executorList, flow);
+ Assert.assertEquals(executorList.get(0), executor);
+
+ // simulate that once the flow is assigned, executor1's remaining TMP storage dropped to 2048
+ // now we do the getBest again executor3 is expected to be selected as it has a earlier last dispatched time.
+ executorList.get(0).setExecutorStats(new Statistics(99.9, 4095, 50, new Date(), 2048, 0));
+ executor = selector.getBest(executorList, flow);
+ Assert.assertEquals(executorList.get(2), executor);
+ }
+
}