Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
index b2c3766..efe00f9 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Executor.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -24,14 +24,14 @@ import azkaban.utils.Utils;
*
* @author gaggarwa
*/
-public class Executor {
+public class Executor implements Comparable<Executor> {
private final int id;
private final String host;
private final int port;
private boolean isActive;
// cached copy of the latest statistics from the executor.
- private Statistics cachedExecutorStats;
- private Date lastUpdated;
+ private ServerStatistics cachedExecutorStats;
+ private Date lastStatsUpdatedTime;
/**
* <pre>
@@ -113,13 +113,13 @@ public class Executor {
return id;
}
- public Statistics getExecutorStats() {
+ public ServerStatistics getExecutorStats() {
return this.cachedExecutorStats;
}
- public void setExecutorStats(Statistics stats) {
+ public void setExecutorStats(ServerStatistics stats) {
this.cachedExecutorStats = stats;
- this.lastUpdated = new Date();
+ this.lastStatsUpdatedTime = new Date();
}
/**
@@ -128,10 +128,15 @@ public class Executor {
* specific object is never refreshed.
* */
public Date getLastStatsUpdatedTime(){
- return this.lastUpdated;
+ return this.lastStatsUpdatedTime;
}
public void setActive(boolean isActive) {
this.isActive = isActive;
}
+
+ @Override
+ public int compareTo(Executor o) {
+ return null == o ? 1 : this.hashCode() - o.hashCode();
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
index 7a25891..53cb1bb 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
@@ -26,7 +26,7 @@ import org.apache.log4j.Logger;
* @param K executor object type.
* @param V dispatching object type.
* */
-public class CandidateSelector<K,V> implements Selector<K, V> {
+public class CandidateSelector<K extends Comparable<K>, V> implements Selector<K, V> {
private static Logger logger = Logger.getLogger(CandidateComparator.class);
private CandidateFilter<K,V> filter;
@@ -70,16 +70,16 @@ public class CandidateSelector<K,V> implements Selector<K, V> {
logger.info(String.format("candidate count after filtering: %s", filteredList.size()));
if (filteredList.size() == 0){
- logger.info("failed to select candidate as the filted candidate list is empty.");
+ logger.info("failed to select candidate as the filtered candidate list is empty.");
return null;
}
if (null == comparator){
- logger.info("candidate comparator is not specified, default comparator from 'Collections' class will be used.");
+ logger.info("candidate comparator is not specified, default hash code comparator class will be used.");
}
// final work - find the best candidate from the filtered list.
- K executor = Collections.max(filteredList,comparator);
+ K executor = Collections.max(filteredList, comparator);
logger.info(String.format("candidate selected %s",
null == executor ? "(null)" : executor.toString()));
return executor;
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
index ce4e41a..1d48685 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
@@ -24,8 +24,12 @@ import java.util.Map.Entry;
import java.util.Set;
import azkaban.executor.Executor;
-import azkaban.executor.Statistics;
+import azkaban.executor.ServerStatistics;
+
+/**
+ * De-normalized version of the CandidateComparator, which also contains the implementation of the factor comparators.
+ * */
public class ExecutorComparator extends CandidateComparator<Executor> {
private static Map<String, ComparatorCreator> comparatorCreatorRepository = null;
@@ -40,7 +44,6 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
// factor comparator names
private static final String NUMOFASSIGNEDFLOW_COMPARATOR_NAME = "NumberOfAssignedFlowComparator";
private static final String MEMORY_COMPARATOR_NAME = "Memory";
- private static final String PRIORITY_COMPARATOR_NAME = "Priority";
private static final String LSTDISPATCHED_COMPARATOR_NAME = "LastDispatched";
private static final String CPUUSAGE_COMPARATOR_NAME = "CpuUsage";
@@ -60,10 +63,6 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
comparatorCreatorRepository.put(MEMORY_COMPARATOR_NAME, new ComparatorCreator(){
public FactorComparator<Executor> create(int weight) { return getMemoryComparator(weight); }});
- // register the creator for priority comparator.
- comparatorCreatorRepository.put(PRIORITY_COMPARATOR_NAME, new ComparatorCreator(){
- public FactorComparator<Executor> create(int weight) { return getPriorityComparator(weight); }});
-
// register the creator for last dispatched time comparator.
comparatorCreatorRepository.put(LSTDISPATCHED_COMPARATOR_NAME, new ComparatorCreator(){
public FactorComparator<Executor> create(int weight) { return getLstDispatchedTimeComparator(weight); }});
@@ -81,8 +80,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
* */
public ExecutorComparator(Map<String,Integer> comparatorList) {
if (null == comparatorList|| comparatorList.size() == 0){
- logger.error("failed to initialize executor comparator as the passed comparator list is invalid or empty.");
- throw new IllegalArgumentException("filterList");
+ throw new IllegalArgumentException("failed to initialize executor comparator" +
+ "as the passed comparator list is invalid or empty.");
}
// register the comparators, we will now throw here if the weight is invalid, it is handled in the super.
@@ -92,10 +91,9 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
get(entry.getKey()).
create(entry.getValue()));
} else {
- logger.error(String.format("failed to initialize executor comparator "+
- "as the comparator implementation for requested factor '%s' doesn't exist.",
- entry.getKey()));
- throw new IllegalArgumentException("comparatorList");
+ throw new IllegalArgumentException(String.format("failed to initialize executor comparator " +
+ "as the comparator implementation for requested factor '%s' doesn't exist.",
+ entry.getKey()));
}
}
}
@@ -109,9 +107,10 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
FactorComparator<Executor> create(int weight);
}
- /**
+ /**<pre>
* helper function that does the object on two statistics, comparator can leverage this function to provide
* shortcuts if the statistics object is missing from one or both sides of the executors.
+ * </pre>
* @param stat1 the first statistics object to be checked .
* @param stat2 the second statistics object to be checked.
* @param caller the name of the calling function, for logging purpose.
@@ -119,8 +118,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
* @return true if the passed statistics are NOT both valid, a shortcut can be made (caller can consume the result),
* false otherwise.
* */
- private static boolean statisticsObjectCheck(Statistics statisticsObj1,
- Statistics statisticsObj2, String caller, Integer result){
+ private static boolean statisticsObjectCheck(ServerStatistics statisticsObj1,
+ ServerStatistics statisticsObj2, String caller, Integer result){
result = 0 ;
// both doesn't expose the info
if (null == statisticsObj1 && null == statisticsObj2){
@@ -158,8 +157,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
@Override
public int compare(Executor o1, Executor o2) {
- Statistics stat1 = o1.getExecutorStats();
- Statistics stat2 = o2.getExecutorStats();
+ ServerStatistics stat1 = o1.getExecutorStats();
+ ServerStatistics stat2 = o2.getExecutorStats();
Integer result = 0;
if (statisticsObjectCheck(stat1,stat2,NUMOFASSIGNEDFLOW_COMPARATOR_NAME,result)){
@@ -170,27 +169,6 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
}
/**
- * function defines the priority comparator.
- * @param weight weight of the comparator.
- * @return
- * */
- private static FactorComparator<Executor> getPriorityComparator(int weight){
- return FactorComparator.create(PRIORITY_COMPARATOR_NAME, weight, new Comparator<Executor>(){
-
- @Override
- public int compare(Executor o1, Executor o2) {
- Statistics stat1 = o1.getExecutorStats();
- Statistics stat2 = o2.getExecutorStats();
-
- Integer result = 0;
- if (statisticsObjectCheck(stat1,stat2,PRIORITY_COMPARATOR_NAME,result)){
- return result;
- }
- return ((Integer)stat1.getPriority()).compareTo(stat2.getPriority());
- }});
- }
-
- /**
* function defines the cpuUsage comparator.
* @param weight weight of the comparator.
* @return
@@ -200,10 +178,10 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
@Override
public int compare(Executor o1, Executor o2) {
- Statistics stat1 = o1.getExecutorStats();
- Statistics stat2 = o2.getExecutorStats();
+ ServerStatistics stat1 = o1.getExecutorStats();
+ ServerStatistics stat2 = o2.getExecutorStats();
- Integer result = 0;
+ int result = 0;
if (statisticsObjectCheck(stat1,stat2,CPUUSAGE_COMPARATOR_NAME,result)){
return result;
}
@@ -224,10 +202,10 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
@Override
public int compare(Executor o1, Executor o2) {
- Statistics stat1 = o1.getExecutorStats();
- Statistics stat2 = o2.getExecutorStats();
+ ServerStatistics stat1 = o1.getExecutorStats();
+ ServerStatistics stat2 = o2.getExecutorStats();
- Integer result = 0;
+ int result = 0;
if (statisticsObjectCheck(stat1,stat2,LSTDISPATCHED_COMPARATOR_NAME,result)){
return result;
}
@@ -256,11 +234,13 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
}
- /**
+ /**<pre>
* function defines the Memory comparator.
- * @param weight weight of the comparator.
* Note: comparator firstly take the absolute value of the remaining memory, if both sides have the same value,
* it go further to check the percent of the remaining memory.
+ * </pre>
+ * @param weight weight of the comparator.
+
* @return
* */
private static FactorComparator<Executor> getMemoryComparator(int weight){
@@ -268,10 +248,10 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
@Override
public int compare(Executor o1, Executor o2) {
- Statistics stat1 = o1.getExecutorStats();
- Statistics stat2 = o2.getExecutorStats();
+ ServerStatistics stat1 = o1.getExecutorStats();
+ ServerStatistics stat2 = o2.getExecutorStats();
- Integer result = 0;
+ int result = 0;
if (statisticsObjectCheck(stat1,stat2,MEMORY_COMPARATOR_NAME,result)){
return result;
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
index c27b673..9adc0dc 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
@@ -23,9 +23,11 @@ import java.util.Set;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.Executor;
-import azkaban.executor.Statistics;
-
+import azkaban.executor.ServerStatistics;
+/**
+ * De-normalized version of the candidateFilter, which also contains the implementation of the factor filters.
+ * */
public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFlow> {
private static Map<String, FactorFilter<Executor, ExecutableFlow>> filterRepository = null;
@@ -84,11 +86,12 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
return "ExecutorFilter";
}
- /**
+ /**<pre>
* function to register the static remaining flow size filter.
* NOTE : this is a static filter which means the filter will be filtering based on the system standard which is not
* Coming for the passed flow.
* Ideally this filter will make sure only the executor hasn't reached the Max allowed # of executing flows.
+ *</pre>
* */
private static FactorFilter<Executor, ExecutableFlow> getStaticRemainingFlowSizeFilter(){
return FactorFilter.create(STATICREMAININGFLOWSIZE_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
@@ -98,7 +101,7 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
return false;
}
- Statistics stats = filteringTarget.getExecutorStats();
+ ServerStatistics stats = filteringTarget.getExecutorStats();
if (null == stats) {
logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
STATICREMAININGFLOWSIZE_FILTER_NAME,
@@ -125,7 +128,7 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
return false;
}
- Statistics stats = filteringTarget.getExecutorStats();
+ ServerStatistics stats = filteringTarget.getExecutorStats();
if (null == stats) {
logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
MINIMUMFREEMEMORY_FILTER_NAME,
@@ -140,9 +143,10 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
/**
* function to register the static Minimum Reserved Memory filter.
- * NOTE : this is a static filter which means the filter will be filtering based on the system standard which is not
- * Coming for the passed flow.
+ * NOTE : <pre> this is a static filter which means the filter will be filtering based on the system standard which
+ * is not Coming for the passed flow.
* This filter will filter out any executors that the current CPU usage exceed 95%
+ * </pre>
* */
private static FactorFilter<Executor, ExecutableFlow> getCpuStatusFilter(){
return FactorFilter.create(CPUSTATUS_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
@@ -153,7 +157,7 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
return false;
}
- Statistics stats = filteringTarget.getExecutorStats();
+ ServerStatistics stats = filteringTarget.getExecutorStats();
if (null == stats) {
logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
MINIMUMFREEMEMORY_FILTER_NAME,
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java
index fd15ad1..d7236e8 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java
@@ -22,11 +22,12 @@ import java.util.Map;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.Executor;
-/**
+/**<pre>
* Executor selector class implementation.
* NOTE: This class is a de-generalized version of the CandidateSelector, which provides a
* clean and convenient constructor to take in filter and comparator name list and build
* the instance from that.
+ *</pre>
* */
public class ExecutorSelector extends CandidateSelector<Executor, ExecutableFlow> {
@@ -38,7 +39,7 @@ public class ExecutorSelector extends CandidateSelector<Executor, ExecutableFlow
* again comparator feature is disabled if a null value is passed.
* */
public ExecutorSelector(List<String> filterList, Map<String,Integer> comparatorList) {
- super(null == filterList || filterList.size() == 0 ? null : new ExecutorFilter(filterList),
- null == comparatorList || comparatorList.size() == 0 ? null : new ExecutorComparator(comparatorList));
+ super(null == filterList || filterList.isEmpty() ? null : new ExecutorFilter(filterList),
+ null == comparatorList || comparatorList.isEmpty() ? null : new ExecutorComparator(comparatorList));
}
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
index 2dd76c7..6d4d2e0 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
@@ -33,13 +33,12 @@ public final class FactorComparator<T>{
* method provided below.
* @param factorName : the factor name .
* @param weight : the weight of the comparator.
- * @ comparator : function to be provided by user on how the comparison should be made.
+ * @param comparator : function to be provided by user on how the comparison should be made.
* */
private FactorComparator(String factorName, int weight, Comparator<T> comparator){
this.factorName = factorName;
this.weight = weight;
this.comparator = comparator;
- logger.info("comparator created for " + this.factorName);
}
/** static function to generate an instance of the class.
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
index b277d45..04b04b7 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
@@ -36,7 +36,6 @@ public final class FactorFilter<T,V>{
private FactorFilter(String factorName, Filter<T,V> filter){
this.factorName = factorName;
this.filter = filter;
- logger.info("filter created for " + this.factorName);
}
/** static function to generate an instance of the class.
@@ -66,8 +65,8 @@ public final class FactorFilter<T,V>{
public interface Filter<T,V>{
/**function to analyze the target item according to the reference object to decide whether the item should be filtered.
- * @param filteringTarget: object to be checked.
- * @param referencingObject: object which contains statistics based on which a decision is made whether
+ * @param filteringTarget object to be checked.
+ * @param referencingObject object which contains statistics based on which a decision is made whether
* the object being checked need to be filtered or not.
* @return true if the check passed, false if check failed, which means the item need to be filtered.
* */
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java b/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java
index 610f75a..605fd28 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java
@@ -19,13 +19,15 @@ package azkaban.executor.selector;
import java.util.List;
-/** Definition of the selector interface.
+/**<pre>
+ * Definition of the selector interface.
* an implementation of the selector interface provides the functionality
* to return a candidate from the candidateList that suits best for the dispatchingObject.
+ * </pre>
* @param K : type of the candidate.
* @param V : type of the dispatching object.
*/
-public interface Selector <K,V> {
+public interface Selector <K extends Comparable<K>,V> {
/** Function returns the next best suit candidate from the candidateList for the dispatching object.
* @param candidateList : List of the candidates to select from .
diff --git a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
index 97b460a..ca43678 100644
--- a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
@@ -35,7 +35,7 @@ import azkaban.executor.selector.*;
public class SelectorTest {
// mock executor object.
- protected class MockExecutorObject{
+ protected class MockExecutorObject implements Comparable <MockExecutorObject>{
public String name;
public int port;
public double percentOfRemainingMemory;
@@ -69,6 +69,11 @@ public class SelectorTest {
{
return this.name;
}
+
+ @Override
+ public int compareTo(MockExecutorObject o) {
+ return null == o ? 1 : this.hashCode() - o.hashCode();
+ }
}
// Mock flow object.
@@ -674,6 +679,25 @@ public class SelectorTest {
}
@Test
+ public void testCreatingExecutorSelectorWithEmptyFilterComparatorList() throws Exception{
+ List<Executor> executorList = new ArrayList<Executor>();
+ executorList.add(new Executor(1, "host1", 80, true));
+ executorList.add(new Executor(2, "host2", 80, true));
+ executorList.add(new Executor(3, "host3", 80, true));
+
+ executorList.get(0).setExecutorStats(new ServerStatistics(99.9, 14095, 50, new Date(), 89, 0));
+ executorList.get(1).setExecutorStats(new ServerStatistics(50, 14095, 50, new Date(), 90, 0));
+ executorList.get(2).setExecutorStats(new ServerStatistics(99.9, 14095, 50, new Date(), 90, 0));
+
+ ExecutableFlow flow = new ExecutableFlow();
+
+ ExecutorSelector selector = new ExecutorSelector(null , null);
+ Executor executor = selector.getBest(executorList, flow);
+ Assert.assertEquals(executorList.get(2), executor);
+ }
+
+
+ @Test
public void testExecutorSelectorE2E() throws Exception{
List<String> filterList = new ArrayList<String>(ExecutorFilter.getAvailableFilterNames());
Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
@@ -682,9 +706,9 @@ public class SelectorTest {
executorList.add(new Executor(2, "host2", 80, true));
executorList.add(new Executor(3, "host3", 80, true));
- executorList.get(0).setExecutorStats(new Statistics(99.9, 14095, 50, new Date(), 4095,89, 0, 0));
- executorList.get(1).setExecutorStats(new Statistics(50, 14095, 50, new Date(), 4095,90, 0, 0));
- executorList.get(2).setExecutorStats(new Statistics(99.9, 14095, 50, new Date(), 2048,90, 0, 0));
+ executorList.get(0).setExecutorStats(new ServerStatistics(99.9, 14095, 50, new Date(), 89, 0));
+ executorList.get(1).setExecutorStats(new ServerStatistics(50, 14095, 50, new Date(), 90, 0));
+ executorList.get(2).setExecutorStats(new ServerStatistics(99.9, 14095, 50, new Date(), 90, 0));
ExecutableFlow flow = new ExecutableFlow();
@@ -697,7 +721,7 @@ public class SelectorTest {
// simulate that once the flow is assigned, executor1's remaining TMP storage dropped to 2048
// now we do the getBest again executor3 is expected to be selected as it has a earlier last dispatched time.
- executorList.get(0).setExecutorStats(new Statistics(99.9, 4095, 50, new Date(), 2048,90, 0, 1));
+ executorList.get(0).setExecutorStats(new ServerStatistics(99.9, 4095, 50, new Date(), 90, 1));
executor = selector.getBest(executorList, flow);
Assert.assertEquals(executorList.get(2), executor);
}
diff --git a/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java b/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java
index 905cf5e..125f1b3 100644
--- a/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java
@@ -84,9 +84,9 @@ public class CacheTest {
@Test
public void testTimeToLiveExpiry() {
- CacheManager.setUpdateFrequency(200);
CacheManager manager = CacheManager.getInstance();
Cache cache = manager.createCache();
+ CacheManager.setUpdateFrequency(200);
cache.setUpdateFrequencyMs(200);
cache.setEjectionPolicy(EjectionPolicy.FIFO);
@@ -122,9 +122,9 @@ public class CacheTest {
@Test
public void testIdleExpireExpiry() {
- CacheManager.setUpdateFrequency(250);
CacheManager manager = CacheManager.getInstance();
Cache cache = manager.createCache();
+ CacheManager.setUpdateFrequency(250);
cache.setUpdateFrequencyMs(250);
cache.setEjectionPolicy(EjectionPolicy.FIFO);
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 1060e83..235989d 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -131,7 +131,7 @@ public class AzkabanExecutorServer {
root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor");
root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
- root.addServlet(new ServletHolder(new StatisticsServlet()), "/stastics");
+ root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverstastics");
root.setAttribute(ServerConstants.AZKABAN_SERVLET_CONTEXT_KEY, this);
@@ -176,7 +176,7 @@ public class AzkabanExecutorServer {
/**
* Configure Metric Reporting as per azkaban.properties settings
- *
+ *
* @throws MetricException
*/
private void configureMetricReports() throws MetricException {
@@ -225,13 +225,13 @@ public class AzkabanExecutorServer {
/**
* Load a custom class, which is provided by a configuration
* CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY.
- *
+ *
* This method will try to instantiate an instance of this custom class and
* with given properties as the argument in the constructor.
- *
+ *
* Basically the custom class must have a constructor that takes an argument
* with type Properties.
- *
+ *
* @param props
*/
private void loadCustomJMXAttributeProcessor(Props props) {
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 10371aa..71caaff 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -144,7 +144,7 @@ public class FlowRunnerManager implements EventListener,
private Object executionDirDeletionSync = new Object();
// date time of the the last flow submitted.
- private Date lastFlowSubmitted = null;
+ private Date lastFlowSubmittedDate = null;
public FlowRunnerManager(Props props, ExecutorLoader executorLoader,
ProjectLoader projectLoader, ClassLoader parentClassLoader)
@@ -262,7 +262,7 @@ public class FlowRunnerManager implements EventListener,
// Note: this is not thread safe and may result in providing dirty data.
// we will provide this data as is for now and will revisit if there
// is a string justification for change.
- return lastFlowSubmitted;
+ return lastFlowSubmittedDate;
}
public Props getGlobalProps() {
@@ -519,7 +519,7 @@ public class FlowRunnerManager implements EventListener,
// keep track of this future
submittedFlows.put(future, runner.getExecutionId());
// update the last submitted time.
- this.lastFlowSubmitted = new Date();
+ this.lastFlowSubmittedDate = new Date();
} catch (RejectedExecutionException re) {
throw new ExecutorManagerException(
"Azkaban server can't execute any more flows. "
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
index faf6abb..781973b 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
@@ -7,19 +7,19 @@ import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
-import azkaban.executor.Statistics;
+import azkaban.executor.ServerStatistics;
import azkaban.utils.JSONUtils;
public class StatisticsServletTest {
- private class MockStatisticsServlet extends StatisticsServlet{
+ private class MockStatisticsServlet extends ServerStatisticsServlet{
/** */
private static final long serialVersionUID = 1L;
- public Statistics getStastics(){
+ public ServerStatistics getStastics(){
return cachedstats;
}
- public Date getUpdatedTime(){
+ public long getUpdatedTime(){
return lastRefreshedTime;
}
@@ -27,17 +27,17 @@ public class StatisticsServletTest {
this.populateStatistics(false);
}
- public void callFillCpuUsage(Statistics stats){
+ public void callFillCpuUsage(ServerStatistics stats){
this.fillCpuUsage(stats);}
- public void callFillRemainingMemoryPercent(Statistics stats){
+ public void callFillRemainingMemoryPercent(ServerStatistics stats){
this.fillRemainingMemoryPercent(stats);}
}
private MockStatisticsServlet statServlet = new MockStatisticsServlet();
@Test
public void testFillMemory() {
- Statistics stats = new Statistics();
+ ServerStatistics stats = new ServerStatistics();
this.statServlet.callFillRemainingMemoryPercent(stats);
// assume any machine that runs this test should
// have bash and top available and at least got some remaining memory.
@@ -47,7 +47,7 @@ public class StatisticsServletTest {
@Test
public void testFillCpu() {
- Statistics stats = new Statistics();
+ ServerStatistics stats = new ServerStatistics();
this.statServlet.callFillCpuUsage(stats);
Assert.assertTrue(stats.getCpuUsage() > 0);
}
@@ -64,8 +64,8 @@ public class StatisticsServletTest {
@Test
public void testPopulateStatisticsCache() {
this.statServlet.callPopulateStatistics();
- final Date updatedTime = this.statServlet.getUpdatedTime();
- while (new Date().getTime() - updatedTime.getTime() < 1000){
+ final long updatedTime = this.statServlet.getUpdatedTime();
+ while (System.currentTimeMillis() - updatedTime < 1000){
this.statServlet.callPopulateStatistics();
Assert.assertEquals(updatedTime, this.statServlet.getUpdatedTime());
}
@@ -81,21 +81,21 @@ public class StatisticsServletTest {
@Test
public void testStatisticsJsonParser() throws IOException {
- Statistics stat = new Statistics(0.1,1,2,new Date(),3,4,5,5);
+ ServerStatistics stat = new ServerStatistics(0.1,1,2,new Date(),3,5);
String jSonStr = JSONUtils.toJSON(stat);
@SuppressWarnings("unchecked")
Map<String,Object> jSonObj = (Map<String,Object>)JSONUtils.parseJSONFromString(jSonStr);
- Statistics stat2 = Statistics.fromJsonObject(jSonObj);
+ ServerStatistics stat2 = ServerStatistics.fromJsonObject(jSonObj);
Assert.assertTrue(stat.equals(stat2));
}
@Test
public void testStatisticsJsonParserWNullDateValue() throws IOException {
- Statistics stat = new Statistics(0.1,1,2,null,3,4,5,5);
+ ServerStatistics stat = new ServerStatistics(0.1,1,2,null,3,5);
String jSonStr = JSONUtils.toJSON(stat);
@SuppressWarnings("unchecked")
Map<String,Object> jSonObj = (Map<String,Object>)JSONUtils.parseJSONFromString(jSonStr);
- Statistics stat2 = Statistics.fromJsonObject(jSonObj);
+ ServerStatistics stat2 = ServerStatistics.fromJsonObject(jSonObj);
Assert.assertTrue(stat.equals(stat2));
}
}