diff --git a/azkaban-common/src/main/java/azkaban/executor/dispatcher/CandidateComparator.java b/azkaban-common/src/main/java/azkaban/executor/dispatcher/CandidateComparator.java
new file mode 100644
index 0000000..1b6db3d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/dispatcher/CandidateComparator.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.dispatcher;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Pair;
+
+/** Abstract class for a candidate comparator.
+ * this class contains implementation of most of the core logics. Implementing classes is expected only to
+ * register factor comparators using the provided register function.
+ *
+ */
+public abstract class CandidateComparator<T> implements Comparator<T> {
+ private static Logger logger = Logger.getLogger(CandidateComparator.class);
+
+ // internal repository of the registered comparators .
+ private Map<String,FactorComparator<T>> factorComparatorList =
+ new ConcurrentHashMap<String,FactorComparator<T>>();
+
+ /** gets the name of the current implementation of the candidate comparator.
+ * @returns : name of the comparator.
+ * */
+ public abstract String getName();
+
+ /** function to register a factorComparator to the internal Map for future reference.
+ * @param factorComparator : the comparator object to be registered.
+ * */
+ protected void registerFactorComparator(FactorComparator<T> comparator){
+ if (null == comparator ||
+ Integer.MAX_VALUE - this.getTotalWeight() < comparator.getWeight() ) {
+ logger.info("skipping registerFactorComparator as the comaractor is null or has an invalid weight value.");
+ return;
+ }
+
+ // add or replace the Comparator.
+ this.factorComparatorList.put(comparator.getFactorName(),comparator);
+ logger.info(String.format("Factor comparator added for '%s'. Weight = '%s'",
+ comparator.getFactorName(), comparator.getWeight()));
+ }
+
+
+ /** function update the weight of a specific registered factorCompartor.
+ * @param factorName : the name of the registered factorComparator to adjust.
+ * @param weight: the new weight value to be adjusted to.
+ * @return -1 if the factor doesn't exist or the weight value specified is invalid,
+ * the original value before update otherwise.
+ * */
+ public int adjustFactorWeight(String factorName, int weight){
+ // shortcut if the input is invalid.
+ if (factorName == null ||
+ factorName == "" ||
+ weight < 0 ||
+ Integer.MAX_VALUE - this.getTotalWeight() < weight){
+ logger.info("skipping adjustFactorWeight as one or more of the input parameters are invalid");
+ return -1;
+ }
+
+ FactorComparator<T> value = this.factorComparatorList.get(factorName);
+
+ // shortcut if the key doesn't exist.
+ if (null == value){
+ logger.info(String.format("unable to udpate weight as the specified factorName %s doesn't exist",factorName));
+ return -1;
+ }
+
+ int returnVal = value.getWeight();
+ value.updateWeight(weight);
+ return returnVal;
+ }
+
+ /** function returns the total weight of the registered comparators.
+ * @return the value of total weight.
+ * */
+ public int getTotalWeight(){
+ int totalWeight = 0 ;
+
+ // save out a copy of the values as HashMap.values() takes o(n) to return the value.
+ Collection<FactorComparator<T>> allValues = this.factorComparatorList.values();
+ for (FactorComparator<T> item : allValues){
+ if (item != null){
+ totalWeight += item.getWeight();
+ }
+ }
+
+ return totalWeight;
+ }
+
+ /** function to actually calculate the scores for the two objects that are being compared.
+ * the comparison follows the following logic -
+ * 1. if both objects are equal return 0 score for both.
+ * 2. if one side is null, the other side gets all the score.
+ * 3. if both sides are non-null value, both values will be passed to all the registered FactorComparators
+ * each factor comparator will generate a result based off it sole logic the weight of the comparator will be
+ * added to the wining side, if equal, no value will be added to either side.
+ * 4. final result will be returned in a Pair container.
+ *
+ * */
+ public Pair<Integer,Integer> getReult(T object1, T object2){
+ logger.info(String.format("start comparing '%s' with '%s', total weight = %s ",
+ object1 == null ? "(null)" : object1.toString(),
+ object2 == null ? "(null)" : object2.toString(),
+ this.getTotalWeight()));
+
+ // short cut if object equals.
+ if (object1 == object2){
+ logger.info("Result : 0 vs 0 (equal)");
+ return new Pair<Integer,Integer>(0,0);
+ }
+
+ // left side is null.
+ if (object1 == null){
+ logger.info(String.format("Result : 0 vs %s (left is null)",this.getTotalWeight()));
+ return new Pair<Integer, Integer>(0,this.getTotalWeight());
+ }
+
+ // right side is null.
+ if (object2 == null){
+ logger.info(String.format("Result : %s vs 0 (right is null)",this.getTotalWeight()));
+ return new Pair<Integer, Integer>(this.getTotalWeight(),0);
+ }
+
+ // both side is not null,put them thru the full loop
+ int result1 = 0 ;
+ int result2 = 0 ;
+ Collection<FactorComparator<T>> comparatorList = this.factorComparatorList.values();
+ Iterator<FactorComparator<T>> mapItr = comparatorList.iterator();
+ while (mapItr.hasNext()){
+ FactorComparator<T> comparator = (FactorComparator<T>) mapItr.next();
+ int result = comparator.compare(object1, object2);
+ result1 = result1 + (result > 0 ? comparator.getWeight() : 0);
+ result2 = result2 + (result < 0 ? comparator.getWeight() : 0);
+ logger.info(String.format("[Factor: %s] compare result : %s (current score %s vs %s)",
+ comparator.getFactorName(), result, result1, result2));
+ }
+ logger.info(String.format("Result : %s vs %s ",result1,result2));
+ return new Pair<Integer,Integer>(result1,result2);
+ }
+
+ @Override
+ public int compare(T o1, T o2) {
+ Pair<Integer,Integer> result = this.getReult(o1,o2);
+ return result.getFirst() == result.getSecond() ? 0 :
+ result.getFirst() > result.getSecond() ? 1 : -1;
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/DispatcherTest.java b/azkaban-common/src/test/java/azkaban/executor/DispatcherTest.java
new file mode 100644
index 0000000..d5d5f47
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/DispatcherTest.java
@@ -0,0 +1,585 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import org.apache.log4j.BasicConfigurator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.executor.dispatcher.*;
+
+public class DispatcherTest {
+ // mock executor object.
+ protected class MockExecutorObject{
+ public String name;
+ public int port;
+ public double percentOfRemainingMemory;
+ public int amountOfRemainingMemory;
+ public int priority;
+ public Date lastAssigned;
+ public double percentOfRemainingFlowcapacity;
+ public int remainingTmp;
+
+ public MockExecutorObject(String name,
+ int port,
+ double percentOfRemainingMemory,
+ int amountOfRemainingMemory,
+ int priority,
+ Date lastAssigned,
+ double percentOfRemainingFlowcapacity,
+ int remainingTmp)
+ {
+ this.name = name;
+ this.port = port;
+ this.percentOfRemainingMemory = percentOfRemainingMemory;
+ this.amountOfRemainingMemory =amountOfRemainingMemory;
+ this.priority = priority;
+ this.lastAssigned = lastAssigned;
+ this.percentOfRemainingFlowcapacity = percentOfRemainingFlowcapacity;
+ this.remainingTmp = remainingTmp;
+ }
+
+ @Override
+ public String toString()
+ {
+ return this.name;
+ }
+ }
+
+ // Mock flow object.
+ protected class MockFlowObject{
+ public String name;
+ public int requiredRemainingMemory;
+ public int requiredTotalMemory;
+ public int requiredRemainingTmpSpace;
+ public int priority;
+
+ public MockFlowObject(String name,
+ int requiredTotalMemory,
+ int requiredRemainingMemory,
+ int requiredRemainingTmpSpace,
+ int priority)
+ {
+ this.name = name;
+ this.requiredTotalMemory = requiredTotalMemory;
+ this.requiredRemainingMemory = requiredRemainingMemory;
+ this.requiredRemainingTmpSpace = requiredRemainingTmpSpace;
+ this.priority = priority;
+ }
+
+ @Override
+ public String toString()
+ {
+ return this.name;
+ }
+ }
+
+ // mock Filter class.
+ protected class MockFilter
+ extends CandidateFilter<MockExecutorObject,MockFlowObject>{
+
+ @Override
+ public String getName() {
+ return "Mockfilter";
+ }
+
+ public MockFilter(){
+ }
+
+ // function to register the remainingMemory filter.
+ // for test purpose the registration is put in a separated method, in production the work should be done
+ // in the constructor.
+ public void registerFilterforTotalMemory(){
+ this.registerFactorFilter(FactorFilter.create("requiredTotalMemory",
+ new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+ public boolean check(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+ // REAL LOGIC COMES HERE -
+ if (null == itemToCheck || null == sourceObject){
+ return false;
+ }
+
+ // Box has infinite memory.:)
+ if (itemToCheck.percentOfRemainingMemory == 0) {
+ return true;
+ }
+
+ // calculate the memory and return.
+ return itemToCheck.amountOfRemainingMemory / itemToCheck.percentOfRemainingMemory * 100 > sourceObject.requiredTotalMemory;
+ }}));
+ }
+
+ public void registerFilterforRemainingMemory(){
+ this.registerFactorFilter(FactorFilter.create("requiredRemainingMemory",
+ new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+ public boolean check(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+ // REAL LOGIC COMES HERE -
+ if (null == itemToCheck || null == sourceObject){
+ return false;
+ }
+ return itemToCheck.amountOfRemainingMemory > sourceObject.requiredRemainingMemory;
+ }}));
+ }
+
+ public void registerFilterforPriority(){
+ this.registerFactorFilter(FactorFilter.create("requiredProprity",
+ new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+ public boolean check(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+ // REAL LOGIC COMES HERE -
+ if (null == itemToCheck || null == sourceObject){
+ return false;
+ }
+
+ // priority value, the bigger the lower.
+ return itemToCheck.priority >= sourceObject.priority;
+ }}));
+ }
+
+ public void registerFilterforRemainingTmpSpace(){
+ this.registerFactorFilter(FactorFilter.create("requiredRemainingTmpSpace",
+ new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+ public boolean check(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+ // REAL LOGIC COMES HERE -
+ if (null == itemToCheck || null == sourceObject){
+ return false;
+ }
+
+ return itemToCheck.remainingTmp > sourceObject.requiredRemainingTmpSpace;
+ }}));
+ }
+
+ }
+
+ // mock comparator class.
+ protected class MockComparator
+ extends CandidateComparator<MockExecutorObject>{
+
+ @Override
+ public String getName() {
+ return "MockComparator";
+ }
+
+ public MockComparator(){
+ }
+
+ public void registerComparerForMemory(int weight){
+ this.registerFactorComparator(FactorComparator.create("Memory", weight, new Comparator<MockExecutorObject>(){
+ public int compare(MockExecutorObject o1, MockExecutorObject o2) {
+ int result = 0 ;
+
+ // check remaining amount of memory.
+ result = o1.amountOfRemainingMemory - o2.amountOfRemainingMemory;
+ if (result != 0){
+ return result > 0 ? 1 : -1;
+ }
+
+ // check remaining % .
+ result = (int)(o1.percentOfRemainingMemory - o2.percentOfRemainingMemory);
+ return result == 0 ? 0 : result > 0 ? 1 : -1;
+
+ } }));
+ }
+
+ public void registerComparerForRemainingSpace(int weight){
+ this.registerFactorComparator(FactorComparator.create("RemainingTmp", weight, new Comparator<MockExecutorObject>(){
+ public int compare(MockExecutorObject o1, MockExecutorObject o2) {
+ int result = 0 ;
+
+ // check remaining % .
+ result = (int)(o1.remainingTmp - o2.remainingTmp);
+ return result == 0 ? 0 : result > 0 ? 1 : -1;
+
+ } }));
+ }
+
+ public void registerComparerForPriority(int weight){
+ this.registerFactorComparator(FactorComparator.create("Priority", weight, new Comparator<MockExecutorObject>(){
+ public int compare(MockExecutorObject o1, MockExecutorObject o2) {
+ int result = 0 ;
+
+ // check priority, bigger the better.
+ result = (int)(o1.priority - o2.priority);
+ return result == 0 ? 0 : result > 0 ? 1 : -1;
+
+ } }));
+ }
+ }
+
+ // test samples.
+ protected ArrayList<MockExecutorObject> executorList = new ArrayList<MockExecutorObject>();
+
+ @Before
+ public void setUp() throws Exception {
+ BasicConfigurator.configure();
+
+ executorList.clear();
+ executorList.add(new MockExecutorObject("Executor1",8080,50.0,2048,5,new Date(), 20, 6400));
+ executorList.add(new MockExecutorObject("Executor2",8080,50.0,2048,4,new Date(), 20, 6400));
+ executorList.add(new MockExecutorObject("Executor3",8080,40.0,2048,1,new Date(), 20, 6400));
+ executorList.add(new MockExecutorObject("Executor4",8080,50.0,2048,4,new Date(), 20, 6400));
+ executorList.add(new MockExecutorObject("Executor5",8080,50.0,1024,5,new Date(), 90, 6400));
+ executorList.add(new MockExecutorObject("Executor6",8080,50.0,1024,5,new Date(), 90, 3200));
+ executorList.add(new MockExecutorObject("Executor7",8080,50.0,1024,5,new Date(), 90, 3200));
+ executorList.add(new MockExecutorObject("Executor8",8080,50.0,2048,1,new Date(), 90, 3200));
+ executorList.add(new MockExecutorObject("Executor9",8080,50.0,2050,5,new Date(), 90, 4200));
+ executorList.add(new MockExecutorObject("Executor10",8080,00.0,1024,1,new Date(), 90, 3200));
+ executorList.add(new MockExecutorObject("Executor11",8080,40.0,3096,3,new Date(), 90, 2400));
+ executorList.add(new MockExecutorObject("Executor12",8080,50.0,2050,5,new Date(), 60, 7200));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testExecutorFilter() throws Exception {
+
+ // mock object, remaining memory 11500, total memory 3095, remainingTmpSpace 4200, priority 2.
+ MockFlowObject dispatchingObj = new MockFlowObject("flow1",3096, 1500,4200,2);
+
+ MockFilter mFilter = new MockFilter();
+ mFilter.registerFilterforRemainingMemory();
+
+ // expect true.
+ boolean result = mFilter.check(this.executorList.get(0), dispatchingObj);
+ /*
+ 1 [main] INFO azkaban.executor.dispatcher.CandidateFilter - start checking 'Executor1' with factor filter for 'Mockfilter'
+ 1 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
+ 1 [main] INFO azkaban.executor.dispatcher.CandidateFilter - Final checking result : true
+ * */
+ Assert.assertTrue(result);
+
+ //expect true.
+ result = mFilter.check(this.executorList.get(2), dispatchingObj);
+ /*
+ 1 [main] INFO azkaban.executor.dispatcher.CandidateFilter - start checking 'Executor3' with factor filter for 'Mockfilter'
+ 2 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
+ 2 [main] INFO azkaban.executor.dispatcher.CandidateFilter - Final checking result : true
+ */
+ Assert.assertTrue(result);
+
+ // add the priority filter.
+ mFilter.registerFilterforPriority();
+ result = mFilter.check(this.executorList.get(2), dispatchingObj);
+ // expect false, for priority.
+ /*
+ 2 [main] INFO azkaban.executor.dispatcher.CandidateFilter - start checking 'Executor3' with factor filter for 'Mockfilter'
+ 2 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
+ 2 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredProprity] filter result : false
+ 2 [main] INFO azkaban.executor.dispatcher.CandidateFilter - Final checking result : false
+ */
+ Assert.assertFalse(result);
+
+ // add the remaining space filter.
+ mFilter.registerFilterforRemainingTmpSpace();
+
+ // expect pass.
+ result = mFilter.check(this.executorList.get(1), dispatchingObj);
+ /*
+ 3 [main] INFO azkaban.executor.dispatcher.CandidateFilter - start checking 'Executor2' with factor filter for 'Mockfilter'
+ 3 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
+ 3 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingTmpSpace] filter result : true
+ 3 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredProprity] filter result : true
+ 3 [main] INFO azkaban.executor.dispatcher.CandidateFilter - Final checking result : true
+ */
+ Assert.assertTrue(result);
+
+ // expect false, remaining tmp, priority will also fail but the logic shortcuts when the Tmp size check Fails.
+ result = mFilter.check(this.executorList.get(7), dispatchingObj);
+ /*
+ 4 [main] INFO azkaban.executor.dispatcher.CandidateFilter - start checking 'Executor8' with factor filter for 'Mockfilter'
+ 4 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
+ 4 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingTmpSpace] filter result : false
+ 4 [main] INFO azkaban.executor.dispatcher.CandidateFilter - Final checking result : false
+ */
+ Assert.assertFalse(result);
+
+ }
+
+ @Test
+ public void testExecutorComparer() throws Exception {
+ MockComparator comparator = new MockComparator();
+ comparator.registerComparerForMemory(5);
+
+ MockExecutorObject nextExecutor = Collections.max(this.executorList, comparator);
+
+ // expect the first item to be selected, memory wise it is the max.
+ Assert.assertEquals(this.executorList.get(10),nextExecutor);
+
+ // add the priority factor.
+ // expect again the #9 item to be selected.
+ comparator.registerComparerForPriority(6);
+ nextExecutor = Collections.max(this.executorList, comparator);
+ /*
+ 10 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor2' with 'Executor1', total weight = 11
+ 10 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : 0 (current score 0 vs 0)
+ 11 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : -1 (current score 0 vs 6)
+ 11 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 0 vs 6
+ 11 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor3' with 'Executor1', total weight = 11
+ 11 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : -1 (current score 0 vs 5)
+ 11 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : -1 (current score 0 vs 11)
+ 11 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 0 vs 11
+ 12 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor4' with 'Executor1', total weight = 11
+ 12 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : 0 (current score 0 vs 0)
+ 12 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : -1 (current score 0 vs 6)
+ 12 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 0 vs 6
+ 13 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor5' with 'Executor1', total weight = 11
+ 13 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : -1 (current score 0 vs 5)
+ 13 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : 0 (current score 0 vs 5)
+ 14 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 0 vs 5
+ 14 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor6' with 'Executor1', total weight = 11
+ 14 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : -1 (current score 0 vs 5)
+ 14 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : 0 (current score 0 vs 5)
+ 14 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 0 vs 5
+ 14 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor7' with 'Executor1', total weight = 11
+ 15 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : -1 (current score 0 vs 5)
+ 15 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : 0 (current score 0 vs 5)
+ 15 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 0 vs 5
+ 15 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor8' with 'Executor1', total weight = 11
+ 15 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : 0 (current score 0 vs 0)
+ 16 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : -1 (current score 0 vs 6)
+ 16 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 0 vs 6
+ 16 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor9' with 'Executor1', total weight = 11
+ 16 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : 1 (current score 5 vs 0)
+ 16 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : 0 (current score 5 vs 0)
+ 16 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 5 vs 0
+ 17 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor10' with 'Executor9', total weight = 11
+ 17 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : -1 (current score 0 vs 5)
+ 17 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : -1 (current score 0 vs 11)
+ 17 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 0 vs 11
+ 18 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor11' with 'Executor9', total weight = 11
+ 18 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : 1 (current score 5 vs 0)
+ 18 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : -1 (current score 5 vs 6)
+ 19 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 5 vs 6
+ 19 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor12' with 'Executor9', total weight = 11
+ 19 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : 0 (current score 0 vs 0)
+ 19 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : 0 (current score 0 vs 0)
+ 20 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 0 vs 0
+ * */
+ Assert.assertEquals(this.executorList.get(8),nextExecutor);
+
+ // add the remaining space factor.
+ // expect the #12 item to be returned.
+ comparator.registerComparerForRemainingSpace(3);
+ nextExecutor = Collections.max(this.executorList, comparator);
+ /*
+ 21 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor2' with 'Executor1', total weight = 14
+ 22 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : 0 (current score 0 vs 0)
+ 22 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : -1 (current score 0 vs 6)
+ 22 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: RemainingTmp] compare result : 0 (current score 0 vs 6)
+ 22 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 0 vs 6
+ 22 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor3' with 'Executor1', total weight = 14
+ 23 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : -1 (current score 0 vs 5)
+ 23 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : -1 (current score 0 vs 11)
+ 23 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: RemainingTmp] compare result : 0 (current score 0 vs 11)
+ 23 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 0 vs 11
+ 24 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor4' with 'Executor1', total weight = 14
+ 24 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : 0 (current score 0 vs 0)
+ 24 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : -1 (current score 0 vs 6)
+ 25 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: RemainingTmp] compare result : 0 (current score 0 vs 6)
+ 25 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 0 vs 6
+ 25 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor5' with 'Executor1', total weight = 14
+ 25 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : -1 (current score 0 vs 5)
+ 25 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : 0 (current score 0 vs 5)
+ 26 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: RemainingTmp] compare result : 0 (current score 0 vs 5)
+ 26 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 0 vs 5
+ 26 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor6' with 'Executor1', total weight = 14
+ 26 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : -1 (current score 0 vs 5)
+ 27 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : 0 (current score 0 vs 5)
+ 27 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: RemainingTmp] compare result : -1 (current score 0 vs 8)
+ 27 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 0 vs 8
+ 27 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor7' with 'Executor1', total weight = 14
+ 27 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : -1 (current score 0 vs 5)
+ 28 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : 0 (current score 0 vs 5)
+ 28 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: RemainingTmp] compare result : -1 (current score 0 vs 8)
+ 28 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 0 vs 8
+ 28 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor8' with 'Executor1', total weight = 14
+ 29 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : 0 (current score 0 vs 0)
+ 29 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : -1 (current score 0 vs 6)
+ 29 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: RemainingTmp] compare result : -1 (current score 0 vs 9)
+ 29 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 0 vs 9
+ 30 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor9' with 'Executor1', total weight = 14
+ 30 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : 1 (current score 5 vs 0)
+ 30 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : 0 (current score 5 vs 0)
+ 30 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: RemainingTmp] compare result : -1 (current score 5 vs 3)
+ 30 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 5 vs 3
+ 31 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor10' with 'Executor9', total weight = 14
+ 31 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : -1 (current score 0 vs 5)
+ 31 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : -1 (current score 0 vs 11)
+ 32 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: RemainingTmp] compare result : -1 (current score 0 vs 14)
+ 32 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 0 vs 14
+ 32 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor11' with 'Executor9', total weight = 14
+ 32 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : 1 (current score 5 vs 0)
+ 32 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : -1 (current score 5 vs 6)
+ 33 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: RemainingTmp] compare result : -1 (current score 5 vs 9)
+ 33 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 5 vs 9
+ 33 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor12' with 'Executor9', total weight = 14
+ 33 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : 0 (current score 0 vs 0)
+ 34 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : 0 (current score 0 vs 0)
+ 34 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: RemainingTmp] compare result : 1 (current score 3 vs 0)
+ 34 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 3 vs 0
+ * */
+ Assert.assertEquals(this.executorList.get(11),nextExecutor);
+ }
+
+ @Test
+ public void testDispatcher() throws Exception {
+ MockFilter filter = new MockFilter();
+ MockComparator comparator = new MockComparator();
+
+ filter.registerFilterforPriority();
+ filter.registerFilterforRemainingMemory();
+ filter.registerFilterforRemainingTmpSpace();
+ filter.registerFilterforTotalMemory();
+
+ comparator.registerComparerForMemory(3);
+ comparator.registerComparerForPriority(5);
+ comparator.registerComparerForRemainingSpace(3);
+
+ ExecutorDispatcher<MockExecutorObject,MockFlowObject> morkDispatcher =
+ new ExecutorDispatcher<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+ // mock object, remaining memory 11500, total memory 3095, remainingTmpSpace 4200, priority 2.
+ MockFlowObject dispatchingObj = new MockFlowObject("flow1",3096, 1500,4200,2);
+
+ // expected selection = #12
+ MockExecutorObject nextExecutor = morkDispatcher.getNext(this.executorList, dispatchingObj);
+ /*
+ * 9 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start candidate selection logic.
+ 9 [main] INFO azkaban.executor.dispatcher.CandidateComparator - candidate count before filtering: 12
+ 10 [main] INFO azkaban.executor.dispatcher.CandidateFilter - start checking 'Executor1' with factor filter for 'Mockfilter'
+ 10 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
+ 10 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredTotalMemory] filter result : true
+ 10 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingTmpSpace] filter result : true
+ 11 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredProprity] filter result : true
+ 11 [main] INFO azkaban.executor.dispatcher.CandidateFilter - Final checking result : true
+ 11 [main] INFO azkaban.executor.dispatcher.CandidateFilter - start checking 'Executor2' with factor filter for 'Mockfilter'
+ 11 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
+ 11 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredTotalMemory] filter result : true
+ 12 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingTmpSpace] filter result : true
+ 12 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredProprity] filter result : true
+ 12 [main] INFO azkaban.executor.dispatcher.CandidateFilter - Final checking result : true
+ 12 [main] INFO azkaban.executor.dispatcher.CandidateFilter - start checking 'Executor3' with factor filter for 'Mockfilter'
+ 12 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
+ 13 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredTotalMemory] filter result : true
+ 13 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingTmpSpace] filter result : true
+ 13 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredProprity] filter result : false
+ 13 [main] INFO azkaban.executor.dispatcher.CandidateFilter - Final checking result : false
+ 13 [main] INFO azkaban.executor.dispatcher.CandidateFilter - start checking 'Executor4' with factor filter for 'Mockfilter'
+ 13 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
+ 14 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredTotalMemory] filter result : true
+ 14 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingTmpSpace] filter result : true
+ 14 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredProprity] filter result : true
+ 14 [main] INFO azkaban.executor.dispatcher.CandidateFilter - Final checking result : true
+ 14 [main] INFO azkaban.executor.dispatcher.CandidateFilter - start checking 'Executor5' with factor filter for 'Mockfilter'
+ 15 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingMemory] filter result : false
+ 15 [main] INFO azkaban.executor.dispatcher.CandidateFilter - Final checking result : false
+ 15 [main] INFO azkaban.executor.dispatcher.CandidateFilter - start checking 'Executor6' with factor filter for 'Mockfilter'
+ 15 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingMemory] filter result : false
+ 15 [main] INFO azkaban.executor.dispatcher.CandidateFilter - Final checking result : false
+ 15 [main] INFO azkaban.executor.dispatcher.CandidateFilter - start checking 'Executor7' with factor filter for 'Mockfilter'
+ 16 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingMemory] filter result : false
+ 16 [main] INFO azkaban.executor.dispatcher.CandidateFilter - Final checking result : false
+ 16 [main] INFO azkaban.executor.dispatcher.CandidateFilter - start checking 'Executor8' with factor filter for 'Mockfilter'
+ 16 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
+ 16 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredTotalMemory] filter result : true
+ 17 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingTmpSpace] filter result : false
+ 17 [main] INFO azkaban.executor.dispatcher.CandidateFilter - Final checking result : false
+ 17 [main] INFO azkaban.executor.dispatcher.CandidateFilter - start checking 'Executor9' with factor filter for 'Mockfilter'
+ 17 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
+ 17 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredTotalMemory] filter result : true
+ 17 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingTmpSpace] filter result : false
+ 18 [main] INFO azkaban.executor.dispatcher.CandidateFilter - Final checking result : false
+ 18 [main] INFO azkaban.executor.dispatcher.CandidateFilter - start checking 'Executor10' with factor filter for 'Mockfilter'
+ 18 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingMemory] filter result : false
+ 18 [main] INFO azkaban.executor.dispatcher.CandidateFilter - Final checking result : false
+ 18 [main] INFO azkaban.executor.dispatcher.CandidateFilter - start checking 'Executor11' with factor filter for 'Mockfilter'
+ 18 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
+ 18 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredTotalMemory] filter result : true
+ 18 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingTmpSpace] filter result : false
+ 19 [main] INFO azkaban.executor.dispatcher.CandidateFilter - Final checking result : false
+ 19 [main] INFO azkaban.executor.dispatcher.CandidateFilter - start checking 'Executor12' with factor filter for 'Mockfilter'
+ 19 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
+ 19 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredTotalMemory] filter result : true
+ 19 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredRemainingTmpSpace] filter result : true
+ 19 [main] INFO azkaban.executor.dispatcher.CandidateFilter - [Factor: requiredProprity] filter result : true
+ 19 [main] INFO azkaban.executor.dispatcher.CandidateFilter - Final checking result : true
+ 20 [main] INFO azkaban.executor.dispatcher.CandidateComparator - candidate count after filtering: 4
+ 20 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor2' with 'Executor1', total weight = 11
+ 20 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : 0 (current score 0 vs 0)
+ 20 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : -1 (current score 0 vs 5)
+ 21 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: RemainingTmp] compare result : 0 (current score 0 vs 5)
+ 21 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 0 vs 5
+ 21 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor4' with 'Executor1', total weight = 11
+ 22 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : 0 (current score 0 vs 0)
+ 22 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : -1 (current score 0 vs 5)
+ 22 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: RemainingTmp] compare result : 0 (current score 0 vs 5)
+ 22 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 0 vs 5
+ 22 [main] INFO azkaban.executor.dispatcher.CandidateComparator - start comparing 'Executor12' with 'Executor1', total weight = 11
+ 23 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Memory] compare result : 1 (current score 3 vs 0)
+ 23 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: Priority] compare result : 0 (current score 3 vs 0)
+ 23 [main] INFO azkaban.executor.dispatcher.CandidateComparator - [Factor: RemainingTmp] compare result : 1 (current score 6 vs 0)
+ 23 [main] INFO azkaban.executor.dispatcher.CandidateComparator - Result : 6 vs 0
+ 23 [main] INFO azkaban.executor.dispatcher.CandidateComparator - candidate selected Executor12
+ */
+ Assert.assertEquals( this.executorList.get(11),nextExecutor);
+
+ // remaining memory 11500, total memory 3095, remainingTmpSpace 14200, priority 2.
+ dispatchingObj = new MockFlowObject("flow1",3096, 1500,14200,2);
+ // all candidates should be filtered by the remaining memory.
+ nextExecutor = morkDispatcher.getNext(this.executorList, dispatchingObj);
+ Assert.assertEquals(null,nextExecutor);
+ }
+
+ @Test
+ public void testDispatcherChangingFactorWeight() throws Exception {
+ MockFilter filter = new MockFilter();
+ MockComparator comparator = new MockComparator();
+
+ filter.registerFilterforPriority();
+ filter.registerFilterforRemainingMemory();
+ filter.registerFilterforRemainingTmpSpace();
+ filter.registerFilterforTotalMemory();
+
+ comparator.registerComparerForMemory(1);
+ comparator.registerComparerForPriority(1);
+ comparator.registerComparerForRemainingSpace(1);
+
+ ExecutorDispatcher<MockExecutorObject,MockFlowObject> morkDispatcher =
+ new ExecutorDispatcher<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+ MockFlowObject dispatchingObj = new MockFlowObject("flow1",100, 100,100,3);
+ MockExecutorObject executor = morkDispatcher.getNext(this.executorList, dispatchingObj);
+ Assert.assertEquals(this.executorList.get(11), executor);
+
+ // adjusted the weight for memory to 10, therefore item #11 should be returned.
+ morkDispatcher.getComparator().adjustFactorWeight("Memory", 10);
+ executor = morkDispatcher.getNext(this.executorList, dispatchingObj);
+ Assert.assertEquals(this.executorList.get(10), executor);
+
+ // adjusted the weight for memory back to 1, therefore item #12 should be returned.
+ morkDispatcher.getComparator().adjustFactorWeight("Memory", 1);
+ executor = morkDispatcher.getNext(this.executorList, dispatchingObj);
+ Assert.assertEquals(this.executorList.get(11), executor);
+
+ }
+}