diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index c97ef00..bf87497 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -20,8 +20,8 @@ import java.io.File;
import java.io.IOException;
import java.lang.Thread.State;
import java.net.URI;
-import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -29,18 +29,13 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.ResponseHandler;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.impl.client.BasicResponseHandler;
-import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
@@ -48,6 +43,7 @@ import azkaban.alert.Alerter;
import azkaban.event.Event;
import azkaban.event.Event.Type;
import azkaban.event.EventHandler;
+import azkaban.executor.selector.ExecutorSelector;
import azkaban.project.Project;
import azkaban.project.ProjectWhitelist;
import azkaban.scheduler.ScheduleStatisticManager;
@@ -63,6 +59,10 @@ import azkaban.utils.Props;
*/
public class ExecutorManager extends EventHandler implements
ExecutorManagerAdapter {
+ static final String AZKABAN_EXECUTOR_SELECTOR_FILTERS =
+ "azkaban.executorselector.filters";
+ static final String AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX =
+ "azkaban.executorselector.comparator.";
static final String AZKABAN_QUEUEPROCESSING_ENABLED =
"azkaban.queueprocessing.enabled";
static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
@@ -103,14 +103,18 @@ public class ExecutorManager extends EventHandler implements
File cacheDir;
final Props azkProps;
+ List<String> filterList;
+ Map<String, Integer> comparatorWeightsMap;
public ExecutorManager(Props props, ExecutorLoader loader,
Map<String, Alerter> alters) throws ExecutorManagerException {
azkProps = props;
-
this.executorLoader = loader;
this.setupExecutors();
this.loadRunningFlows();
+
+ queuedFlows =
+ new QueuedExecutions(props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
this.loadQueuedFlows();
alerters = alters;
@@ -121,41 +125,51 @@ public class ExecutorManager extends EventHandler implements
executingManager.start();
if(isMultiExecutorMode()) {
- queueProcessor =
- new QueueProcessorThread(azkProps.getBoolean(
- AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
- AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS, 1000),
- azkProps
- .getInt(AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_FLOW, 1000));
- queueProcessor.start();
+ setupMultiExecutorMode();
}
long executionLogsRetentionMs =
props.getLong("execution.logs.retention.ms",
DEFAULT_EXECUTION_LOGS_RETENTION_MS);
- queuedFlows =
- new QueuedExecutions(props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
-
cleanerThread = new CleanerThread(executionLogsRetentionMs);
cleanerThread.start();
}
+ private void setupMultiExecutorMode() {
+ // initliatize hard filters for executor selector from azkaban.properties
+ String filters = azkProps.getString(AZKABAN_EXECUTOR_SELECTOR_FILTERS, "");
+ if (filters != null) {
+ filterList = Arrays.asList(StringUtils.split(filters, ","));
+ }
+
+ // initliatize comparator feature weights for executor selector from
+ // azkaban.properties
+ Map<String, String> compListStrings =
+ azkProps.getMapByPrefix(AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
+ if (compListStrings != null) {
+ comparatorWeightsMap = new TreeMap<String, Integer>();
+ for (Map.Entry<String, String> entry : compListStrings.entrySet()) {
+ comparatorWeightsMap.put(entry.getKey(), Integer.valueOf(entry.getValue()));
+ }
+ }
+
+ // configure queue processor
+ queueProcessor =
+ new QueueProcessorThread(azkProps.getBoolean(
+ AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
+ AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS, 1000), azkProps.getInt(
+ AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_FLOW, 1000));
+ queueProcessor.start();
+ }
+
/**
- * <pre>
- * Setup activeExecutors using azkaban.properties and database executors
- * Note:
- * 1. If azkaban.use.multiple.executors is set true, this method will
- * load all active executors
- * 2. In local mode, If a local executor is specified and it is missing from db,
- * this method add local executor as active in DB
- * 3. In local mode, If a local executor is specified and it is marked inactive in db,
- * this method will convert local executor as active in DB
- * </pre>
*
- * @throws ExecutorManagerException
+ * {@inheritDoc}
+ * @see azkaban.executor.ExecutorManagerAdapter#setupExecutors()
*/
+ @Override
public void setupExecutors() throws ExecutorManagerException {
Set<Executor> newExecutors = new HashSet<Executor>();
@@ -201,15 +215,44 @@ public class ExecutorManager extends EventHandler implements
*/
private void refreshExecutors() {
synchronized (activeExecutors) {
- // TODO: rest api call to refresh executor stats
+ ExecutorService taskExecutors =
+ Executors.newFixedThreadPool(activeExecutors.size());
+ for (final Executor executor : activeExecutors) {
+
+ // execute each executorInfo refresh task
+ taskExecutors.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ String jsonResponse =
+ callExecutorForJsonString(executor.getHost(),
+ executor.getPort(), "/serverstastics", null);
+ executor.setExecutorInfo(ExecutorInfo
+ .fromJSONString(jsonResponse));
+ } catch (Exception e) {
+ logger.error("Failed to update ExecutorInfo executorId :"
+ + executor, e);
+ }
+ }
+ });
+ }
+
+ taskExecutors.shutdown();
+ try {
+ // wait 5 seconds for all executors to be refreshed
+ taskExecutors.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ logger.error("Timed out while waiting for executorInfo refresh", e);
+ }
}
}
/**
- * Disable flow dispatching in QueueProcessor
- *
- * @throws ExecutorManagerException
+ * Throws exception if running in local mode
+ * {@inheritDoc}
+ * @see azkaban.executor.ExecutorManagerAdapter#disableQueueProcessorThread()
*/
+ @Override
public void disableQueueProcessorThread() throws ExecutorManagerException {
if (isMultiExecutorMode()) {
queueProcessor.setActive(false);
@@ -220,10 +263,11 @@ public class ExecutorManager extends EventHandler implements
}
/**
- * Enable flow dispatching in QueueProcessor
- *
- * @throws ExecutorManagerException
+ * Throws exception if running in local mode
+ * {@inheritDoc}
+ * @see azkaban.executor.ExecutorManagerAdapter#enableQueueProcessorThread()
*/
+ @Override
public void enableQueueProcessorThread() throws ExecutorManagerException {
if (isMultiExecutorMode()) {
queueProcessor.setActive(true);
@@ -334,9 +378,12 @@ public class ExecutorManager extends EventHandler implements
* any executor
*/
private void loadQueuedFlows() throws ExecutorManagerException {
- for (Pair<ExecutionReference, ExecutableFlow> pair : executorLoader
- .fetchQueuedFlows()) {
- queuedFlows.enqueue(pair.getSecond(), pair.getFirst());
+ List<Pair<ExecutionReference, ExecutableFlow>> retrievedExecutions =
+ executorLoader.fetchQueuedFlows();
+ if (retrievedExecutions != null) {
+ for (Pair<ExecutionReference, ExecutableFlow> pair : retrievedExecutions) {
+ queuedFlows.enqueue(pair.getSecond(), pair.getFirst());
+ }
}
}
@@ -927,11 +974,12 @@ public class ExecutorManager extends EventHandler implements
executorLoader.addActiveExecutableReference(reference);
queuedFlows.enqueue(exflow, reference);
} else {
+ Executor executor = activeExecutors.iterator().next();
// assign only local executor we have
- reference.setExecutor(activeExecutors.iterator().next());
+ reference.setExecutor(executor);
executorLoader.addActiveExecutableReference(reference);
try {
- callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
+ callExecutorServer(exflow, executor, ConnectorParams.EXECUTE_ACTION);
runningFlows.put(exflow.getExecutionId(),
new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
} catch (ExecutorManagerException e) {
@@ -957,11 +1005,11 @@ public class ExecutorManager extends EventHandler implements
}
}
- private Map<String, Object> callExecutorServer(ExecutionReference ref,
- String action) throws ExecutorManagerException {
+ private Map<String, Object> callExecutorServer(ExecutableFlow exflow,
+ Executor executor, String action) throws ExecutorManagerException {
try {
- return callExecutorServer(ref.getHost(), ref.getPort(), action,
- ref.getExecId(), null, (Pair<String, String>[]) null);
+ return callExecutorServer(executor.getHost(), executor.getPort(), action,
+ exflow.getExecutionId(), null, (Pair<String, String>[]) null);
} catch (IOException e) {
throw new ExecutorManagerException(e);
}
@@ -1002,57 +1050,63 @@ public class ExecutorManager extends EventHandler implements
private Map<String, Object> callExecutorServer(String host, int port,
String action, Integer executionId, String user,
Pair<String, String>... params) throws IOException {
- URIBuilder builder = new URIBuilder();
- builder.setScheme("http").setHost(host).setPort(port).setPath("/executor");
-
- builder.setParameter(ConnectorParams.ACTION_PARAM, action);
+ List<Pair<String, String>> paramList = new ArrayList<Pair<String,String>>();
- if (executionId != null) {
- builder.setParameter(ConnectorParams.EXECID_PARAM,
- String.valueOf(executionId));
+ // if params = null
+ if(params != null) {
+ paramList.addAll(Arrays.asList(params));
}
- if (user != null) {
- builder.setParameter(ConnectorParams.USER_PARAM, user);
- }
-
- if (params != null) {
- for (Pair<String, String> pair : params) {
- builder.setParameter(pair.getFirst(), pair.getSecond());
- }
- }
+ paramList
+ .add(new Pair<String, String>(ConnectorParams.ACTION_PARAM, action));
+ paramList.add(new Pair<String, String>(ConnectorParams.EXECID_PARAM, String
+ .valueOf(executionId)));
+ paramList.add(new Pair<String, String>(ConnectorParams.USER_PARAM, user));
- URI uri = null;
- try {
- uri = builder.build();
- } catch (URISyntaxException e) {
- throw new IOException(e);
- }
+ Map<String, Object> jsonResponse =
+ callExecutorForJsonObject(host, port, "/executor", paramList);
- ResponseHandler<String> responseHandler = new BasicResponseHandler();
+ return jsonResponse;
+ }
- HttpClient httpclient = new DefaultHttpClient();
- HttpGet httpget = new HttpGet(uri);
- String response = null;
- try {
- response = httpclient.execute(httpget, responseHandler);
- } catch (IOException e) {
- throw e;
- } finally {
- httpclient.getConnectionManager().shutdown();
- }
+ /*
+ * Helper method used by ExecutorManager to call executor and return json
+ * object map
+ */
+ private Map<String, Object> callExecutorForJsonObject(String host, int port,
+ String path, List<Pair<String, String>> paramList) throws IOException {
+ String responseString =
+ callExecutorForJsonString(host, port, path, paramList);
@SuppressWarnings("unchecked")
Map<String, Object> jsonResponse =
- (Map<String, Object>) JSONUtils.parseJSONFromString(response);
+ (Map<String, Object>) JSONUtils.parseJSONFromString(responseString);
String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
if (error != null) {
throw new IOException(error);
}
-
return jsonResponse;
}
+ /*
+ * Helper method used by ExecutorManager to call executor and return raw json
+ * string
+ */
+ private String callExecutorForJsonString(String host, int port, String path,
+ List<Pair<String, String>> paramList) throws IOException {
+ if (paramList == null) {
+ paramList = new ArrayList<Pair<String, String>>();
+ }
+
+ ExecutorApiClient apiclient = ExecutorApiClient.getInstance();
+ @SuppressWarnings("unchecked")
+ URI uri =
+ ExecutorApiClient.buildUri(host, port, path, true,
+ paramList.toArray(new Pair[0]));
+
+ return apiclient.httpGet(uri, null);
+ }
+
/**
* Manage servlet call for stats servlet in Azkaban execution server
* {@inheritDoc}
@@ -1065,90 +1119,35 @@ public class ExecutorManager extends EventHandler implements
@Override
public Map<String, Object> callExecutorStats(int executorId, String action,
Pair<String, String>... params) throws IOException, ExecutorManagerException {
-
- URIBuilder builder = new URIBuilder();
Executor executor = fetchExecutor(executorId);
- builder.setScheme("http").setHost(executor.getHost())
- .setPort(executor.getPort()).setPath("/stats");
- builder.setParameter(ConnectorParams.ACTION_PARAM, action);
+ List<Pair<String, String>> paramList =
+ new ArrayList<Pair<String, String>>();
+ // if params = null
if (params != null) {
- for (Pair<String, String> pair : params) {
- builder.setParameter(pair.getFirst(), pair.getSecond());
- }
- }
-
- URI uri = null;
- try {
- uri = builder.build();
- } catch (URISyntaxException e) {
- throw new IOException(e);
- }
-
- ResponseHandler<String> responseHandler = new BasicResponseHandler();
-
- HttpClient httpclient = new DefaultHttpClient();
- HttpGet httpget = new HttpGet(uri);
- String response = null;
- try {
- response = httpclient.execute(httpget, responseHandler);
- } catch (IOException e) {
- throw e;
- } finally {
- httpclient.getConnectionManager().shutdown();
+ paramList.addAll(Arrays.asList(params));
}
- @SuppressWarnings("unchecked")
- Map<String, Object> jsonResponse =
- (Map<String, Object>) JSONUtils.parseJSONFromString(response);
+ paramList
+ .add(new Pair<String, String>(ConnectorParams.ACTION_PARAM, action));
- return jsonResponse;
+ return callExecutorForJsonObject(executor.getHost(), executor.getPort(),
+ "/stats", paramList);
}
@Override
public Map<String, Object> callExecutorJMX(String hostPort, String action,
String mBean) throws IOException {
- URIBuilder builder = new URIBuilder();
+ List<Pair<String, String>> paramList =
+ new ArrayList<Pair<String, String>>();
- String[] hostPortSplit = hostPort.split(":");
- builder.setScheme("http").setHost(hostPortSplit[0])
- .setPort(Integer.parseInt(hostPortSplit[1])).setPath("/jmx");
+ paramList.add(new Pair<String, String>(ConnectorParams.JMX_MBEAN, mBean));
- builder.setParameter(action, "");
- if (mBean != null) {
- builder.setParameter(ConnectorParams.JMX_MBEAN, mBean);
- }
-
- URI uri = null;
- try {
- uri = builder.build();
- } catch (URISyntaxException e) {
- throw new IOException(e);
- }
-
- ResponseHandler<String> responseHandler = new BasicResponseHandler();
-
- HttpClient httpclient = new DefaultHttpClient();
- HttpGet httpget = new HttpGet(uri);
- String response = null;
- try {
- response = httpclient.execute(httpget, responseHandler);
- } catch (IOException e) {
- throw e;
- } finally {
- httpclient.getConnectionManager().shutdown();
- }
-
- @SuppressWarnings("unchecked")
- Map<String, Object> jsonResponse =
- (Map<String, Object>) JSONUtils.parseJSONFromString(response);
- String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
- if (error != null) {
- throw new IOException(error);
- }
- return jsonResponse;
+ String[] hostPortSplit = hostPort.split(":");
+ return callExecutorForJsonObject(hostPortSplit[0],
+ Integer.valueOf(hostPortSplit[1]), "/jmx", paramList);
}
@Override
@@ -1748,6 +1747,7 @@ public class ExecutorManager extends EventHandler implements
currentContinuousFlowProcessed = 0;
}
+ exflow.setUpdateTime(currentTime);
// process flow with current snapshot of activeExecutors
processFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
currentContinuousFlowProcessed++;
@@ -1776,12 +1776,49 @@ public class ExecutorManager extends EventHandler implements
}
}
+ /* Helper method to fetch overriding Executor, if a valid user has specifed otherwise return null */
+ private Executor getUserSpecifiedExecutor(ExecutableFlow exflow) {
+ Executor executor = null;
+ ExecutionOptions options = exflow.getExecutionOptions();
+ if (options != null
+ && options.getFlowParameters() != null
+ && options.getFlowParameters().containsKey(
+ ExecutionOptions.USE_EXECUTOR)) {
+ try {
+ int executorId =
+ Integer.valueOf(options.getFlowParameters().get(
+ ExecutionOptions.USE_EXECUTOR));
+ executor = fetchExecutor(executorId);
+
+ if (executor == null) {
+ logger
+ .warn(String
+ .format(
+ "User specified executor id: %d for execution id: %d is not active, Looking up db.",
+ executorId, exflow.getExecutionId()));
+ executor = executorLoader.fetchExecutor(executorId);
+ }
+
+ } catch (Exception ex) {
+ logger.error("Failed to fetch user specified executor for exec_id = "
+ + exflow.getExecutionId(), ex);
+ }
+ }
+ return executor;
+ }
+
/* Choose Executor for exflow among the available executors */
private Executor selectExecutor(ExecutableFlow exflow,
Set<Executor> availableExecutors) {
- Executor choosenExecutor;
- // TODO: use dispatcher
- choosenExecutor = availableExecutors.iterator().next();
+ Executor choosenExecutor = getUserSpecifiedExecutor(exflow);
+
+ // If no executor was specified by admin
+ if (choosenExecutor == null) {
+ logger.info("Using dispatcher for execution id :"
+ + exflow.getExecutionId());
+ ExecutorSelector selector = new ExecutorSelector(filterList, comparatorWeightsMap);
+ choosenExecutor = selector.getBest(activeExecutors, exflow);
+ }
return choosenExecutor;
}
@@ -1794,7 +1831,6 @@ public class ExecutorManager extends EventHandler implements
"Reached handleDispatchExceptionCase stage for exec %d with error count %d",
exflow.getExecutionId(), reference.getNumErrors()));
reference.setNumErrors(reference.getNumErrors() + 1);
- reference.setExecutor(null);
if (reference.getNumErrors() >= MAX_DISPATCHING_ERRORS_PERMITTED
|| remainingExecutors.size() <= 1) {
logger.error("Failed to process queued flow");
@@ -1812,27 +1848,20 @@ public class ExecutorManager extends EventHandler implements
.info(String
.format(
"Reached handleNoExecutorSelectedCase stage for exec %d with error count %d",
- exflow.getExecutionId(), reference.getNumErrors()));
- reference.setNumErrors(reference.getNumErrors() + 1);
- // Scenario: when dispatcher didn't assigned any executor
- if (reference.getNumErrors() >= MAX_DISPATCHING_ERRORS_PERMITTED) {
- finalizeFlows(exflow);
- } else {
- // again queue this flow
- queuedFlows.enqueue(exflow, reference);
- }
+ exflow.getExecutionId(), reference.getNumErrors()));
+ // TODO: handle scenario where a high priority flow failing to get
+ // schedule can starve all others
+ queuedFlows.enqueue(exflow, reference);
}
private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
Executor choosenExecutor) throws ExecutorManagerException {
exflow.setUpdateTime(System.currentTimeMillis());
-
- // to be moved after db update once we integrate rest api changes
+ callExecutorServer(exflow, choosenExecutor,
+ ConnectorParams.EXECUTE_ACTION);
+ executorLoader.assignExecutor(choosenExecutor.getId(),
+ exflow.getExecutionId());
reference.setExecutor(choosenExecutor);
- // TODO: ADD rest call to do an actual dispatch
- callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
- executorLoader.assignExecutor(exflow.getExecutionId(),
- choosenExecutor.getId());
// move from flow to running flows
runningFlows.put(exflow.getExecutionId(),
diff --git a/azkaban-common/src/test/java/azkaban/server/HttpRequestUtilsTest.java b/azkaban-common/src/test/java/azkaban/server/HttpRequestUtilsTest.java
new file mode 100644
index 0000000..24f6ef7
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/server/HttpRequestUtilsTest.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.server;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.user.Permission.Type;
+import azkaban.user.User;
+import azkaban.user.UserManager;
+import azkaban.user.UserManagerException;
+import azkaban.utils.TestUtils;
+
+/**
+ * Test class for HttpRequestUtils
+ */
+public final class HttpRequestUtilsTest {
+ /* Helper method to get a test flow and add required properties */
+ public static ExecutableFlow createExecutableFlow() throws IOException {
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ flow.getExecutionOptions().getFlowParameters()
+ .put(ExecutionOptions.FLOW_PRIORITY, "1");
+ flow.getExecutionOptions().getFlowParameters()
+ .put(ExecutionOptions.USE_EXECUTOR, "2");
+ return flow;
+ }
+
+ /* Test that flow properties are removed for non-admin user */
+ @Test
+ public void TestFilterNonAdminOnlyFlowParams() throws IOException,
+ ExecutorManagerException, UserManagerException {
+ ExecutableFlow flow = createExecutableFlow();
+ UserManager manager = TestUtils.createTestXmlUserManager();
+ User user = manager.getUser("testUser", "testUser");
+
+ HttpRequestUtils.filterAdminOnlyFlowParams(manager,
+ flow.getExecutionOptions(), user);
+
+ Assert.assertFalse(flow.getExecutionOptions().getFlowParameters()
+ .containsKey(ExecutionOptions.FLOW_PRIORITY));
+ Assert.assertFalse(flow.getExecutionOptions().getFlowParameters()
+ .containsKey(ExecutionOptions.USE_EXECUTOR));
+ }
+
+ /* Test that flow properties are retained for admin user */
+ @Test
+ public void TestFilterAdminOnlyFlowParams() throws IOException,
+ ExecutorManagerException, UserManagerException {
+ ExecutableFlow flow = createExecutableFlow();
+ UserManager manager = TestUtils.createTestXmlUserManager();
+ User user = manager.getUser("testAdmin", "testAdmin");
+
+ HttpRequestUtils.filterAdminOnlyFlowParams(manager,
+ flow.getExecutionOptions(), user);
+
+ Assert.assertTrue(flow.getExecutionOptions().getFlowParameters()
+ .containsKey(ExecutionOptions.FLOW_PRIORITY));
+ Assert.assertTrue(flow.getExecutionOptions().getFlowParameters()
+ .containsKey(ExecutionOptions.USE_EXECUTOR));
+ }
+
+ /* Test exception, if param is a valid integer */
+ @Test
+ public void testvalidIntegerParam() throws ExecutorManagerException {
+ Map<String, String> params = new HashMap<String, String>();
+ params.put("param1", "123");
+ HttpRequestUtils.validateIntegerParam(params, "param1");
+ }
+
+ /* Test exception, if param is not a valid integer */
+ @Test(expected = ExecutorManagerException.class)
+ public void testInvalidIntegerParam() throws ExecutorManagerException {
+ Map<String, String> params = new HashMap<String, String>();
+ params.put("param1", "1dff2");
+ HttpRequestUtils.validateIntegerParam(params, "param1");
+ }
+
+ /* Verify permission for admin user */
+ @Test
+ public void testHasAdminPermission() throws UserManagerException {
+ UserManager manager = TestUtils.createTestXmlUserManager();
+ User adminUser = manager.getUser("testAdmin", "testAdmin");
+ Assert.assertTrue(HttpRequestUtils.hasPermission(manager, adminUser,
+ Type.ADMIN));
+ }
+
+ /* verify permission for non-admin user */
+ @Test
+ public void testHasOrdinaryPermission() throws UserManagerException {
+ UserManager manager = TestUtils.createTestXmlUserManager();
+ User testUser = manager.getUser("testUser", "testUser");
+ Assert.assertFalse(HttpRequestUtils.hasPermission(manager, testUser,
+ Type.ADMIN));
+ }
+}
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index f3e8c8f..4a693a9 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -29,6 +29,7 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang.StringEscapeUtils;
+import azkaban.executor.ConnectorParams;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
@@ -135,6 +136,12 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ajaxFetchExecutableFlowInfo(req, resp, ret, session.getUser(), exFlow);
}
}
+ } else if (ajaxName.equals("reloadExecutors")) {
+ ajaxReloadExecutors(req, resp, ret, session.getUser());
+ } else if (ajaxName.equals("enableQueueProcessor")) {
+ ajaxUpdateQueueProcessor(req, resp, ret, session.getUser(), true);
+ } else if (ajaxName.equals("disableQueueProcessor")) {
+ ajaxUpdateQueueProcessor(req, resp, ret, session.getUser(), false);
} else if (ajaxName.equals("getRunning")) {
String projectName = getParam(req, "project");
String flowName = getParam(req, "flow");
@@ -158,6 +165,61 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
}
+ /**
+ * Enables queueProcessor if @param status is true and disables queueProcessor
+ * is @param status is false.
+ */
+ private void ajaxUpdateQueueProcessor(HttpServletRequest req,
+ HttpServletResponse resp, HashMap<String, Object> returnMap, User user,
+ boolean status) {
+ boolean wasSuccess = false;
+ if (HttpRequestUtils.hasPermission(userManager, user, Type.ADMIN)) {
+ try {
+ if (status) {
+ executorManager.enableQueueProcessorThread();
+ } else {
+ executorManager.disableQueueProcessorThread();
+ }
+ returnMap.put(ConnectorParams.STATUS_PARAM,
+ ConnectorParams.RESPONSE_SUCCESS);
+ wasSuccess = true;
+ } catch (ExecutorManagerException e) {
+ returnMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
+ }
+ } else {
+ returnMap.put(ConnectorParams.RESPONSE_ERROR,
+ "Only Admins are allowed to update queue processor");
+ }
+ if (!wasSuccess) {
+ returnMap.put(ConnectorParams.STATUS_PARAM,
+ ConnectorParams.RESPONSE_ERROR);
+ }
+ }
+
+ /* Reloads executors from DB and azkaban.properties via executorManager */
+ private void ajaxReloadExecutors(HttpServletRequest req,
+ HttpServletResponse resp, HashMap<String, Object> returnMap, User user) {
+ boolean wasSuccess = false;
+ if (HttpRequestUtils.hasPermission(userManager, user, Type.ADMIN)) {
+ try {
+ executorManager.setupExecutors();
+ returnMap.put(ConnectorParams.STATUS_PARAM,
+ ConnectorParams.RESPONSE_SUCCESS);
+ wasSuccess = true;
+ } catch (ExecutorManagerException e) {
+ returnMap.put(ConnectorParams.RESPONSE_ERROR,
+ "Failed to refresh the executors " + e.getMessage());
+ }
+ } else {
+ returnMap.put(ConnectorParams.RESPONSE_ERROR,
+ "Only Admins are allowed to refresh the executors");
+ }
+ if (!wasSuccess) {
+ returnMap.put(ConnectorParams.STATUS_PARAM,
+ ConnectorParams.RESPONSE_ERROR);
+ }
+ }
+
@Override
protected void handlePost(HttpServletRequest req, HttpServletResponse resp,
Session session) throws ServletException, IOException {
@@ -813,14 +875,14 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
if (!options.isSuccessEmailsOverridden()) {
options.setSuccessEmails(flow.getSuccessEmails());
}
- fixFlowPriorityByPermission(options, user);
options.setMailCreator(flow.getMailCreator());
try {
+ HttpRequestUtils.filterAdminOnlyFlowParams(userManager, options, user);
String message =
executorManager.submitExecutableFlow(exflow, user.getUserId());
ret.put("message", message);
- } catch (ExecutorManagerException e) {
+ } catch (Exception e) {
e.printStackTrace();
ret.put("error",
"Error submitting flow " + exflow.getFlowId() + ". " + e.getMessage());
@@ -829,15 +891,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ret.put("execid", exflow.getExecutionId());
}
- /* Reset flow priority if submitting user is not a Azkaban admin */
- private void fixFlowPriorityByPermission(ExecutionOptions options, User user) {
- if (!(options.getFlowParameters().containsKey(
- ExecutionOptions.FLOW_PRIORITY) && hasPermission(user, Type.ADMIN))) {
- options.getFlowParameters().put(ExecutionOptions.FLOW_PRIORITY,
- String.valueOf(ExecutionOptions.DEFAULT_FLOW_PRIORITY));
- }
- }
-
public class ExecutorVelocityHelper {
public String getProjectName(int id) {
Project project = projectManager.getProject(id);
@@ -848,16 +901,4 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
return project.getName();
}
}
-
- /* returns true if user has access of type */
- protected boolean hasPermission(User user, Permission.Type type) {
- for (String roleName : user.getRoles()) {
- Role role = userManager.getRole(roleName);
- if (role.getPermission().isPermissionSet(type)
- || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
- return true;
- }
- }
- return false;
- }
}