ExecutorComparator.java

282 lines | 10.695 kB Blame History Raw Download
/*
 * Copyright 2015 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.executor.selector;

import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import azkaban.executor.Executor;
import azkaban.executor.Statistics;

public class ExecutorComparator extends CandidateComparator<Executor> {
  private static Map<String, ComparatorCreator> comparatorCreatorRepository = null;

  /**
   * Gets the name list of all available comparators.
   * @return the list of the names.
   * */
  public static Set<String> getAvailableComparatorNames(){
    return comparatorCreatorRepository.keySet();
  }

  // factor comparator names
  private static final String REMAININGFLOWSIZE_COMPARATOR_NAME = "RemainingFlowSize";
  private static final String MEMORY_COMPARATOR_NAME = "Memory";
  private static final String REMAININGTMPSIZE_COMPARATOR_NAME = "RemainingTmpSize";
  private static final String PRIORITY_COMPARATOR_NAME = "Priority";
  private static final String LSTDISPATCHED_COMPARATOR_NAME = "LastDispatched";

  /**
   * static initializer of the class.
   * We will build the filter repository here.
   * when a new comparator is added, please do remember to register it here.
   * */
  static {
    comparatorCreatorRepository = new HashMap<String, ComparatorCreator>();

    // register the creator for remaining flow size comparator.
    comparatorCreatorRepository.put(REMAININGFLOWSIZE_COMPARATOR_NAME, new ComparatorCreator(){
      @Override public FactorComparator<Executor> create(int weight) { return getRemainingFlowSizeComparator(weight); }});

    // register the creator for memory comparator.
    comparatorCreatorRepository.put(MEMORY_COMPARATOR_NAME, new ComparatorCreator(){
      @Override public FactorComparator<Executor> create(int weight) { return getMemoryComparator(weight); }});

    // register the creator for priority comparator.
    comparatorCreatorRepository.put(PRIORITY_COMPARATOR_NAME, new ComparatorCreator(){
      @Override public FactorComparator<Executor> create(int weight) { return getPriorityComparator(weight); }});

    // register the creator for remaining TMP size comparator.
    comparatorCreatorRepository.put(PRIORITY_COMPARATOR_NAME, new ComparatorCreator(){
      @Override public FactorComparator<Executor> create(int weight) { return getRemainingTmpSizeComparator(weight); }});

    // register the creator for last dispatched time comparator.
    comparatorCreatorRepository.put(LSTDISPATCHED_COMPARATOR_NAME, new ComparatorCreator(){
      @Override public FactorComparator<Executor> create(int weight) { return getLstDispatchedTimeComparator(weight); }});
  }


  /**
   * constructor of the ExecutorComparator.
   * @param comparatorList   the list of comparator, plus its weight information to be registered,
   *  the parameter must be a not-empty and valid list object.
   * */
  public ExecutorComparator(Map<String,Integer> comparatorList) {
    if (null == comparatorList|| comparatorList.size() == 0){
      logger.error("failed to initialize executor comparator as the passed comparator list is invalid or empty.");
      throw new IllegalArgumentException("filterList");
    }

    // register the comparators, we will now throw here if the weight is invalid, it is handled in the super.
    for (Entry<String,Integer> entry : comparatorList.entrySet()){
      if (comparatorCreatorRepository.containsKey(entry.getKey())){
        this.registerFactorComparator(comparatorCreatorRepository.
            get(entry.getKey()).
            create(entry.getValue()));
      } else {
        logger.error(String.format("failed to initialize executor comparator as the comparator implementation for requested factor '%s' doesn't exist.",
            entry.getKey()));
        throw new IllegalArgumentException("comparatorList");
      }
    }
  }

  @Override
  public String getName() {
    return "ExecutorComparator";
  }

  private interface ComparatorCreator{
    FactorComparator<Executor> create(int weight);
  }

  /**
   * helper function that does the object  on two stats, comparator can leverage this function to provide
   * shortcuts if   the stat object is missing from one or both sides of the executors.
   * @param stat1   the first stat object to be checked .
   * @param stat2   the second stat object to be checked.
   * @param caller  the name of the calling function, for logging purpose.
   * @param result  result Integer to pass out the result in case the stats are not both valid.
   * @return true if the passed stats are NOT both valid, a shortcut can be made (caller can consume the result),
   *         false otherwise.
   * */
  private static boolean statsObjectCheck(Statistics stat1, Statistics stat2, String caller, Integer result){
    result = 0 ;
    // both doesn't expose the info
    if (null == stat1 && null == stat2){
      logger.info(String.format("%s : neither of the executors exposed stats info.",
          caller));
      return true;
    }

    //right side doesn't expose the info.
    if (null == stat2 ){
        logger.info(String.format("%s : choosing left side and the right side executor doesn't expose stat info",
            caller));
        result = 1;
        return true;
    }

    //left side doesn't expose the info.
    if (null == stat1 ){
      logger.info(String.format("%s : choosing right side and the left side executor doesn't expose stat info",
          caller));
      result = -1;
      return true;
      }

    // both not null
    return false;
  }

  /**
   * function defines the remaining flow size comparator.
   * @param weight weight of the comparator.
   * */
  private static FactorComparator<Executor> getRemainingFlowSizeComparator(int weight){
    return FactorComparator.create(REMAININGFLOWSIZE_COMPARATOR_NAME, weight, new Comparator<Executor>(){

      @Override
      public int compare(Executor o1, Executor o2) {
        Statistics stat1 = o1.getExecutorStats();
        Statistics stat2 = o2.getExecutorStats();

        Integer result = 0;
        if (statsObjectCheck(stat1,stat2,REMAININGFLOWSIZE_COMPARATOR_NAME,result)){
          return result;
        }
        return ((Integer)stat1.getRemainingFlowCapacity()).compareTo(stat2.getRemainingFlowCapacity());
      }});
  }

  /**
   * function defines the remaining folder size comparator.
   * @param weight weight of the comparator.
   * */
  private static FactorComparator<Executor> getRemainingTmpSizeComparator(int weight){
    return FactorComparator.create(REMAININGTMPSIZE_COMPARATOR_NAME, weight, new Comparator<Executor>(){

      @Override
      public int compare(Executor o1, Executor o2) {
        Statistics stat1 = o1.getExecutorStats();
        Statistics stat2 = o2.getExecutorStats();

        Integer result = 0;
        if (statsObjectCheck(stat1,stat2,REMAININGTMPSIZE_COMPARATOR_NAME,result)){
          return result;
        }
        return ((Long)stat1.getRemainingStorage()).compareTo(stat2.getRemainingStorage());
      }});
  }

  /**
   * function defines the priority comparator.
   * @param weight weight of the comparator.
   * @return
   * */
  private static FactorComparator<Executor> getPriorityComparator(int weight){
    return FactorComparator.create(PRIORITY_COMPARATOR_NAME, weight, new Comparator<Executor>(){

      @Override
      public int compare(Executor o1, Executor o2) {
        Statistics stat1 = o1.getExecutorStats();
        Statistics stat2 = o2.getExecutorStats();

        Integer result = 0;
        if (statsObjectCheck(stat1,stat2,PRIORITY_COMPARATOR_NAME,result)){
          return result;
        }
        return ((Integer)stat1.getPriority()).compareTo(stat2.getPriority());
      }});
  }


  /**
   * function defines the last dispatched time comparator.
   * @param weight weight of the comparator.
   * @return
   * */
  private static FactorComparator<Executor> getLstDispatchedTimeComparator(int weight){
    return FactorComparator.create(LSTDISPATCHED_COMPARATOR_NAME, weight, new Comparator<Executor>(){

      @Override
      public int compare(Executor o1, Executor o2) {
        Statistics stat1 = o1.getExecutorStats();
        Statistics stat2 = o2.getExecutorStats();

        Integer result = 0;
        if (statsObjectCheck(stat1,stat2,LSTDISPATCHED_COMPARATOR_NAME,result)){
          return result;
        }

        if (null == stat1.getLastDispatchedTime() && null == stat1.getLastDispatchedTime()){
          logger.info(String.format("%s : stats from both side doesn't contain last dispatched time info.",
              LSTDISPATCHED_COMPARATOR_NAME));
          return 0;
        }

        if (null == stat2.getLastDispatchedTime()){
          logger.info(String.format("%s : choosing left side as right doesn't contain last dispatched time info.",
              LSTDISPATCHED_COMPARATOR_NAME));
          return 1;
        }

        if (null == stat1.getLastDispatchedTime()){
          logger.info(String.format("%s : choosing rigth side as left doesn't contain last dispatched time info.",
              LSTDISPATCHED_COMPARATOR_NAME));
          return -1;
        }

        // Note: an earlier date time indicates higher weight.
        return ((Date)stat2.getLastDispatchedTime()).compareTo(stat1.getLastDispatchedTime());
      }});
  }


  /**
   * function defines the Memory comparator.
   * @param weight weight of the comparator.
   * Note: comparator firstly take the absolute value of the remaining memory, if both sides have the same value,
   *       it go further to check the percent of the remaining memory.
   * @return
   * */
  private static FactorComparator<Executor> getMemoryComparator(int weight){
    return FactorComparator.create(MEMORY_COMPARATOR_NAME, weight, new Comparator<Executor>(){

      @Override
      public int compare(Executor o1, Executor o2) {
       Statistics stat1 = o1.getExecutorStats();
       Statistics stat2 = o2.getExecutorStats();

       Integer result = 0;
       if (statsObjectCheck(stat1,stat2,MEMORY_COMPARATOR_NAME,result)){
         return result;
       }

       if (stat1.getRemainingMemory() != stat2.getRemainingMemory()){
         return stat1.getRemainingMemory() > stat2.getRemainingMemory() ? 1:-1;
       }

       return Double.compare(stat1.getRemainingMemoryPercent(), stat2.getRemainingMemoryPercent());
      }});
  }
}