azkaban-developers
Changes
azkaban-webserver/.gitignore 1(+1 -0)
Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ActiveExecutorsManager.java b/azkaban-common/src/main/java/azkaban/executor/ActiveExecutorsManager.java
new file mode 100644
index 0000000..c904ff0
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ActiveExecutorsManager.java
@@ -0,0 +1,9 @@
+package azkaban.executor;
+
+public class ActiveExecutorsManager {
+
+ public ActiveExecutorsManager() {
+ // TODO Auto-generated constructor stub
+ }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
index e21ae2a..f0600ab 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Executor.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -92,9 +92,9 @@ public class Executor implements Comparable<Executor> {
@Override
public String toString(){
- return String.format("%s (id: %s)",
- null == this.host || this.host.length() == 0 ? "(empty)" : this.host,
- this.id);
+ return String.format("%s:%s (id: %s)",
+ null == this.host || this.host.length() == 0 ? "(empty)" : this.host,
+ this.port, this.id);
}
public String getHost() {
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index bf87497..7fbae66 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -30,10 +30,14 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
@@ -43,6 +47,8 @@ import azkaban.alert.Alerter;
import azkaban.event.Event;
import azkaban.event.Event.Type;
import azkaban.event.EventHandler;
+import azkaban.executor.selector.ExecutorComparator;
+import azkaban.executor.selector.ExecutorFilter;
import azkaban.executor.selector.ExecutorSelector;
import azkaban.project.Project;
import azkaban.project.ProjectWhitelist;
@@ -71,8 +77,10 @@ public class ExecutorManager extends EventHandler implements
"azkaban.webserver.queue.size";
private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS =
"azkaban.activeexecutor.refresh.milisecinterval";
- private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_FLOW =
+ private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW =
"azkaban.activeexecutor.refresh.flowinterval";
+ private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS =
+ "azkaban.executorinfo.refresh.maxThreads";
private static Logger logger = Logger.getLogger(ExecutorManager.class);
private ExecutorLoader executorLoader;
@@ -102,9 +110,11 @@ public class ExecutorManager extends EventHandler implements
File cacheDir;
- final Props azkProps;
- List<String> filterList;
- Map<String, Integer> comparatorWeightsMap;
+ private final Props azkProps;
+ private List<String> filterList;
+ private Map<String, Integer> comparatorWeightsMap;
+ private long lastSuccessfulExecutorInfoRefresh;
+ private ExecutorService executorInforRefresherService;
public ExecutorManager(Props props, ExecutorLoader loader,
Map<String, Alerter> alters) throws ExecutorManagerException {
@@ -155,12 +165,16 @@ public class ExecutorManager extends EventHandler implements
}
}
+ executorInforRefresherService =
+ Executors.newFixedThreadPool(azkProps.getInt(
+ AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS, 5));
+
// configure queue processor
queueProcessor =
new QueueProcessorThread(azkProps.getBoolean(
AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS, 1000), azkProps.getInt(
- AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_FLOW, 1000));
+ AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW, 1000));
queueProcessor.start();
}
@@ -195,8 +209,10 @@ public class ExecutorManager extends EventHandler implements
}
if (newExecutors.isEmpty()) {
+ logger.error("No active executor found");
throw new ExecutorManagerException("No active executor found");
} else if(newExecutors.size() > 1 && !isMultiExecutorMode()) {
+ logger.error("Multiple local executors specified");
throw new ExecutorManagerException("Multiple local executors specified");
} else {
// clear all active executors, only if we have at least one new active
@@ -215,34 +231,46 @@ public class ExecutorManager extends EventHandler implements
*/
private void refreshExecutors() {
synchronized (activeExecutors) {
- ExecutorService taskExecutors =
- Executors.newFixedThreadPool(activeExecutors.size());
- for (final Executor executor : activeExecutors) {
- // execute each executorInfo refresh task
- taskExecutors.execute(new Runnable() {
- @Override
- public void run() {
- try {
- String jsonResponse =
- callExecutorForJsonString(executor.getHost(),
- executor.getPort(), "/serverstastics", null);
- executor.setExecutorInfo(ExecutorInfo
- .fromJSONString(jsonResponse));
- } catch (Exception e) {
- logger.error("Failed to update ExecutorInfo executorId :"
- + executor, e);
+ List<Pair<Executor, Future<String>>> futures =
+ new ArrayList<Pair<Executor, Future<String>>>();
+ for (final Executor executor : activeExecutors) {
+ // execute each executorInfo refresh task to fetch
+ Future<String> fetchExecutionInfo =
+ executorInforRefresherService.submit(new Callable<String>() {
+ @Override
+ public String call() throws Exception {
+ return callExecutorForJsonString(executor.getHost(),
+ executor.getPort(), "/serverstastics", null);
}
- }
- });
+ });
+ futures.add(new Pair<Executor, Future<String>>(executor,
+ fetchExecutionInfo));
+ }
+
+ boolean wasSuccess = true;
+ for (Pair<Executor, Future<String>> refreshPair : futures) {
+ Executor executor = refreshPair.getFirst();
+ try {
+ // max 5 secs
+ String jsonString = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
+ executor.setExecutorInfo(ExecutorInfo.fromJSONString(jsonString));
+ logger.info("Successfully refreshed ExecutorInfo for executor: "
+ + executor);
+ } catch (TimeoutException e) {
+ wasSuccess = false;
+ logger.error("Timed out while waiting for ExecutorInfo refresh"
+ + executor, e);
+ } catch (Exception e) {
+ wasSuccess = false;
+ logger.error("Failed to update ExecutorInfo for executor : "
+ + executor, e);
+ }
}
- taskExecutors.shutdown();
- try {
- // wait 5 seconds for all executors to be refreshed
- taskExecutors.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- logger.error("Timed out while waiting for executorInfo refresh", e);
+ // update is successful for all executors
+ if (wasSuccess) {
+ lastSuccessfulExecutorInfoRefresh = System.currentTimeMillis();
}
}
}
@@ -297,6 +325,34 @@ public class ExecutorManager extends EventHandler implements
return false;
}
+ /**
+ * Return last Successful ExecutorInfo Refresh for all active executors
+ *
+ * @return
+ */
+ public long getLastSuccessfulExecutorInfoRefresh() {
+ return this.lastSuccessfulExecutorInfoRefresh;
+ }
+
+ /**
+ * Get currently supported Comparators available to use via azkaban.properties
+ *
+ * @return
+ */
+ public Set<String> getAvailableExecutorComparatorNames() {
+ return ExecutorComparator.getAvailableComparatorNames();
+
+ }
+
+ /**
+ * Get currently supported filters available to use via azkaban.properties
+ *
+ * @return
+ */
+ public Set<String> getAvailableExecutorFilterNames() {
+ return ExecutorFilter.getAvailableFilterNames();
+ }
+
@Override
public State getExecutorManagerThreadState() {
return executingManager.getState();
@@ -1143,7 +1199,10 @@ public class ExecutorManager extends EventHandler implements
List<Pair<String, String>> paramList =
new ArrayList<Pair<String, String>>();
- paramList.add(new Pair<String, String>(ConnectorParams.JMX_MBEAN, mBean));
+ paramList.add(new Pair<String, String>(action, ""));
+ if(mBean != null) {
+ paramList.add(new Pair<String, String>(ConnectorParams.JMX_MBEAN, mBean));
+ }
String[] hostPortSplit = hostPort.split(":");
return callExecutorForJsonObject(hostPortSplit[0],
@@ -1777,9 +1836,9 @@ public class ExecutorManager extends EventHandler implements
}
/* Helper method to fetch overriding Executor, if a valid user has specifed otherwise return null */
- private Executor getUserSpecifiedExecutor(ExecutableFlow exflow) {
+ private Executor getUserSpecifiedExecutor(ExecutionOptions options,
+ int executionId) {
Executor executor = null;
- ExecutionOptions options = exflow.getExecutionOptions();
if (options != null
&& options.getFlowParameters() != null
&& options.getFlowParameters().containsKey(
@@ -1795,13 +1854,19 @@ public class ExecutorManager extends EventHandler implements
.warn(String
.format(
"User specified executor id: %d for execution id: %d is not active, Looking up db.",
- executorId, exflow.getExecutionId()));
+ executorId, executionId));
executor = executorLoader.fetchExecutor(executorId);
+ if (executor == null) {
+ logger
+ .warn(String
+ .format(
+ "User specified executor id: %d for execution id: %d is missing from db. Defaulting to availableExecutors",
+ executorId, executionId));
+ }
}
-
- } catch (Exception ex) {
+ } catch (ExecutorManagerException ex) {
logger.error("Failed to fetch user specified executor for exec_id = "
- + exflow.getExecutionId(), ex);
+ + executionId, ex);
}
}
return executor;
@@ -1810,7 +1875,9 @@ public class ExecutorManager extends EventHandler implements
/* Choose Executor for exflow among the available executors */
private Executor selectExecutor(ExecutableFlow exflow,
Set<Executor> availableExecutors) {
- Executor choosenExecutor = getUserSpecifiedExecutor(exflow);
+ Executor choosenExecutor =
+ getUserSpecifiedExecutor(exflow.getExecutionOptions(),
+ exflow.getExecutionId());
// If no executor was specified by admin
if (choosenExecutor == null) {
diff --git a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
index bbd642a..d459ae9 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
@@ -78,4 +78,19 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
return manager.getQueueProcessorThreadState().toString();
}
+ @Override
+ public List<String> getAvailableExecutorComparatorNames() {
+ return new ArrayList<String>(manager.getAvailableExecutorComparatorNames());
+ }
+
+ @Override
+ public List<String> getAvailableExecutorFilterNames() {
+ return new ArrayList<String>(manager.getAvailableExecutorFilterNames());
+ }
+
+ @Override
+ public long getLastSuccessfulExecutorInfoRefresh() {
+ return manager.getLastSuccessfulExecutorInfoRefresh();
+ }
+
}
diff --git a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
index 94012e0..69e401c 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
@@ -49,4 +49,13 @@ public interface JmxExecutorManagerMBean {
@DisplayName("OPERATION: getQueueProcessorThreadState")
public String getQueueProcessorThreadState();
+ @DisplayName("OPERATION: getAvailableExecutorComparatorNames")
+ List<String> getAvailableExecutorComparatorNames();
+
+ @DisplayName("OPERATION: getAvailableExecutorFilterNames")
+ List<String> getAvailableExecutorFilterNames();
+
+ @DisplayName("OPERATION: getLastSuccessfulExecutorInfoRefresh")
+ long getLastSuccessfulExecutorInfoRefresh();
+
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 235989d..8f5df1a 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Constructor;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.TimeZone;
@@ -501,4 +502,18 @@ public class AzkabanExecutorServer {
return null;
}
}
+
+ /**
+ * Returns host:port combination for currently running executor
+ * @return
+ */
+ public String getExecutorHostPort() {
+ String host = "unkownHost";
+ try {
+ host = InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (Exception e) {
+ logger.error("Failed to fetch LocalHostName");
+ }
+ return host + ":" + props.getInt("executor.port", DEFAULT_PORT_NUMBER);
+ }
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
index da98b99..7eedee4 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
@@ -271,6 +271,7 @@ public class FlowRunner extends EventHandler implements Runnable {
this.watcher.setLogger(logger);
}
+ logger.info("Assigned executor : " + AzkabanExecutorServer.getApp().getExecutorHostPort());
logger.info("Running execid:" + execId + " flow:" + flowId + " project:"
+ projectId + " version:" + version);
if (pipelineExecId != null) {
@@ -840,7 +841,7 @@ public class FlowRunner extends EventHandler implements Runnable {
/**
* Configure Azkaban metrics tracking for a new jobRunner instance
- *
+ *
* @param jobRunner
*/
private void configureJobLevelMetrics(JobRunner jobRunner) {
azkaban-webserver/.gitignore 1(+1 -0)
diff --git a/azkaban-webserver/.gitignore b/azkaban-webserver/.gitignore
new file mode 100644
index 0000000..5e56e04
--- /dev/null
+++ b/azkaban-webserver/.gitignore
@@ -0,0 +1 @@
+/bin
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 4a693a9..4970fce 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -166,16 +166,18 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
/**
- * Enables queueProcessor if @param status is true and disables queueProcessor
- * is @param status is false.
+ * <pre>
+ * Enables queueProcessor if @param status is true
+ * disables queueProcessor if @param status is false.
+ * </pre>
*/
private void ajaxUpdateQueueProcessor(HttpServletRequest req,
HttpServletResponse resp, HashMap<String, Object> returnMap, User user,
- boolean status) {
+ boolean enableQueue) {
boolean wasSuccess = false;
if (HttpRequestUtils.hasPermission(userManager, user, Type.ADMIN)) {
try {
- if (status) {
+ if (enableQueue) {
executorManager.enableQueueProcessorThread();
} else {
executorManager.disableQueueProcessorThread();
diff --git a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
index d77bc52..27be342 100644
--- a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
+++ b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
@@ -41,7 +41,7 @@
} else {
$('#metricName').empty();
for(var index = 0; index < responseData.metricList.length; index++) {
- $('#metricName').append($('<option value="1">' + responseData.metricList[index] + '</option>'));
+ $('#metricName').append($('<option value="' + responseData.metricList[index] +'">' + responseData.metricList[index] + '</option>'));
}
}
};