Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
index 8bc725f..8abbd61 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
@@ -17,6 +17,7 @@
package azkaban.executor;
public interface ConnectorParams {
+ public static final String EXECUTOR_ID_PARAM = "executorId";
public static final String ACTION_PARAM = "action";
public static final String EXECID_PARAM = "execid";
public static final String SHAREDTOKEN_PARAM = "token";
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index af0c7be..3740227 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -168,6 +168,21 @@ public class ExecutorManager extends EventHandler implements
}
@Override
+ public Set<Executor> getAllActiveExecutors() {
+ return activeExecutors;
+ }
+
+ @Override
+ public Executor fetchExecutor(int executorId) throws ExecutorManagerException {
+ for (Executor executor : activeExecutors) {
+ if (executor.getId() == executorId) {
+ return executor;
+ }
+ }
+ return executorLoader.fetchExecutor(executorId);
+ }
+
+ @Override
public Set<String> getPrimaryServerHosts() {
// TODO: do we want to have a primary
HashSet<String> ports = new HashSet<String>();
@@ -236,6 +251,27 @@ public class ExecutorManager extends EventHandler implements
}
/**
+ *
+ * {@inheritDoc}
+ * @see azkaban.executor.ExecutorManagerAdapter#getActiveFlowsWithExecutor()
+ */
+ @Override
+ public List<Pair<ExecutableFlow, Executor>> getActiveFlowsWithExecutor()
+ throws IOException {
+ List<Pair<ExecutableFlow, Executor>> flows =
+ new ArrayList<Pair<ExecutableFlow, Executor>>();
+ for (Pair<ExecutionReference, ExecutableFlow> ref : queuedFlowMap.values()) {
+ flows.add(new Pair<ExecutableFlow, Executor>(ref.getSecond(), ref
+ .getFirst().getExecutor()));
+ }
+ for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+ flows.add(new Pair<ExecutableFlow, Executor>(ref.getSecond(), ref
+ .getFirst().getExecutor()));
+ }
+ return flows;
+ }
+
+ /**
* Checks whether the given flow has an active (running, non-dispatched)
* executions {@inheritDoc}
*
@@ -851,17 +887,18 @@ public class ExecutorManager extends EventHandler implements
/**
* Manage servlet call for stats servlet in Azkaban execution server
* {@inheritDoc}
+ * @throws ExecutorManagerException
*
* @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(java.lang.String,
* azkaban.utils.Pair[])
*/
@Override
- public Map<String, Object> callExecutorStats(String action,
- Pair<String, String>... params) throws IOException {
+ public Map<String, Object> callExecutorStats(int executorId, String action,
+ Pair<String, String>... params) throws IOException, ExecutorManagerException {
URIBuilder builder = new URIBuilder();
- // TODO: fix to take host and port form user
- // builder.setScheme("http").setHost(executorHost).setPort(executorPort).setPath("/stats");
+ Executor executor = fetchExecutor(executorId);
+ builder.setScheme("http").setHost(executor.getHost()).setPort(executor.getPort()).setPath("/stats");
builder.setParameter(ConnectorParams.ACTION_PARAM, action);
@@ -1438,7 +1475,9 @@ public class ExecutorManager extends EventHandler implements
}
private void refreshExecutors() {
- // TODO: rest api call to refresh executor stats
+ synchronized (activeExecutors) {
+ // TODO: rest api call to refresh executor stats
+ }
}
private void processQueuedFlows(long maxContinousSubmission) {
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
index 10379ec..6991c20 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -88,6 +88,18 @@ public interface ExecutorManagerAdapter {
public List<ExecutableFlow> getRunningFlows() throws IOException;
+ /**
+ * <pre>
+ * Returns All running with executors and queued flows
+ * Note, returns empty list if there isn't any running or queued flows
+ * </pre>
+ *
+ * @return
+ * @throws IOException
+ */
+ public List<Pair<ExecutableFlow, Executor>> getActiveFlowsWithExecutor()
+ throws IOException;
+
public List<ExecutableFlow> getRecentlyFinishedFlows();
public List<ExecutableFlow> getExecutableFlows(Project project,
@@ -177,9 +189,10 @@ public interface ExecutorManagerAdapter {
* <li>{@link azkaban.executor.ConnectorParams#STATS_SET_ENABLEMETRICS}<li>
* <li>{@link azkaban.executor.ConnectorParams#STATS_SET_DISABLEMETRICS}<li>
* </ul>
+ * @throws ExecutorManagerException
*/
- public Map<String, Object> callExecutorStats(String action,
- Pair<String, String>... params) throws IOException;
+ public Map<String, Object> callExecutorStats(int executorId, String action,
+ Pair<String, String>... param) throws IOException, ExecutorManagerException;
public Map<String, Object> callExecutorJMX(String hostPort, String action,
String mBean) throws IOException;
@@ -196,4 +209,24 @@ public interface ExecutorManagerAdapter {
public Set<? extends String> getPrimaryServerHosts();
+ /**
+ * Returns a set of all the active executors maintained by active executors
+ *
+ * @return
+ */
+ public Set<Executor> getAllActiveExecutors();
+
+ /**
+ * <pre>
+ * Fetch executor from executors with a given executorId
+ * Note:
+ * 1. throws an Exception in case of a SQL issue
+ * 2. return null when no executor is found with the given executorId
+ * </pre>
+ *
+ * @throws ExecutorManagerException
+ *
+ */
+ public Executor fetchExecutor(int executorId) throws ExecutorManagerException;
+
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
index e66a586..1d9af6d 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
@@ -72,7 +72,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
/**
* Handle all get request to Stats Servlet {@inheritDoc}
- *
+ *
* @see javax.servlet.http.HttpServlet#doGet(javax.servlet.http.HttpServletRequest,
* javax.servlet.http.HttpServletResponse)
*/
@@ -176,7 +176,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
/**
* Get metric snapshots for a metric and date specification
- *
+ *
* @throws ServletException
*/
private void handleGetMetricHistory(HttpServletRequest req,
@@ -252,7 +252,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
/**
* Update tracking interval for a given metrics
- *
+ *
* @throws ServletException
*/
private void handleChangeMetricInterval(HttpServletRequest req,
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 2923d5e..3d3f760 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -34,6 +34,7 @@ import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.Executor;
import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
@@ -49,6 +50,7 @@ import azkaban.user.Permission;
import azkaban.user.Permission.Type;
import azkaban.user.User;
import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.Pair;
import azkaban.webapp.AzkabanWebServer;
import azkaban.webapp.plugin.PluginRegistry;
import azkaban.webapp.plugin.ViewerPlugin;
@@ -225,7 +227,8 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
newPage(req, resp, session,
"azkaban/webapp/servlet/velocity/executionspage.vm");
- List<ExecutableFlow> runningFlows = executorManager.getRunningFlows();
+ List<Pair<ExecutableFlow, Executor>> runningFlows =
+ executorManager.getActiveFlowsWithExecutor();
page.add("runningFlows", runningFlows.isEmpty() ? null : runningFlows);
List<ExecutableFlow> finishedFlows =
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
index 6d14839..06b6f86 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
@@ -29,7 +30,9 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import azkaban.executor.ConnectorParams;
+import azkaban.executor.Executor;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
import azkaban.server.session.Session;
import azkaban.user.Permission;
import azkaban.user.Role;
@@ -67,55 +70,105 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session)
throws ServletException, IOException {
HashMap<String, Object> ret = new HashMap<String, Object>();
+ int executorId = getIntParam(req, ConnectorParams.EXECUTOR_ID_PARAM);
String actionName = getParam(req, ConnectorParams.ACTION_PARAM);
if (actionName.equals(ConnectorParams.STATS_GET_METRICHISTORY)) {
- handleGetMetricHistory(req, ret, session.getUser());
+ handleGetMetricHistory(executorId, req, ret, session.getUser());
+ } else if (actionName.equals(ConnectorParams.STATS_GET_ALLMETRICSNAME)) {
+ handleGetAllMetricName(executorId, req, ret);
} else if (actionName.equals(ConnectorParams.STATS_SET_REPORTINGINTERVAL)) {
- handleChangeConfigurationRequest(ConnectorParams.STATS_SET_REPORTINGINTERVAL, req, ret);
+ handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_REPORTINGINTERVAL, req, ret);
} else if (actionName.equals(ConnectorParams.STATS_SET_CLEANINGINTERVAL)) {
- handleChangeConfigurationRequest(ConnectorParams.STATS_SET_CLEANINGINTERVAL, req, ret);
+ handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_CLEANINGINTERVAL, req, ret);
} else if (actionName.equals(ConnectorParams.STATS_SET_MAXREPORTERPOINTS)) {
- handleChangeConfigurationRequest(ConnectorParams.STATS_SET_MAXREPORTERPOINTS, req, ret);
+ handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_MAXREPORTERPOINTS, req, ret);
} else if (actionName.equals(ConnectorParams.STATS_SET_ENABLEMETRICS)) {
- handleChangeConfigurationRequest(ConnectorParams.STATS_SET_ENABLEMETRICS, req, ret);
+ handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_ENABLEMETRICS, req, ret);
} else if (actionName.equals(ConnectorParams.STATS_SET_DISABLEMETRICS)) {
- handleChangeConfigurationRequest(ConnectorParams.STATS_SET_DISABLEMETRICS, req, ret);
+ handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_DISABLEMETRICS, req, ret);
}
writeJSON(resp, ret);
}
/**
+ * Get all metrics tracked by the given executor
+ *
+ * @param executorId
+ * @param req
+ * @param ret
+ */
+ private void handleGetAllMetricName(int executorId, HttpServletRequest req,
+ HashMap<String, Object> ret) throws IOException {
+ Map<String, Object> result;
+ try {
+ result =
+ execManager.callExecutorStats(executorId,
+ ConnectorParams.STATS_GET_ALLMETRICSNAME,
+ (Pair<String, String>[]) null);
+
+ if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+ ret.put("error", result.get(ConnectorParams.RESPONSE_ERROR).toString());
+ } else {
+ ret.put("metricList", result.get("data"));
+ }
+ } catch (ExecutorManagerException e) {
+ ret.put("error", "Failed to fetch metric names for executor : "
+ + executorId);
+ }
+ }
+
+ /**
* Generic method to facilitate actionName action using Azkaban exec server
+ * @param executorId
* @param actionName Name of the action
+ * @throws ExecutorManagerException
*/
- private void handleChangeConfigurationRequest(String actionName, HttpServletRequest req, HashMap<String, Object> ret)
+ private void handleChangeConfigurationRequest(int executorId, String actionName, HttpServletRequest req, HashMap<String, Object> ret)
throws ServletException, IOException {
- Map<String, Object> result = execManager.callExecutorStats(actionName, getAllParams(req));
- if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
- ret.put(ConnectorParams.RESPONSE_ERROR, result.get(ConnectorParams.RESPONSE_ERROR).toString());
- } else {
- ret.put(ConnectorParams.STATUS_PARAM, result.get(ConnectorParams.STATUS_PARAM));
+ try {
+ Map<String, Object> result =
+ execManager
+ .callExecutorStats(executorId, actionName, getAllParams(req));
+ if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+ ret.put(ConnectorParams.RESPONSE_ERROR,
+ result.get(ConnectorParams.RESPONSE_ERROR).toString());
+ } else {
+ ret.put(ConnectorParams.STATUS_PARAM,
+ result.get(ConnectorParams.STATUS_PARAM));
+ }
+ } catch (ExecutorManagerException ex) {
+ ret.put("error", "Failed to change config change");
}
}
/**
* Get metric snapshots for a metric and date specification
+ * @param executorId
* @throws ServletException
+ * @throws ExecutorManagerException
*/
- private void handleGetMetricHistory(HttpServletRequest req, HashMap<String, Object> ret, User user)
- throws IOException, ServletException {
- Map<String, Object> result =
- execManager.callExecutorStats(ConnectorParams.STATS_GET_METRICHISTORY, getAllParams(req));
- if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
- ret.put(ConnectorParams.RESPONSE_ERROR, result.get(ConnectorParams.RESPONSE_ERROR).toString());
- } else {
- ret.put("data", result.get("data"));
+ private void handleGetMetricHistory(int executorId, HttpServletRequest req,
+ HashMap<String, Object> ret, User user) throws IOException,
+ ServletException {
+ try {
+ Map<String, Object> result =
+ execManager.callExecutorStats(executorId,
+ ConnectorParams.STATS_GET_METRICHISTORY, getAllParams(req));
+ if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+ ret.put(ConnectorParams.RESPONSE_ERROR,
+ result.get(ConnectorParams.RESPONSE_ERROR).toString());
+ } else {
+ ret.put("data", result.get("data"));
+ }
+ } catch (ExecutorManagerException ex) {
+ ret.put("error", "Failed to fetch metric history");
}
}
/**
+ * @throws ExecutorManagerException
*
*/
private void handleStatePageLoad(HttpServletRequest req, HttpServletResponse resp, Session session)
@@ -128,14 +181,20 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
}
try {
+ Set<Executor> executors = execManager.getAllActiveExecutors();
+ page.add("executorList", executors);
+
Map<String, Object> result =
- execManager.callExecutorStats(ConnectorParams.STATS_GET_ALLMETRICSNAME, (Pair<String, String>[]) null);
+ execManager.callExecutorStats(executors.iterator().next().getId(),
+ ConnectorParams.STATS_GET_ALLMETRICSNAME,
+ (Pair<String, String>[]) null);
if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
- page.add("errorMsg", result.get(ConnectorParams.RESPONSE_ERROR).toString());
+ page.add("errorMsg", result.get(ConnectorParams.RESPONSE_ERROR)
+ .toString());
} else {
page.add("metricList", result.get("data"));
}
- } catch (IOException e) {
+ } catch (Exception e) {
page.add("errorMsg", "Failed to get a response from Azkaban exec server");
}
diff --git a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm
index 49d1085..9c2a6fb 100644
--- a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm
+++ b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm
@@ -68,6 +68,7 @@
<tr>
<th>#</th>
<th class="execid">Execution Id</th>
+ <th >Executor Id</th>
<th>Flow</th>
<th>Project</th>
<th class="user">User</th>
@@ -80,25 +81,33 @@
</tr>
</thead>
<tbody>
-#if ($runningFlows)
+
+#if ( !$null.isNull(${runningFlows}))
#foreach ($flow in $runningFlows)
<tr>
<td class="tb-name">
$velocityCount
</td>
<td class="tb-name">
- <a href="${context}/executor?execid=${flow.executionId}">${flow.executionId}</a>
+ <a href="${context}/executor?execid=${flow.getFirst().executionId}">${flow.getFirst().executionId}</a>
</td>
- <td><a href="${context}/manager?project=$vmutils.getProjectName(${flow.projectId})&flow=${flow.flowId}">${flow.flowId}</a></td>
<td>
- <a href="${context}/manager?project=$vmutils.getProjectName(${flow.projectId})">$vmutils.getProjectName(${flow.projectId})</a>
+ #if (${flow.getSecond()})
+ ${flow.getSecond().getId()}
+ #else
+ -
+ #end
</td>
- <td>${flow.submitUser}</td>
- <td>${flow.proxyUsers}</td>
- <td>$utils.formatDate(${flow.startTime})</td>
- <td>$utils.formatDate(${flow.endTime})</td>
- <td>$utils.formatDuration(${flow.startTime}, ${flow.endTime})</td>
- <td><div class="status ${flow.status}">$utils.formatStatus(${flow.status})</div></td>
+ <td><a href="${context}/manager?project=$vmutils.getProjectName(${flow.getFirst().projectId})&flow=${flow.getFirst().flowId}">${flow.getFirst().flowId}</a></td>
+ <td>
+ <a href="${context}/manager?project=$vmutils.getProjectName(${flow.getFirst().projectId})">$vmutils.getProjectName(${flow.getFirst().projectId})</a>
+ </td>
+ <td>${flow.getFirst().submitUser}</td>
+ <td>${flow.getFirst().proxyUsers}</td>
+ <td>$utils.formatDate(${flow.getFirst().startTime})</td>
+ <td>$utils.formatDate(${flow.getFirst().endTime})</td>
+ <td>$utils.formatDuration(${flow.getFirst().startTime}, ${flow.getFirst().endTime})</td>
+ <td><div class="status ${flow.getFirst().status}">$utils.formatStatus(${flow.getFirst().status})</div></td>
<td></td>
</tr>
#end
diff --git a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
index 4596ac2..d77bc52 100644
--- a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
+++ b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
@@ -29,6 +29,25 @@
var currentTime = ${currentTime};
var timezone = "${timezone}";
+ function refreshMetricList() {
+ var requestURL = '/stats';
+ var requestData = {
+ 'action': 'getAllMetricNames',
+ 'executorId': $('#executorName').val()
+ };
+ var successHandler = function(responseData) {
+ if(responseData.error != null) {
+ $('#reportedMetric').html(responseData.error);
+ } else {
+ $('#metricName').empty();
+ for(var index = 0; index < responseData.metricList.length; index++) {
+ $('#metricName').append($('<option value="1">' + responseData.metricList[index] + '</option>'));
+ }
+ }
+ };
+ $.get(requestURL, requestData, successHandler, 'json');
+ }
+
function refreshMetricChart() {
var requestURL = '/stats';
var requestData = {
@@ -36,7 +55,8 @@
'from': new Date($('#datetimebegin').val()).toUTCString(),
'to' : new Date($('#datetimeend').val()).toUTCString(),
'metricName': $('#metricName').val(),
- 'useStats': $("#useStats").is(':checked')
+ 'useStats': $("#useStats").is(':checked'),
+ 'executorId': $('#executorName').val()
};
var successHandler = function(responseData) {
if(responseData.error != null) {
@@ -67,6 +87,7 @@
$('#datetimebegin').data('DateTimePicker').setEndDate(e.date);
});
$('#retrieve').click(refreshMetricChart);
+ $('#executorName').click(refreshMetricList);
});
</script>
@@ -84,8 +105,16 @@
<div class="header-title" style="width: 17%;">
<h1><a href="${context}/stats">Statistics</a></h1>
</div>
- <div class="header-control" style="width: 900px; padding-top: 5px;">
+ <div class="header-control" style="width: 1300px; padding-top: 5px;">
<form id="metric-form" method="get">
+ <label for="executorLabel" >Executor</label>
+ #if (!$executorList.isEmpty())
+ <select id="executorName" name="executorName" style="width:200px">
+ #foreach ($executor in $executorList)
+ <option value="${executor.getId()}" style="width:200px">${executor.getHost()}:${executor.getPort()}</option>
+ #end
+ </select>
+ #end
<label for="metricLabel" >Metric</label>
#if (!$metricList.isEmpty())
<select id="metricName" name="metricName" style="width:200px">
@@ -119,4 +148,4 @@
<!-- /container-full -->
#end
</body>
- <html>
\ No newline at end of file
+ <html>