Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
index 31070bf..e21ae2a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Executor.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -16,6 +16,7 @@
package azkaban.executor;
+import java.util.Date;
import azkaban.utils.Utils;
/**
@@ -23,13 +24,14 @@ import azkaban.utils.Utils;
*
* @author gaggarwa
*/
-public class Executor {
+public class Executor implements Comparable<Executor> {
private final int id;
private final String host;
private final int port;
private boolean isActive;
-
- // TODO: ExecutorStats to be added
+ // cached copy of the latest statistics from the executor.
+ private ExecutorInfo cachedExecutorStats;
+ private Date lastStatsUpdatedTime;
/**
* <pre>
@@ -88,6 +90,13 @@ public class Executor {
return true;
}
+ @Override
+ public String toString(){
+ return String.format("%s (id: %s)",
+ null == this.host || this.host.length() == 0 ? "(empty)" : this.host,
+ this.id);
+ }
+
public String getHost() {
return host;
}
@@ -104,7 +113,30 @@ public class Executor {
return id;
}
+ public ExecutorInfo getExecutorInfo() {
+ return this.cachedExecutorStats;
+ }
+
+ public void setExecutorInfo(ExecutorInfo info) {
+ this.cachedExecutorStats = info;
+ this.lastStatsUpdatedTime = new Date();
+ }
+
+ /**
+ * Gets the timestamp when the executor info is last updated.
+ * @return date object represents the timestamp, null if the executor info of this
+ * specific executor is never refreshed.
+ * */
+ public Date getLastStatsUpdatedTime(){
+ return this.lastStatsUpdatedTime;
+ }
+
public void setActive(boolean isActive) {
this.isActive = isActive;
}
+
+ @Override
+ public int compareTo(Executor o) {
+ return null == o ? 1 : this.hashCode() - o.hashCode();
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
index 4d77897..2ab814d 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
@@ -17,37 +17,35 @@
package azkaban.executor;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
import org.apache.http.client.HttpResponseException;
import org.apache.http.util.EntityUtils;
-
-import azkaban.utils.JSONUtils;
import azkaban.utils.RestfulApiClient;
/** Client class that will be used to handle all Restful API calls between Executor and the host application.
* */
-public class ExecutorApiClient extends RestfulApiClient<Map<String, Object>> {
+public class ExecutorApiClient extends RestfulApiClient<String> {
+ private static ExecutorApiClient instance = null;
private ExecutorApiClient(){}
- private static ExecutorApiClient instance = new ExecutorApiClient();
- /** Function to return the instance of the ExecutorApiClient class.
+ /**
+ * Singleton method to return the instance of the current object.
* */
- public static ExecutorApiClient getInstance() {
+ public static ExecutorApiClient getInstance(){
+ if (null == instance){
+ instance = new ExecutorApiClient();
+ }
+
return instance;
}
/**Implementing the parseResponse function to return de-serialized Json object.
* @param response the returned response from the HttpClient.
- * @return de-serialized object from Json or empty object if the response doesn't have a body.
+ * @return de-serialized object from Json or null if the response doesn't have a body.
* */
- @SuppressWarnings("unchecked")
@Override
- protected Map<String, Object> parseResponse(HttpResponse response)
+ protected String parseResponse(HttpResponse response)
throws HttpResponseException, IOException {
final StatusLine statusLine = response.getStatusLine();
String responseBody = response.getEntity() != null ?
@@ -61,14 +59,6 @@ public class ExecutorApiClient extends RestfulApiClient<Map<String, Object>> {
throw new HttpResponseException(statusLine.getStatusCode(),responseBody);
}
- final HttpEntity entity = response.getEntity();
- if (null != entity){
- Object returnVal = JSONUtils.parseJSONFromString(EntityUtils.toString(entity));
- if (null!= returnVal){
- return (Map<String, Object>) returnVal;
- }
- }
-
- return new HashMap<String, Object>() ;
+ return responseBody;
}
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
new file mode 100644
index 0000000..03de432
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.IOException;
+
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+ /** Class that exposes the statistics from the executor server.
+ * List of the statistics -
+ * remainingMemoryPercent;
+ * remainingMemory;
+ * remainingFlowCapacity;
+ * numberOfAssignedFlows;
+ * lastDispatchedTime;
+ * cpuUsage;
+ *
+ * */
+ public class ExecutorInfo implements java.io.Serializable{
+ private static final long serialVersionUID = 3009746603773371263L;
+ private double remainingMemoryPercent;
+ private long remainingMemoryInMB;
+ private int remainingFlowCapacity;
+ private int numberOfAssignedFlows;
+ private long lastDispatchedTime;
+ private double cpuUsage;
+
+ public double getCpuUsage() {
+ return this.cpuUsage;
+ }
+
+ public void setCpuUpsage(double value){
+ this.cpuUsage = value;
+ }
+
+ public double getRemainingMemoryPercent() {
+ return this.remainingMemoryPercent;
+ }
+
+ public void setRemainingMemoryPercent(double value){
+ this.remainingMemoryPercent = value;
+ }
+
+ public long getRemainingMemoryInMB(){
+ return this.remainingMemoryInMB;
+ }
+
+ public void setRemainingMemoryInMB(long value){
+ this.remainingMemoryInMB = value;
+ }
+
+ public int getRemainingFlowCapacity(){
+ return this.remainingFlowCapacity;
+ }
+
+ public void setRemainingFlowCapacity(int value){
+ this.remainingFlowCapacity = value;
+ }
+
+ public long getLastDispatchedTime(){
+ return this.lastDispatchedTime;
+ }
+
+ public void setLastDispatchedTime(long value){
+ this.lastDispatchedTime = value;
+ }
+
+ public int getNumberOfAssignedFlows () {
+ return this.numberOfAssignedFlows;
+ }
+
+ public void setNumberOfAssignedFlows (int value) {
+ this.numberOfAssignedFlows = value;
+ }
+
+ public ExecutorInfo(){}
+
+ public ExecutorInfo (double remainingMemoryPercent,
+ long remainingMemory,
+ int remainingFlowCapacity,
+ long lastDispatched,
+ double cpuUsage,
+ int numberOfAssignedFlows){
+ this.remainingMemoryInMB = remainingMemory;
+ this.cpuUsage = cpuUsage;
+ this.remainingFlowCapacity = remainingFlowCapacity;
+ this.remainingMemoryPercent = remainingMemoryPercent;
+ this.lastDispatchedTime = lastDispatched;
+ this.numberOfAssignedFlows = numberOfAssignedFlows;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (obj instanceof ExecutorInfo)
+ {
+ boolean result = true;
+ ExecutorInfo stat = (ExecutorInfo) obj;
+
+ result &=this.remainingMemoryInMB == stat.remainingMemoryInMB;
+ result &=this.cpuUsage == stat.cpuUsage;
+ result &=this.remainingFlowCapacity == stat.remainingFlowCapacity;
+ result &=this.remainingMemoryPercent == stat.remainingMemoryPercent;
+ result &=this.numberOfAssignedFlows == stat.numberOfAssignedFlows;
+ result &= this.lastDispatchedTime == stat.lastDispatchedTime;
+ return result;
+ }
+ return false;
+ }
+
+ /**
+ * Helper function to get an ExecutorInfo instance from the JSon String serialized from another object.
+ * @param jsonString the string that will be de-serialized from.
+ * @return instance of the object if the parsing is successful, null other wise.
+ * @throws JsonParseException,JsonMappingException,IOException
+ * */
+ public static ExecutorInfo fromJSONString(String jsonString) throws
+ JsonParseException,
+ JsonMappingException,
+ IOException{
+ if (null == jsonString || jsonString.length() == 0) return null;
+ return new ObjectMapper().readValue(jsonString, ExecutorInfo.class);
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
index 8234f7d..8ef2c76 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
@@ -31,7 +31,7 @@ import azkaban.utils.Pair;
*
*/
public abstract class CandidateComparator<T> implements Comparator<T> {
- private static Logger logger = Logger.getLogger(CandidateComparator.class);
+ protected static Logger logger = Logger.getLogger(CandidateComparator.class);
// internal repository of the registered comparators .
private Map<String,FactorComparator<T>> factorComparatorList =
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
index 154d528..94b8dfa 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
@@ -28,14 +28,14 @@ import org.apache.log4j.Logger;
* register filters using the provided register function.
*/
public abstract class CandidateFilter<T,V> {
- private static Logger logger = Logger.getLogger(CandidateFilter.class);
+ protected static Logger logger = Logger.getLogger(CandidateFilter.class);
// internal repository of the registered filters .
private Map<String,FactorFilter<T,V>> factorFilterList =
new ConcurrentHashMap<String,FactorFilter<T,V>>();
/** gets the name of the current implementation of the candidate filter.
- * @returns : name of the filter.
+ * @return : name of the filter.
* */
public abstract String getName();
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
index 0598d95..53cb1bb 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
@@ -26,7 +26,7 @@ import org.apache.log4j.Logger;
* @param K executor object type.
* @param V dispatching object type.
* */
-public class CandidateSelector<K,V> implements Selector<K, V> {
+public class CandidateSelector<K extends Comparable<K>, V> implements Selector<K, V> {
private static Logger logger = Logger.getLogger(CandidateComparator.class);
private CandidateFilter<K,V> filter;
@@ -70,12 +70,16 @@ public class CandidateSelector<K,V> implements Selector<K, V> {
logger.info(String.format("candidate count after filtering: %s", filteredList.size()));
if (filteredList.size() == 0){
- logger.info("failed to select candidate as the filted candidate list is empty.");
+ logger.info("failed to select candidate as the filtered candidate list is empty.");
return null;
}
+ if (null == comparator){
+ logger.info("candidate comparator is not specified, default hash code comparator class will be used.");
+ }
+
// final work - find the best candidate from the filtered list.
- K executor = Collections.max(filteredList,comparator);
+ K executor = Collections.max(filteredList, comparator);
logger.info(String.format("candidate selected %s",
null == executor ? "(null)" : executor.toString()));
return executor;
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
new file mode 100644
index 0000000..eafab67
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import azkaban.executor.Executor;
+import azkaban.executor.ExecutorInfo;
+
+
+/**
+ * De-normalized version of the CandidateComparator, which also contains the implementation of the factor comparators.
+ * */
+public class ExecutorComparator extends CandidateComparator<Executor> {
+ private static Map<String, ComparatorCreator> comparatorCreatorRepository = null;
+
+ /**
+ * Gets the name list of all available comparators.
+ * @return the list of the names.
+ * */
+ public static Set<String> getAvailableComparatorNames(){
+ return comparatorCreatorRepository.keySet();
+ }
+
+ // factor comparator names
+ private static final String NUMOFASSIGNEDFLOW_COMPARATOR_NAME = "NumberOfAssignedFlowComparator";
+ private static final String MEMORY_COMPARATOR_NAME = "Memory";
+ private static final String LSTDISPATCHED_COMPARATOR_NAME = "LastDispatched";
+ private static final String CPUUSAGE_COMPARATOR_NAME = "CpuUsage";
+
+ /**
+ * static initializer of the class.
+ * We will build the filter repository here.
+ * when a new comparator is added, please do remember to register it here.
+ * */
+ static {
+ comparatorCreatorRepository = new HashMap<String, ComparatorCreator>();
+
+ // register the creator for number of assigned flow comparator.
+ comparatorCreatorRepository.put(NUMOFASSIGNEDFLOW_COMPARATOR_NAME, new ComparatorCreator(){
+ public FactorComparator<Executor> create(int weight) { return getNumberOfAssignedFlowComparator(weight); }});
+
+ // register the creator for memory comparator.
+ comparatorCreatorRepository.put(MEMORY_COMPARATOR_NAME, new ComparatorCreator(){
+ public FactorComparator<Executor> create(int weight) { return getMemoryComparator(weight); }});
+
+ // register the creator for last dispatched time comparator.
+ comparatorCreatorRepository.put(LSTDISPATCHED_COMPARATOR_NAME, new ComparatorCreator(){
+ public FactorComparator<Executor> create(int weight) { return getLstDispatchedTimeComparator(weight); }});
+
+ // register the creator for CPU Usage comparator.
+ comparatorCreatorRepository.put(CPUUSAGE_COMPARATOR_NAME, new ComparatorCreator(){
+ public FactorComparator<Executor> create(int weight) { return getCpuUsageComparator(weight); }});
+ }
+
+
+ /**
+ * constructor of the ExecutorComparator.
+ * @param comparatorList the list of comparator, plus its weight information to be registered,
+ * the parameter must be a not-empty and valid list object.
+ * */
+ public ExecutorComparator(Map<String,Integer> comparatorList) {
+ if (null == comparatorList|| comparatorList.size() == 0){
+ throw new IllegalArgumentException("failed to initialize executor comparator" +
+ "as the passed comparator list is invalid or empty.");
+ }
+
+ // register the comparators, we will now throw here if the weight is invalid, it is handled in the super.
+ for (Entry<String,Integer> entry : comparatorList.entrySet()){
+ if (comparatorCreatorRepository.containsKey(entry.getKey())){
+ this.registerFactorComparator(comparatorCreatorRepository.
+ get(entry.getKey()).
+ create(entry.getValue()));
+ } else {
+ throw new IllegalArgumentException(String.format("failed to initialize executor comparator " +
+ "as the comparator implementation for requested factor '%s' doesn't exist.",
+ entry.getKey()));
+ }
+ }
+ }
+
+ @Override
+ public String getName() {
+ return "ExecutorComparator";
+ }
+
+ private interface ComparatorCreator{
+ FactorComparator<Executor> create(int weight);
+ }
+
+ /**<pre>
+ * helper function that does the object on two statistics, comparator can leverage this function to provide
+ * shortcuts if the statistics object is missing from one or both sides of the executors.
+ * </pre>
+ * @param stat1 the first statistics object to be checked .
+ * @param stat2 the second statistics object to be checked.
+ * @param caller the name of the calling function, for logging purpose.
+ * @param result result Integer to pass out the result in case the statistics are not both valid.
+ * @return true if the passed statistics are NOT both valid, a shortcut can be made (caller can consume the result),
+ * false otherwise.
+ * */
+ private static boolean statisticsObjectCheck(ExecutorInfo statisticsObj1,
+ ExecutorInfo statisticsObj2, String caller, Integer result){
+ result = 0 ;
+ // both doesn't expose the info
+ if (null == statisticsObj1 && null == statisticsObj2){
+ logger.info(String.format("%s : neither of the executors exposed statistics info.",
+ caller));
+ return true;
+ }
+
+ //right side doesn't expose the info.
+ if (null == statisticsObj2 ){
+ logger.info(String.format("%s : choosing left side and the right side executor doesn't expose statistics info",
+ caller));
+ result = 1;
+ return true;
+ }
+
+ //left side doesn't expose the info.
+ if (null == statisticsObj1 ){
+ logger.info(String.format("%s : choosing right side and the left side executor doesn't expose statistics info",
+ caller));
+ result = -1;
+ return true;
+ }
+
+ // both not null
+ return false;
+ }
+
+ /**
+ * function defines the number of assigned flow comparator.
+ * @param weight weight of the comparator.
+ * */
+ private static FactorComparator<Executor> getNumberOfAssignedFlowComparator(int weight){
+ return FactorComparator.create(NUMOFASSIGNEDFLOW_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+
+ @Override
+ public int compare(Executor o1, Executor o2) {
+ ExecutorInfo stat1 = o1.getExecutorInfo();
+ ExecutorInfo stat2 = o2.getExecutorInfo();
+
+ Integer result = 0;
+ if (statisticsObjectCheck(stat1,stat2,NUMOFASSIGNEDFLOW_COMPARATOR_NAME,result)){
+ return result;
+ }
+ return ((Integer)stat1.getRemainingFlowCapacity()).compareTo(stat2.getRemainingFlowCapacity());
+ }});
+ }
+
+ /**
+ * function defines the cpuUsage comparator.
+ * @param weight weight of the comparator.
+ * @return
+ * */
+ private static FactorComparator<Executor> getCpuUsageComparator(int weight){
+ return FactorComparator.create(CPUUSAGE_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+
+ @Override
+ public int compare(Executor o1, Executor o2) {
+ ExecutorInfo stat1 = o1.getExecutorInfo();
+ ExecutorInfo stat2 = o2.getExecutorInfo();
+
+ int result = 0;
+ if (statisticsObjectCheck(stat1,stat2,CPUUSAGE_COMPARATOR_NAME,result)){
+ return result;
+ }
+
+ // CPU usage , the lesser the value is, the better.
+ return ((Double)stat2.getCpuUsage()).compareTo(stat1.getCpuUsage());
+ }});
+ }
+
+
+ /**
+ * function defines the last dispatched time comparator.
+ * @param weight weight of the comparator.
+ * @return
+ * */
+ private static FactorComparator<Executor> getLstDispatchedTimeComparator(int weight){
+ return FactorComparator.create(LSTDISPATCHED_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+
+ @Override
+ public int compare(Executor o1, Executor o2) {
+ ExecutorInfo stat1 = o1.getExecutorInfo();
+ ExecutorInfo stat2 = o2.getExecutorInfo();
+
+ int result = 0;
+ if (statisticsObjectCheck(stat1,stat2,LSTDISPATCHED_COMPARATOR_NAME,result)){
+ return result;
+ }
+ // Note: an earlier date time indicates higher weight.
+ return ((Long)stat2.getLastDispatchedTime()).compareTo(stat1.getLastDispatchedTime());
+ }});
+ }
+
+
+ /**<pre>
+ * function defines the Memory comparator.
+ * Note: comparator firstly take the absolute value of the remaining memory, if both sides have the same value,
+ * it go further to check the percent of the remaining memory.
+ * </pre>
+ * @param weight weight of the comparator.
+
+ * @return
+ * */
+ private static FactorComparator<Executor> getMemoryComparator(int weight){
+ return FactorComparator.create(MEMORY_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+
+ @Override
+ public int compare(Executor o1, Executor o2) {
+ ExecutorInfo stat1 = o1.getExecutorInfo();
+ ExecutorInfo stat2 = o2.getExecutorInfo();
+
+ int result = 0;
+ if (statisticsObjectCheck(stat1,stat2,MEMORY_COMPARATOR_NAME,result)){
+ return result;
+ }
+
+ if (stat1.getRemainingMemoryInMB() != stat2.getRemainingMemoryInMB()){
+ return stat1.getRemainingMemoryInMB() > stat2.getRemainingMemoryInMB() ? 1:-1;
+ }
+
+ return Double.compare(stat1.getRemainingMemoryPercent(), stat2.getRemainingMemoryPercent());
+ }});
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
new file mode 100644
index 0000000..031650e
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.Executor;
+import azkaban.executor.ExecutorInfo;
+
+/**
+ * De-normalized version of the candidateFilter, which also contains the implementation of the factor filters.
+ * */
+public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFlow> {
+ private static Map<String, FactorFilter<Executor, ExecutableFlow>> filterRepository = null;
+
+ /**
+ * Gets the name list of all available filters.
+ * @return the list of the names.
+ * */
+ public static Set<String> getAvailableFilterNames(){
+ return filterRepository.keySet();
+ }
+
+
+ // factor filter names.
+ private static final String STATICREMAININGFLOWSIZE_FILTER_NAME = "StaticRemainingFlowSize";
+ private static final String MINIMUMFREEMEMORY_FILTER_NAME = "MinimunFreeMemory";
+ private static final String CPUSTATUS_FILTER_NAME = "CpuStatus";
+
+ /**
+ * static initializer of the class.
+ * We will build the filter repository here.
+ * when a new filter is added, please do remember to register it here.
+ * */
+ static {
+ filterRepository = new HashMap<String, FactorFilter<Executor, ExecutableFlow>>();
+ filterRepository.put(STATICREMAININGFLOWSIZE_FILTER_NAME, getStaticRemainingFlowSizeFilter());
+ filterRepository.put(MINIMUMFREEMEMORY_FILTER_NAME, getMinimumReservedMemoryFilter());
+ filterRepository.put(CPUSTATUS_FILTER_NAME, getCpuStatusFilter());
+ }
+
+ /**
+ * constructor of the ExecutorFilter.
+ * @param filterList the list of filter to be registered, the parameter must be a not-empty and valid list object.
+ * */
+ public ExecutorFilter(List<String> filterList) {
+ // shortcut if the filter list is invalid. A little bit ugly to have to throw in constructor.
+ if (null == filterList || filterList.size() == 0){
+ logger.error("failed to initialize executor filter as the passed filter list is invalid or empty.");
+ throw new IllegalArgumentException("filterList");
+ }
+
+ // register the filters according to the list.
+ for (String filterName : filterList){
+ if (filterRepository.containsKey(filterName)){
+ this.registerFactorFilter(filterRepository.get(filterName));
+ } else {
+ logger.error(String.format("failed to initialize executor filter "+
+ "as the filter implementation for requested factor '%s' doesn't exist.",
+ filterName));
+ throw new IllegalArgumentException("filterList");
+ }
+ }
+ }
+
+ @Override
+ public String getName() {
+ return "ExecutorFilter";
+ }
+
+ /**<pre>
+ * function to register the static remaining flow size filter.
+ * NOTE : this is a static filter which means the filter will be filtering based on the system standard which is not
+ * Coming for the passed flow.
+ * Ideally this filter will make sure only the executor hasn't reached the Max allowed # of executing flows.
+ *</pre>
+ * */
+ private static FactorFilter<Executor, ExecutableFlow> getStaticRemainingFlowSizeFilter(){
+ return FactorFilter.create(STATICREMAININGFLOWSIZE_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
+ public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
+ if (null == filteringTarget){
+ logger.info(String.format("%s : filtering out the target as it is null.", STATICREMAININGFLOWSIZE_FILTER_NAME));
+ return false;
+ }
+
+ ExecutorInfo stats = filteringTarget.getExecutorInfo();
+ if (null == stats) {
+ logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
+ STATICREMAININGFLOWSIZE_FILTER_NAME,
+ filteringTarget.toString()));
+ return false;
+ }
+ return stats.getRemainingFlowCapacity() > 0 ;
+ }
+ });
+ }
+
+ /**
+ * function to register the static Minimum Reserved Memory filter.
+ * NOTE : this is a static filter which means the filter will be filtering based on the system standard which is not
+ * Coming for the passed flow.
+ * This filter will filter out any executors that has the remaining memory below 6G
+ * */
+ private static FactorFilter<Executor, ExecutableFlow> getMinimumReservedMemoryFilter(){
+ return FactorFilter.create(MINIMUMFREEMEMORY_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
+ private static final int MINIMUM_FREE_MEMORY = 6 * 1024;
+ public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
+ if (null == filteringTarget){
+ logger.info(String.format("%s : filtering out the target as it is null.", MINIMUMFREEMEMORY_FILTER_NAME));
+ return false;
+ }
+
+ ExecutorInfo stats = filteringTarget.getExecutorInfo();
+ if (null == stats) {
+ logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
+ MINIMUMFREEMEMORY_FILTER_NAME,
+ filteringTarget.toString()));
+ return false;
+ }
+ return stats.getRemainingMemoryInMB() > MINIMUM_FREE_MEMORY ;
+ }
+ });
+ }
+
+
+ /**
+ * function to register the static Minimum Reserved Memory filter.
+ * NOTE : <pre> this is a static filter which means the filter will be filtering based on the system standard which
+ * is not Coming for the passed flow.
+ * This filter will filter out any executors that the current CPU usage exceed 95%
+ * </pre>
+ * */
+ private static FactorFilter<Executor, ExecutableFlow> getCpuStatusFilter(){
+ return FactorFilter.create(CPUSTATUS_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
+ private static final int MAX_CPU_CURRENT_USAGE = 95;
+ public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
+ if (null == filteringTarget){
+ logger.info(String.format("%s : filtering out the target as it is null.", CPUSTATUS_FILTER_NAME));
+ return false;
+ }
+
+ ExecutorInfo stats = filteringTarget.getExecutorInfo();
+ if (null == stats) {
+ logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
+ MINIMUMFREEMEMORY_FILTER_NAME,
+ filteringTarget.toString()));
+ return false;
+ }
+ return stats.getCpuUsage() < MAX_CPU_CURRENT_USAGE ;
+ }
+ });
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java
new file mode 100644
index 0000000..d7236e8
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.List;
+import java.util.Map;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.Executor;
+
+/**<pre>
+ * Executor selector class implementation.
+ * NOTE: This class is a de-generalized version of the CandidateSelector, which provides a
+ * clean and convenient constructor to take in filter and comparator name list and build
+ * the instance from that.
+ *</pre>
+ * */
+public class ExecutorSelector extends CandidateSelector<Executor, ExecutableFlow> {
+
+ /**
+ * Contractor of the class.
+ * @param filterList name list of the filters to be registered,
+ * filter feature will be disabled if a null value is passed.
+ * @param comparatorList name/weight pair list of the comparators to be registered ,
+ * again comparator feature is disabled if a null value is passed.
+ * */
+ public ExecutorSelector(List<String> filterList, Map<String,Integer> comparatorList) {
+ super(null == filterList || filterList.isEmpty() ? null : new ExecutorFilter(filterList),
+ null == comparatorList || comparatorList.isEmpty() ? null : new ExecutorComparator(comparatorList));
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
index 1d9d162..6d4d2e0 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
@@ -33,7 +33,7 @@ public final class FactorComparator<T>{
* method provided below.
* @param factorName : the factor name .
* @param weight : the weight of the comparator.
- * @ comparator : function to be provided by user on how the comparison should be made.
+ * @param comparator : function to be provided by user on how the comparison should be made.
* */
private FactorComparator(String factorName, int weight, Comparator<T> comparator){
this.factorName = factorName;
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
index 74b9d5d..04b04b7 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
@@ -65,13 +65,11 @@ public final class FactorFilter<T,V>{
public interface Filter<T,V>{
/**function to analyze the target item according to the reference object to decide whether the item should be filtered.
- * @param filteringTarget: object to be checked.
- * @param referencingObject: object which contains statistics based on which a decision is made whether
+ * @param filteringTarget object to be checked.
+ * @param referencingObject object which contains statistics based on which a decision is made whether
* the object being checked need to be filtered or not.
* @return true if the check passed, false if check failed, which means the item need to be filtered.
* */
public boolean filterTarget(T filteringTarget, V referencingObject);
}
-
-
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java b/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java
index 610f75a..605fd28 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java
@@ -19,13 +19,15 @@ package azkaban.executor.selector;
import java.util.List;
-/** Definition of the selector interface.
+/**<pre>
+ * Definition of the selector interface.
* an implementation of the selector interface provides the functionality
* to return a candidate from the candidateList that suits best for the dispatchingObject.
+ * </pre>
* @param K : type of the candidate.
* @param V : type of the dispatching object.
*/
-public interface Selector <K,V> {
+public interface Selector <K extends Comparable<K>,V> {
/** Function returns the next best suit candidate from the candidateList for the dispatching object.
* @param candidateList : List of the candidates to select from .
diff --git a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
index b5b4509..e54b182 100644
--- a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
@@ -20,6 +20,9 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.log4j.BasicConfigurator;
import org.junit.After;
@@ -29,10 +32,11 @@ import org.junit.BeforeClass;
import org.junit.Test;
import azkaban.executor.selector.*;
+import azkaban.utils.JSONUtils;
public class SelectorTest {
// mock executor object.
- protected class MockExecutorObject{
+ protected class MockExecutorObject implements Comparable <MockExecutorObject>{
public String name;
public int port;
public double percentOfRemainingMemory;
@@ -66,6 +70,11 @@ public class SelectorTest {
{
return this.name;
}
+
+ @Override
+ public int compareTo(MockExecutorObject o) {
+ return null == o ? 1 : this.hashCode() - o.hashCode();
+ }
}
// Mock flow object.
@@ -602,4 +611,128 @@ public class SelectorTest {
Assert.assertTrue(null == executor);
}
+
+ @Test
+ public void testCreatingExectorfilterObject() throws Exception{
+ List<String> validList = new ArrayList<String>(ExecutorFilter.getAvailableFilterNames());
+ try {
+ new ExecutorFilter(validList);
+ }catch (Exception ex){
+ Assert.fail("creating ExecutorFilter with valid list throws exception . ex -" + ex.getMessage());
+ }
+ }
+
+ @Test
+ public void testCreatingExectorfilterObjectWInvalidList() throws Exception{
+ List<String> invalidList = new ArrayList<String>();
+ invalidList.add("notExistingFilter");
+ Exception result = null;
+ try {
+ new ExecutorFilter(invalidList);
+ }catch (Exception ex){
+ if (ex instanceof IllegalArgumentException)
+ result = ex;
+ }
+ Assert.assertNotNull(result);
+ }
+
+ @Test
+ public void testCreatingExectorComparatorObject() throws Exception{
+ Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+ for (String name : ExecutorComparator.getAvailableComparatorNames()){
+ comparatorMap.put(name, 1);
+ }
+ try {
+ new ExecutorComparator(comparatorMap);
+ }catch (Exception ex){
+ Assert.fail("creating ExecutorComparator with valid list throws exception . ex -" + ex.getMessage());
+ }
+ }
+
+ @Test
+ public void testCreatingExectorComparatorObjectWInvalidName() throws Exception{
+ Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+ comparatorMap.put("invalidName", 0);
+ Exception result = null;
+ try {
+ new ExecutorComparator(comparatorMap);
+ }catch (Exception ex){
+ if (ex instanceof IllegalArgumentException)
+ result = ex;
+ }
+ Assert.assertNotNull(result);
+ }
+
+ @Test
+ public void testCreatingExectorComparatorObjectWInvalidWeight() throws Exception{
+ Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+ for (String name : ExecutorComparator.getAvailableComparatorNames()){
+ comparatorMap.put(name, -1);
+ }
+ Exception result = null;
+ try {
+ new ExecutorComparator(comparatorMap);
+ }catch (Exception ex){
+ if (ex instanceof IllegalArgumentException)
+ result = ex;
+ }
+ Assert.assertNotNull(result);
+ }
+
+ @Test
+ public void testCreatingExecutorSelectorWithEmptyFilterComparatorList() throws Exception{
+ List<Executor> executorList = new ArrayList<Executor>();
+ executorList.add(new Executor(1, "host1", 80, true));
+ executorList.add(new Executor(2, "host2", 80, true));
+ executorList.add(new Executor(3, "host3", 80, true));
+
+ executorList.get(0).setExecutorInfo(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89, 0));
+ executorList.get(1).setExecutorInfo(new ExecutorInfo(50, 14095, 50, System.currentTimeMillis(), 90, 0));
+ executorList.get(2).setExecutorInfo(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 90, 0));
+
+ ExecutableFlow flow = new ExecutableFlow();
+
+ ExecutorSelector selector = new ExecutorSelector(null , null);
+ Executor executor = selector.getBest(executorList, flow);
+ Assert.assertEquals(executorList.get(2), executor);
+ }
+
+
+ @Test
+ public void testExecutorSelectorE2E() throws Exception{
+ List<String> filterList = new ArrayList<String>(ExecutorFilter.getAvailableFilterNames());
+ Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+ List<Executor> executorList = new ArrayList<Executor>();
+ executorList.add(new Executor(1, "host1", 80, true));
+ executorList.add(new Executor(2, "host2", 80, true));
+ executorList.add(new Executor(3, "host3", 80, true));
+
+ executorList.get(0).setExecutorInfo(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89, 0));
+ executorList.get(1).setExecutorInfo(new ExecutorInfo(50, 14095, 50, System.currentTimeMillis(), 90, 0));
+ executorList.get(2).setExecutorInfo(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 90, 0));
+
+ ExecutableFlow flow = new ExecutableFlow();
+
+ for (String name : ExecutorComparator.getAvailableComparatorNames()){
+ comparatorMap.put(name, 1);
+ }
+ ExecutorSelector selector = new ExecutorSelector(filterList,comparatorMap);
+ Executor executor = selector.getBest(executorList, flow);
+ Assert.assertEquals(executorList.get(0), executor);
+
+ // simulate that once the flow is assigned, executor1's remaining TMP storage dropped to 2048
+ // now we do the getBest again executor3 is expected to be selected as it has a earlier last dispatched time.
+ executorList.get(0).setExecutorInfo(new ExecutorInfo(99.9, 4095, 50, System.currentTimeMillis(), 90, 1));
+ executor = selector.getBest(executorList, flow);
+ Assert.assertEquals(executorList.get(2), executor);
+ }
+
+ @Test
+ public void testExecutorInfoJsonParser() throws Exception{
+ ExecutorInfo exeInfo = new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89, 10);
+ String json = JSONUtils.toJSON(exeInfo);
+ ExecutorInfo exeInfo2 = ExecutorInfo.fromJSONString(json);
+ Assert.assertTrue(exeInfo.equals(exeInfo2));
+ }
+
}
diff --git a/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java b/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java
index 905cf5e..125f1b3 100644
--- a/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java
@@ -84,9 +84,9 @@ public class CacheTest {
@Test
public void testTimeToLiveExpiry() {
- CacheManager.setUpdateFrequency(200);
CacheManager manager = CacheManager.getInstance();
Cache cache = manager.createCache();
+ CacheManager.setUpdateFrequency(200);
cache.setUpdateFrequencyMs(200);
cache.setEjectionPolicy(EjectionPolicy.FIFO);
@@ -122,9 +122,9 @@ public class CacheTest {
@Test
public void testIdleExpireExpiry() {
- CacheManager.setUpdateFrequency(250);
CacheManager manager = CacheManager.getInstance();
Cache cache = manager.createCache();
+ CacheManager.setUpdateFrequency(250);
cache.setUpdateFrequencyMs(250);
cache.setEjectionPolicy(EjectionPolicy.FIFO);
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 6e012ab..235989d 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -131,6 +131,7 @@ public class AzkabanExecutorServer {
root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor");
root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
+ root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverstastics");
root.setAttribute(ServerConstants.AZKABAN_SERVLET_CONTEXT_KEY, this);
@@ -175,7 +176,7 @@ public class AzkabanExecutorServer {
/**
* Configure Metric Reporting as per azkaban.properties settings
- *
+ *
* @throws MetricException
*/
private void configureMetricReports() throws MetricException {
@@ -224,13 +225,13 @@ public class AzkabanExecutorServer {
/**
* Load a custom class, which is provided by a configuration
* CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY.
- *
+ *
* This method will try to instantiate an instance of this custom class and
* with given properties as the argument in the constructor.
- *
+ *
* Basically the custom class must have a constructor that takes an argument
* with type Properties.
- *
+ *
* @param props
*/
private void loadCustomJMXAttributeProcessor(Props props) {
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index c8d7f1c..361db00 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -142,6 +143,9 @@ public class FlowRunnerManager implements EventListener,
private Object executionDirDeletionSync = new Object();
+ // date time of the the last flow submitted.
+ private long lastFlowSubmittedDate = 0;
+
public FlowRunnerManager(Props props, ExecutorLoader executorLoader,
ProjectLoader projectLoader, ClassLoader parentClassLoader)
throws IOException {
@@ -254,6 +258,13 @@ public class FlowRunnerManager implements EventListener,
return allProjects;
}
+ public long getLastFlowSubmittedTime(){
+ // Note: this is not thread safe and may result in providing dirty data.
+ // we will provide this data as is for now and will revisit if there
+ // is a string justification for change.
+ return lastFlowSubmittedDate;
+ }
+
public Props getGlobalProps() {
return globalProps;
}
@@ -507,6 +518,8 @@ public class FlowRunnerManager implements EventListener,
Future<?> future = executorService.submit(runner);
// keep track of this future
submittedFlows.put(future, runner.getExecutionId());
+ // update the last submitted time.
+ this.lastFlowSubmittedDate = System.currentTimeMillis();
} catch (RejectedExecutionException re) {
throw new ExecutorManagerException(
"Azkaban server can't execute any more flows. "
@@ -517,7 +530,7 @@ public class FlowRunnerManager implements EventListener,
/**
* Configure Azkaban metrics tracking for a new flowRunner instance
- *
+ *
* @param flowRunner
*/
private void configureFlowLevelMetrics(FlowRunner flowRunner) {
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
new file mode 100644
index 0000000..5566837
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.log4j.Logger;
+
+import azkaban.executor.ExecutorInfo;
+import azkaban.utils.JSONUtils;
+
+public class ServerStatisticsServlet extends HttpServlet {
+ private static final long serialVersionUID = 1L;
+ private static final int cacheTimeInMilliseconds = 1000;
+ private static final int samplesToTakeForMemory = 1;
+ private static final Logger logger = Logger.getLogger(ServerStatisticsServlet.class);
+ private static final String noCacheParamName = "nocache";
+
+ protected static long lastRefreshedTime = 0;
+ protected static ExecutorInfo cachedstats = null;
+
+ /**
+ * Handle all get request to Statistics Servlet {@inheritDoc}
+ *
+ * @see javax.servlet.http.HttpServlet#doGet(javax.servlet.http.HttpServletRequest,
+ * javax.servlet.http.HttpServletResponse)
+ */
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+ throws ServletException, IOException {
+
+ boolean noCache = null!= req && Boolean.valueOf(req.getParameter(noCacheParamName));
+
+ if (noCache || System.currentTimeMillis() - lastRefreshedTime > cacheTimeInMilliseconds){
+ this.populateStatistics(noCache);
+ }
+
+ JSONUtils.toJSON(cachedstats, resp.getOutputStream(), true);
+ }
+
+ /**
+ * fill the result set with the percent of the remaining system memory on the server.
+ * @param stats reference to the result container which contains all the results, this specific method
+ * will only work work on the property "remainingMemory" and "remainingMemoryPercent".
+ *
+ * NOTE:
+ * a double value will be used to present the remaining memory,
+ * a returning value of '55.6' means 55.6%
+ */
+ protected void fillRemainingMemoryPercent(ExecutorInfo stats){
+ if (new File("/bin/bash").exists()
+ && new File("/bin/cat").exists()
+ && new File("/bin/grep").exists()
+ && new File("/proc/meminfo").exists()) {
+ java.lang.ProcessBuilder processBuilder =
+ new java.lang.ProcessBuilder("/bin/bash", "-c", "/bin/cat /proc/meminfo | grep -E \"MemTotal|MemFree\"");
+ try {
+ ArrayList<String> output = new ArrayList<String>();
+ Process process = processBuilder.start();
+ process.waitFor();
+ InputStream inputStream = process.getInputStream();
+ try {
+ java.io.BufferedReader reader =
+ new java.io.BufferedReader(new InputStreamReader(inputStream));
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ output.add(line);
+ }
+ }finally {
+ inputStream.close();
+ }
+
+ long totalMemory = 0;
+ long freeMemory = 0;
+
+ // process the output from bash call.
+ // we expect the result from the bash call to be something like following -
+ // MemTotal: 65894264 kB
+ // MemFree: 61104076 kB
+ if (output.size() == 2) {
+ for (String result : output){
+ // find the total memory and value the variable.
+ if (result.contains("MemTotal") && result.split("\\s+").length > 2){
+ try {
+ totalMemory = Long.parseLong(result.split("\\s+")[1]);
+ logger.info("Total memory : " + totalMemory);
+ }catch(NumberFormatException e){
+ logger.error("yielding 0 for total memory as output is invalid -" + result);
+ }
+ }
+ // find the free memory and value the variable.
+ if (result.contains("MemFree") && result.split("\\s+").length > 2){
+ try {
+ freeMemory = Long.parseLong(result.split("\\s+")[1]);
+ logger.info("Free memory : " + freeMemory);
+ }catch(NumberFormatException e){
+ logger.error("yielding 0 for total memory as output is invalid -" + result);
+ }
+ }
+ }
+ }else {
+ logger.error("failed to get total/free memory info as the bash call returned invalid result.");
+ }
+
+ // the number got from the proc file is in KBs we want to see the number in MBs so we are deviding it by 1024.
+ stats.setRemainingMemoryInMB(freeMemory/1024);
+ stats.setRemainingMemoryPercent(totalMemory == 0 ? 0 : ((double)freeMemory / (double)totalMemory)*100);
+ }
+ catch (Exception ex){
+ logger.error("failed fetch system memory info " +
+ "as exception is captured when fetching result from bash call. Ex -" + ex.getMessage());
+ }
+ } else {
+ logger.error("failed fetch system memory info, one or more files from the following list are missing - " +
+ "'/bin/bash'," + "'/bin/cat'," +"'/proc/loadavg'");
+ }
+ }
+
+ /**
+ * call the data providers to fill the returning data container for statistics data.
+ * This function refreshes the static cached copy of data in case if necessary.
+ * */
+ protected synchronized void populateStatistics(boolean noCache){
+ //check again before starting the work.
+ if (noCache || System.currentTimeMillis() - lastRefreshedTime > cacheTimeInMilliseconds){
+ final ExecutorInfo stats = new ExecutorInfo();
+
+ List<Thread> workerPool = new ArrayList<Thread>();
+ workerPool.add(new Thread(new Runnable(){ public void run() {
+ fillRemainingMemoryPercent(stats); }},"RemainingMemoryPercent"));
+
+ workerPool.add(new Thread(new Runnable(){ public void run() {
+ fillRemainingFlowCapacityAndLastDispatchedTime(stats); }},"RemainingFlowCapacityAndLastDispatchedTime"));
+
+ workerPool.add(new Thread(new Runnable(){ public void run() {
+ fillCpuUsage(stats); }},"CpuUsage"));
+
+ // start all the working threads.
+ for (Thread thread : workerPool){thread.start();}
+
+ // wait for all the threads to finish their work.
+ // NOTE: the result container itself is not thread safe, we are good as for now no
+ // working thread will modify the same property, nor have any of them
+ // need to compute values based on value(s) of other properties.
+ for (Thread thread : workerPool){
+ try {
+ // we gave maxim 5 seconds to let the thread finish work.
+ thread.join(5000);;
+ } catch (InterruptedException e) {
+ logger.error(String.format("failed to collect information for %s as the working thread is interrupted. Ex - ",
+ thread.getName(), e.getMessage()));
+ }}
+
+ cachedstats = stats;
+ lastRefreshedTime = System.currentTimeMillis();
+ }
+ }
+
+ /**
+ * fill the result set with the remaining flow capacity .
+ * @param stats reference to the result container which contains all the results, this specific method
+ * will only work on the property "remainingFlowCapacity".
+ */
+ protected void fillRemainingFlowCapacityAndLastDispatchedTime(ExecutorInfo stats){
+
+ AzkabanExecutorServer server = AzkabanExecutorServer.getApp();
+ if (server != null){
+ FlowRunnerManager runnerMgr = AzkabanExecutorServer.getApp().getFlowRunnerManager();
+ int assignedFlows = runnerMgr.getNumRunningFlows() + runnerMgr.getNumQueuedFlows();
+ stats.setRemainingFlowCapacity(runnerMgr.getMaxNumRunningFlows() - assignedFlows);
+ stats.setNumberOfAssignedFlows(assignedFlows);
+ stats.setLastDispatchedTime(runnerMgr.getLastFlowSubmittedTime());
+ }else {
+ logger.error("failed to get data for remaining flow capacity or LastDispatchedTime" +
+ " as the AzkabanExecutorServer has yet been initialized.");
+ }
+ }
+
+
+ /**<pre>
+ * fill the result set with the Remaining temp Storage .
+ * Note : As the Top bash call doesn't yield accurate result for the system load,
+ * the implementation has been changed to load from the "proc/loadavg" which keeps
+ * the moving average of the system load, we are pulling the average for the recent 1 min.
+ *</pre>
+ * @param stats reference to the result container which contains all the results, this specific method
+ * will only work on the property "cpuUdage".
+ */
+ protected void fillCpuUsage(ExecutorInfo stats){
+ if (new File("/bin/bash").exists() && new File("/bin/cat").exists() && new File("/proc/loadavg").exists()) {
+ java.lang.ProcessBuilder processBuilder =
+ new java.lang.ProcessBuilder("/bin/bash", "-c", "/bin/cat /proc/loadavg");
+ try {
+ ArrayList<String> output = new ArrayList<String>();
+ Process process = processBuilder.start();
+ process.waitFor();
+ InputStream inputStream = process.getInputStream();
+ try {
+ java.io.BufferedReader reader =
+ new java.io.BufferedReader(new InputStreamReader(inputStream));
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ output.add(line);
+ }
+ }finally {
+ inputStream.close();
+ }
+
+ // process the output from bash call.
+ if (output.size() > 0) {
+ String[] splitedresult = output.get(0).split("\\s+");
+ double cpuUsage = 0.0;
+
+ try {
+ cpuUsage = Double.parseDouble(splitedresult[0]);
+ }catch(NumberFormatException e){
+ logger.error("yielding 0.0 for CPU usage as output is invalid -" + output.get(0));
+ }
+ logger.info("System load : " + cpuUsage);
+ stats.setCpuUpsage(cpuUsage);
+ }
+ }
+ catch (Exception ex){
+ logger.error("failed fetch system load info " +
+ "as exception is captured when fetching result from bash call. Ex -" + ex.getMessage());
+ }
+ } else {
+ logger.error("failed fetch system load info, one or more files from the following list are missing - " +
+ "'/bin/bash'," + "'/bin/cat'," +"'/proc/loadavg'");
+ }
+ }
+}
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
new file mode 100644
index 0000000..55078b5
--- /dev/null
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
@@ -0,0 +1,76 @@
+package azkaban.execapp;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.executor.ExecutorInfo;
+
+public class StatisticsServletTest {
+ private class MockStatisticsServlet extends ServerStatisticsServlet{
+ /** */
+ private static final long serialVersionUID = 1L;
+
+ public ExecutorInfo getStastics(){
+ return cachedstats;
+ }
+
+ public long getUpdatedTime(){
+ return lastRefreshedTime;
+ }
+
+ public void callPopulateStatistics(){
+ this.populateStatistics(false);
+ }
+
+ public void callFillCpuUsage(ExecutorInfo stats){
+ this.fillCpuUsage(stats);}
+
+ public void callFillRemainingMemoryPercent(ExecutorInfo stats){
+ this.fillRemainingMemoryPercent(stats);}
+ }
+ private MockStatisticsServlet statServlet = new MockStatisticsServlet();
+
+ @Test
+ public void testFillMemory() {
+ ExecutorInfo stats = new ExecutorInfo();
+ this.statServlet.callFillRemainingMemoryPercent(stats);
+ // assume any machine that runs this test should
+ // have bash and top available and at least got some remaining memory.
+ Assert.assertTrue(stats.getRemainingMemoryInMB() > 0);
+ Assert.assertTrue(stats.getRemainingMemoryPercent() > 0);
+ }
+
+ @Test
+ public void testFillCpu() {
+ ExecutorInfo stats = new ExecutorInfo();
+ this.statServlet.callFillCpuUsage(stats);
+ Assert.assertTrue(stats.getCpuUsage() > 0);
+ }
+
+ @Test
+ public void testPopulateStatistics() {
+ this.statServlet.callPopulateStatistics();
+ Assert.assertNotNull(this.statServlet.getStastics());
+ Assert.assertTrue(this.statServlet.getStastics().getRemainingMemoryInMB() > 0);
+ Assert.assertTrue(this.statServlet.getStastics().getRemainingMemoryPercent() > 0);
+ Assert.assertTrue(this.statServlet.getStastics().getCpuUsage() > 0);
+ }
+
+ @Test
+ public void testPopulateStatisticsCache() {
+ this.statServlet.callPopulateStatistics();
+ final long updatedTime = this.statServlet.getUpdatedTime();
+ while (System.currentTimeMillis() - updatedTime < 1000){
+ this.statServlet.callPopulateStatistics();
+ Assert.assertEquals(updatedTime, this.statServlet.getUpdatedTime());
+ }
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {}
+
+ // make sure cache expires after timeout.
+ this.statServlet.callPopulateStatistics();
+ Assert.assertNotEquals(updatedTime, this.statServlet.getUpdatedTime());
+ }
+}