azkaban-aplcache
Changes
azkaban-webserver/.gitignore 1(+1 -0)
Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
index 312be44..d8b10f1 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
@@ -34,6 +34,8 @@ public class ExecutionOptions {
public static final String CONCURRENT_OPTION_PIPELINE = "pipeline";
public static final String CONCURRENT_OPTION_IGNORE = "ignore";
public static final String FLOW_PRIORITY = "flowPriority";
+ /* override dispatcher selection and use executor id specified */
+ public static final String USE_EXECUTOR = "useExecutor";
public static final int DEFAULT_FLOW_PRIORITY = 5;
private static final String FLOW_PARAMETERS = "flowParameters";
diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
index e21ae2a..f0600ab 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Executor.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -92,9 +92,9 @@ public class Executor implements Comparable<Executor> {
@Override
public String toString(){
- return String.format("%s (id: %s)",
- null == this.host || this.host.length() == 0 ? "(empty)" : this.host,
- this.id);
+ return String.format("%s:%s (id: %s)",
+ null == this.host || this.host.length() == 0 ? "(empty)" : this.host,
+ this.port, this.id);
}
public String getHost() {
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index c97ef00..d1120ea 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -20,8 +20,8 @@ import java.io.File;
import java.io.IOException;
import java.lang.Thread.State;
import java.net.URI;
-import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -29,18 +29,16 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.commons.lang.StringUtils;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.ResponseHandler;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.impl.client.BasicResponseHandler;
-import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
@@ -48,6 +46,9 @@ import azkaban.alert.Alerter;
import azkaban.event.Event;
import azkaban.event.Event.Type;
import azkaban.event.EventHandler;
+import azkaban.executor.selector.ExecutorComparator;
+import azkaban.executor.selector.ExecutorFilter;
+import azkaban.executor.selector.ExecutorSelector;
import azkaban.project.Project;
import azkaban.project.ProjectWhitelist;
import azkaban.scheduler.ScheduleStatisticManager;
@@ -63,6 +64,10 @@ import azkaban.utils.Props;
*/
public class ExecutorManager extends EventHandler implements
ExecutorManagerAdapter {
+ static final String AZKABAN_EXECUTOR_SELECTOR_FILTERS =
+ "azkaban.executorselector.filters";
+ static final String AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX =
+ "azkaban.executorselector.comparator.";
static final String AZKABAN_QUEUEPROCESSING_ENABLED =
"azkaban.queueprocessing.enabled";
static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
@@ -71,8 +76,10 @@ public class ExecutorManager extends EventHandler implements
"azkaban.webserver.queue.size";
private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS =
"azkaban.activeexecutor.refresh.milisecinterval";
- private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_FLOW =
+ private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW =
"azkaban.activeexecutor.refresh.flowinterval";
+ private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS =
+ "azkaban.executorinfo.refresh.maxThreads";
private static Logger logger = Logger.getLogger(ExecutorManager.class);
private ExecutorLoader executorLoader;
@@ -102,15 +109,21 @@ public class ExecutorManager extends EventHandler implements
File cacheDir;
- final Props azkProps;
+ private final Props azkProps;
+ private List<String> filterList;
+ private Map<String, Integer> comparatorWeightsMap;
+ private long lastSuccessfulExecutorInfoRefresh;
+ private ExecutorService executorInforRefresherService;
public ExecutorManager(Props props, ExecutorLoader loader,
Map<String, Alerter> alters) throws ExecutorManagerException {
azkProps = props;
-
this.executorLoader = loader;
this.setupExecutors();
this.loadRunningFlows();
+
+ queuedFlows =
+ new QueuedExecutions(props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
this.loadQueuedFlows();
alerters = alters;
@@ -121,41 +134,56 @@ public class ExecutorManager extends EventHandler implements
executingManager.start();
if(isMultiExecutorMode()) {
- queueProcessor =
- new QueueProcessorThread(azkProps.getBoolean(
- AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
- AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS, 1000),
- azkProps
- .getInt(AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_FLOW, 1000));
- queueProcessor.start();
+ setupMultiExecutorMode();
}
long executionLogsRetentionMs =
props.getLong("execution.logs.retention.ms",
DEFAULT_EXECUTION_LOGS_RETENTION_MS);
- queuedFlows =
- new QueuedExecutions(props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
-
cleanerThread = new CleanerThread(executionLogsRetentionMs);
cleanerThread.start();
}
+ private void setupMultiExecutorMode() {
+ // initliatize hard filters for executor selector from azkaban.properties
+ String filters = azkProps.getString(AZKABAN_EXECUTOR_SELECTOR_FILTERS, "");
+ if (filters != null) {
+ filterList = Arrays.asList(StringUtils.split(filters, ","));
+ }
+
+ // initliatize comparator feature weights for executor selector from
+ // azkaban.properties
+ Map<String, String> compListStrings =
+ azkProps.getMapByPrefix(AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
+ if (compListStrings != null) {
+ comparatorWeightsMap = new TreeMap<String, Integer>();
+ for (Map.Entry<String, String> entry : compListStrings.entrySet()) {
+ comparatorWeightsMap.put(entry.getKey(), Integer.valueOf(entry.getValue()));
+ }
+ }
+
+ executorInforRefresherService =
+ Executors.newFixedThreadPool(azkProps.getInt(
+ AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS, 5));
+
+ // configure queue processor
+ queueProcessor =
+ new QueueProcessorThread(azkProps.getBoolean(
+ AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
+ AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS, 1000), azkProps.getInt(
+ AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW, 1000));
+
+ queueProcessor.start();
+ }
+
/**
- * <pre>
- * Setup activeExecutors using azkaban.properties and database executors
- * Note:
- * 1. If azkaban.use.multiple.executors is set true, this method will
- * load all active executors
- * 2. In local mode, If a local executor is specified and it is missing from db,
- * this method add local executor as active in DB
- * 3. In local mode, If a local executor is specified and it is marked inactive in db,
- * this method will convert local executor as active in DB
- * </pre>
*
- * @throws ExecutorManagerException
+ * {@inheritDoc}
+ * @see azkaban.executor.ExecutorManagerAdapter#setupExecutors()
*/
+ @Override
public void setupExecutors() throws ExecutorManagerException {
Set<Executor> newExecutors = new HashSet<Executor>();
@@ -181,8 +209,10 @@ public class ExecutorManager extends EventHandler implements
}
if (newExecutors.isEmpty()) {
+ logger.error("No active executor found");
throw new ExecutorManagerException("No active executor found");
} else if(newExecutors.size() > 1 && !isMultiExecutorMode()) {
+ logger.error("Multiple local executors specified");
throw new ExecutorManagerException("Multiple local executors specified");
} else {
// clear all active executors, only if we have at least one new active
@@ -201,15 +231,56 @@ public class ExecutorManager extends EventHandler implements
*/
private void refreshExecutors() {
synchronized (activeExecutors) {
- // TODO: rest api call to refresh executor stats
+
+ List<Pair<Executor, Future<String>>> futures =
+ new ArrayList<Pair<Executor, Future<String>>>();
+ for (final Executor executor : activeExecutors) {
+ // execute each executorInfo refresh task to fetch
+ Future<String> fetchExecutionInfo =
+ executorInforRefresherService.submit(new Callable<String>() {
+ @Override
+ public String call() throws Exception {
+ return callExecutorForJsonString(executor.getHost(),
+ executor.getPort(), "/serverstastics", null);
+ }
+ });
+ futures.add(new Pair<Executor, Future<String>>(executor,
+ fetchExecutionInfo));
+ }
+
+ boolean wasSuccess = true;
+ for (Pair<Executor, Future<String>> refreshPair : futures) {
+ Executor executor = refreshPair.getFirst();
+ try {
+ // max 5 secs
+ String jsonString = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
+ executor.setExecutorInfo(ExecutorInfo.fromJSONString(jsonString));
+ logger.info("Successfully refreshed ExecutorInfo for executor: "
+ + executor);
+ } catch (TimeoutException e) {
+ wasSuccess = false;
+ logger.error("Timed out while waiting for ExecutorInfo refresh"
+ + executor, e);
+ } catch (Exception e) {
+ wasSuccess = false;
+ logger.error("Failed to update ExecutorInfo for executor : "
+ + executor, e);
+ }
+ }
+
+ // update is successful for all executors
+ if (wasSuccess) {
+ lastSuccessfulExecutorInfoRefresh = System.currentTimeMillis();
+ }
}
}
/**
- * Disable flow dispatching in QueueProcessor
- *
- * @throws ExecutorManagerException
+ * Throws exception if running in local mode
+ * {@inheritDoc}
+ * @see azkaban.executor.ExecutorManagerAdapter#disableQueueProcessorThread()
*/
+ @Override
public void disableQueueProcessorThread() throws ExecutorManagerException {
if (isMultiExecutorMode()) {
queueProcessor.setActive(false);
@@ -220,10 +291,11 @@ public class ExecutorManager extends EventHandler implements
}
/**
- * Enable flow dispatching in QueueProcessor
- *
- * @throws ExecutorManagerException
+ * Throws exception if running in local mode
+ * {@inheritDoc}
+ * @see azkaban.executor.ExecutorManagerAdapter#enableQueueProcessorThread()
*/
+ @Override
public void enableQueueProcessorThread() throws ExecutorManagerException {
if (isMultiExecutorMode()) {
queueProcessor.setActive(true);
@@ -253,6 +325,34 @@ public class ExecutorManager extends EventHandler implements
return false;
}
+ /**
+ * Return last Successful ExecutorInfo Refresh for all active executors
+ *
+ * @return
+ */
+ public long getLastSuccessfulExecutorInfoRefresh() {
+ return this.lastSuccessfulExecutorInfoRefresh;
+ }
+
+ /**
+ * Get currently supported Comparators available to use via azkaban.properties
+ *
+ * @return
+ */
+ public Set<String> getAvailableExecutorComparatorNames() {
+ return ExecutorComparator.getAvailableComparatorNames();
+
+ }
+
+ /**
+ * Get currently supported filters available to use via azkaban.properties
+ *
+ * @return
+ */
+ public Set<String> getAvailableExecutorFilterNames() {
+ return ExecutorFilter.getAvailableFilterNames();
+ }
+
@Override
public State getExecutorManagerThreadState() {
return executingManager.getState();
@@ -334,9 +434,12 @@ public class ExecutorManager extends EventHandler implements
* any executor
*/
private void loadQueuedFlows() throws ExecutorManagerException {
- for (Pair<ExecutionReference, ExecutableFlow> pair : executorLoader
- .fetchQueuedFlows()) {
- queuedFlows.enqueue(pair.getSecond(), pair.getFirst());
+ List<Pair<ExecutionReference, ExecutableFlow>> retrievedExecutions =
+ executorLoader.fetchQueuedFlows();
+ if (retrievedExecutions != null) {
+ for (Pair<ExecutionReference, ExecutableFlow> pair : retrievedExecutions) {
+ queuedFlows.enqueue(pair.getSecond(), pair.getFirst());
+ }
}
}
@@ -927,11 +1030,12 @@ public class ExecutorManager extends EventHandler implements
executorLoader.addActiveExecutableReference(reference);
queuedFlows.enqueue(exflow, reference);
} else {
+ Executor executor = activeExecutors.iterator().next();
// assign only local executor we have
- reference.setExecutor(activeExecutors.iterator().next());
+ reference.setExecutor(executor);
executorLoader.addActiveExecutableReference(reference);
try {
- callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
+ callExecutorServer(exflow, executor, ConnectorParams.EXECUTE_ACTION);
runningFlows.put(exflow.getExecutionId(),
new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
} catch (ExecutorManagerException e) {
@@ -957,11 +1061,11 @@ public class ExecutorManager extends EventHandler implements
}
}
- private Map<String, Object> callExecutorServer(ExecutionReference ref,
- String action) throws ExecutorManagerException {
+ private Map<String, Object> callExecutorServer(ExecutableFlow exflow,
+ Executor executor, String action) throws ExecutorManagerException {
try {
- return callExecutorServer(ref.getHost(), ref.getPort(), action,
- ref.getExecId(), null, (Pair<String, String>[]) null);
+ return callExecutorServer(executor.getHost(), executor.getPort(), action,
+ exflow.getExecutionId(), null, (Pair<String, String>[]) null);
} catch (IOException e) {
throw new ExecutorManagerException(e);
}
@@ -1002,57 +1106,63 @@ public class ExecutorManager extends EventHandler implements
private Map<String, Object> callExecutorServer(String host, int port,
String action, Integer executionId, String user,
Pair<String, String>... params) throws IOException {
- URIBuilder builder = new URIBuilder();
- builder.setScheme("http").setHost(host).setPort(port).setPath("/executor");
-
- builder.setParameter(ConnectorParams.ACTION_PARAM, action);
-
- if (executionId != null) {
- builder.setParameter(ConnectorParams.EXECID_PARAM,
- String.valueOf(executionId));
- }
+ List<Pair<String, String>> paramList = new ArrayList<Pair<String,String>>();
- if (user != null) {
- builder.setParameter(ConnectorParams.USER_PARAM, user);
+ // if params = null
+ if(params != null) {
+ paramList.addAll(Arrays.asList(params));
}
- if (params != null) {
- for (Pair<String, String> pair : params) {
- builder.setParameter(pair.getFirst(), pair.getSecond());
- }
- }
+ paramList
+ .add(new Pair<String, String>(ConnectorParams.ACTION_PARAM, action));
+ paramList.add(new Pair<String, String>(ConnectorParams.EXECID_PARAM, String
+ .valueOf(executionId)));
+ paramList.add(new Pair<String, String>(ConnectorParams.USER_PARAM, user));
- URI uri = null;
- try {
- uri = builder.build();
- } catch (URISyntaxException e) {
- throw new IOException(e);
- }
+ Map<String, Object> jsonResponse =
+ callExecutorForJsonObject(host, port, "/executor", paramList);
- ResponseHandler<String> responseHandler = new BasicResponseHandler();
+ return jsonResponse;
+ }
- HttpClient httpclient = new DefaultHttpClient();
- HttpGet httpget = new HttpGet(uri);
- String response = null;
- try {
- response = httpclient.execute(httpget, responseHandler);
- } catch (IOException e) {
- throw e;
- } finally {
- httpclient.getConnectionManager().shutdown();
- }
+ /*
+ * Helper method used by ExecutorManager to call executor and return json
+ * object map
+ */
+ private Map<String, Object> callExecutorForJsonObject(String host, int port,
+ String path, List<Pair<String, String>> paramList) throws IOException {
+ String responseString =
+ callExecutorForJsonString(host, port, path, paramList);
@SuppressWarnings("unchecked")
Map<String, Object> jsonResponse =
- (Map<String, Object>) JSONUtils.parseJSONFromString(response);
+ (Map<String, Object>) JSONUtils.parseJSONFromString(responseString);
String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
if (error != null) {
throw new IOException(error);
}
-
return jsonResponse;
}
+ /*
+ * Helper method used by ExecutorManager to call executor and return raw json
+ * string
+ */
+ private String callExecutorForJsonString(String host, int port, String path,
+ List<Pair<String, String>> paramList) throws IOException {
+ if (paramList == null) {
+ paramList = new ArrayList<Pair<String, String>>();
+ }
+
+ ExecutorApiClient apiclient = ExecutorApiClient.getInstance();
+ @SuppressWarnings("unchecked")
+ URI uri =
+ ExecutorApiClient.buildUri(host, port, path, true,
+ paramList.toArray(new Pair[0]));
+
+ return apiclient.httpGet(uri, null);
+ }
+
/**
* Manage servlet call for stats servlet in Azkaban execution server
* {@inheritDoc}
@@ -1065,90 +1175,38 @@ public class ExecutorManager extends EventHandler implements
@Override
public Map<String, Object> callExecutorStats(int executorId, String action,
Pair<String, String>... params) throws IOException, ExecutorManagerException {
-
- URIBuilder builder = new URIBuilder();
Executor executor = fetchExecutor(executorId);
- builder.setScheme("http").setHost(executor.getHost())
- .setPort(executor.getPort()).setPath("/stats");
- builder.setParameter(ConnectorParams.ACTION_PARAM, action);
+ List<Pair<String, String>> paramList =
+ new ArrayList<Pair<String, String>>();
+ // if params = null
if (params != null) {
- for (Pair<String, String> pair : params) {
- builder.setParameter(pair.getFirst(), pair.getSecond());
- }
- }
-
- URI uri = null;
- try {
- uri = builder.build();
- } catch (URISyntaxException e) {
- throw new IOException(e);
- }
-
- ResponseHandler<String> responseHandler = new BasicResponseHandler();
-
- HttpClient httpclient = new DefaultHttpClient();
- HttpGet httpget = new HttpGet(uri);
- String response = null;
- try {
- response = httpclient.execute(httpget, responseHandler);
- } catch (IOException e) {
- throw e;
- } finally {
- httpclient.getConnectionManager().shutdown();
+ paramList.addAll(Arrays.asList(params));
}
- @SuppressWarnings("unchecked")
- Map<String, Object> jsonResponse =
- (Map<String, Object>) JSONUtils.parseJSONFromString(response);
+ paramList
+ .add(new Pair<String, String>(ConnectorParams.ACTION_PARAM, action));
- return jsonResponse;
+ return callExecutorForJsonObject(executor.getHost(), executor.getPort(),
+ "/stats", paramList);
}
@Override
public Map<String, Object> callExecutorJMX(String hostPort, String action,
String mBean) throws IOException {
- URIBuilder builder = new URIBuilder();
+ List<Pair<String, String>> paramList =
+ new ArrayList<Pair<String, String>>();
- String[] hostPortSplit = hostPort.split(":");
- builder.setScheme("http").setHost(hostPortSplit[0])
- .setPort(Integer.parseInt(hostPortSplit[1])).setPath("/jmx");
-
- builder.setParameter(action, "");
- if (mBean != null) {
- builder.setParameter(ConnectorParams.JMX_MBEAN, mBean);
- }
-
- URI uri = null;
- try {
- uri = builder.build();
- } catch (URISyntaxException e) {
- throw new IOException(e);
- }
-
- ResponseHandler<String> responseHandler = new BasicResponseHandler();
-
- HttpClient httpclient = new DefaultHttpClient();
- HttpGet httpget = new HttpGet(uri);
- String response = null;
- try {
- response = httpclient.execute(httpget, responseHandler);
- } catch (IOException e) {
- throw e;
- } finally {
- httpclient.getConnectionManager().shutdown();
+ paramList.add(new Pair<String, String>(action, ""));
+ if(mBean != null) {
+ paramList.add(new Pair<String, String>(ConnectorParams.JMX_MBEAN, mBean));
}
- @SuppressWarnings("unchecked")
- Map<String, Object> jsonResponse =
- (Map<String, Object>) JSONUtils.parseJSONFromString(response);
- String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
- if (error != null) {
- throw new IOException(error);
- }
- return jsonResponse;
+ String[] hostPortSplit = hostPort.split(":");
+ return callExecutorForJsonObject(hostPortSplit[0],
+ Integer.valueOf(hostPortSplit[1]), "/jmx", paramList);
}
@Override
@@ -1748,6 +1806,7 @@ public class ExecutorManager extends EventHandler implements
currentContinuousFlowProcessed = 0;
}
+ exflow.setUpdateTime(currentTime);
// process flow with current snapshot of activeExecutors
processFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
currentContinuousFlowProcessed++;
@@ -1776,12 +1835,57 @@ public class ExecutorManager extends EventHandler implements
}
}
+ /* Helper method to fetch overriding Executor, if a valid user has specifed otherwise return null */
+ private Executor getUserSpecifiedExecutor(ExecutionOptions options,
+ int executionId) {
+ Executor executor = null;
+ if (options != null
+ && options.getFlowParameters() != null
+ && options.getFlowParameters().containsKey(
+ ExecutionOptions.USE_EXECUTOR)) {
+ try {
+ int executorId =
+ Integer.valueOf(options.getFlowParameters().get(
+ ExecutionOptions.USE_EXECUTOR));
+ executor = fetchExecutor(executorId);
+
+ if (executor == null) {
+ logger
+ .warn(String
+ .format(
+ "User specified executor id: %d for execution id: %d is not active, Looking up db.",
+ executorId, executionId));
+ executor = executorLoader.fetchExecutor(executorId);
+ if (executor == null) {
+ logger
+ .warn(String
+ .format(
+ "User specified executor id: %d for execution id: %d is missing from db. Defaulting to availableExecutors",
+ executorId, executionId));
+ }
+ }
+ } catch (ExecutorManagerException ex) {
+ logger.error("Failed to fetch user specified executor for exec_id = "
+ + executionId, ex);
+ }
+ }
+ return executor;
+ }
+
/* Choose Executor for exflow among the available executors */
private Executor selectExecutor(ExecutableFlow exflow,
Set<Executor> availableExecutors) {
- Executor choosenExecutor;
- // TODO: use dispatcher
- choosenExecutor = availableExecutors.iterator().next();
+ Executor choosenExecutor =
+ getUserSpecifiedExecutor(exflow.getExecutionOptions(),
+ exflow.getExecutionId());
+
+ // If no executor was specified by admin
+ if (choosenExecutor == null) {
+ logger.info("Using dispatcher for execution id :"
+ + exflow.getExecutionId());
+ ExecutorSelector selector = new ExecutorSelector(filterList, comparatorWeightsMap);
+ choosenExecutor = selector.getBest(activeExecutors, exflow);
+ }
return choosenExecutor;
}
@@ -1794,7 +1898,6 @@ public class ExecutorManager extends EventHandler implements
"Reached handleDispatchExceptionCase stage for exec %d with error count %d",
exflow.getExecutionId(), reference.getNumErrors()));
reference.setNumErrors(reference.getNumErrors() + 1);
- reference.setExecutor(null);
if (reference.getNumErrors() >= MAX_DISPATCHING_ERRORS_PERMITTED
|| remainingExecutors.size() <= 1) {
logger.error("Failed to process queued flow");
@@ -1812,27 +1915,20 @@ public class ExecutorManager extends EventHandler implements
.info(String
.format(
"Reached handleNoExecutorSelectedCase stage for exec %d with error count %d",
- exflow.getExecutionId(), reference.getNumErrors()));
- reference.setNumErrors(reference.getNumErrors() + 1);
- // Scenario: when dispatcher didn't assigned any executor
- if (reference.getNumErrors() >= MAX_DISPATCHING_ERRORS_PERMITTED) {
- finalizeFlows(exflow);
- } else {
- // again queue this flow
- queuedFlows.enqueue(exflow, reference);
- }
+ exflow.getExecutionId(), reference.getNumErrors()));
+ // TODO: handle scenario where a high priority flow failing to get
+ // schedule can starve all others
+ queuedFlows.enqueue(exflow, reference);
}
private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
Executor choosenExecutor) throws ExecutorManagerException {
exflow.setUpdateTime(System.currentTimeMillis());
-
- // to be moved after db update once we integrate rest api changes
+ callExecutorServer(exflow, choosenExecutor,
+ ConnectorParams.EXECUTE_ACTION);
+ executorLoader.assignExecutor(choosenExecutor.getId(),
+ exflow.getExecutionId());
reference.setExecutor(choosenExecutor);
- // TODO: ADD rest call to do an actual dispatch
- callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
- executorLoader.assignExecutor(exflow.getExecutionId(),
- choosenExecutor.getId());
// move from flow to running flows
runningFlows.put(exflow.getExecutionId(),
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
index 2b47293..c50b0bc 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -231,4 +231,33 @@ public interface ExecutorManagerAdapter {
*/
public Executor fetchExecutor(int executorId) throws ExecutorManagerException;
+ /**
+ * <pre>
+ * Setup activeExecutors using azkaban.properties and database executors
+ * Note:
+ * 1. If azkaban.use.multiple.executors is set true, this method will
+ * load all active executors
+ * 2. In local mode, If a local executor is specified and it is missing from db,
+ * this method add local executor as active in DB
+ * 3. In local mode, If a local executor is specified and it is marked inactive in db,
+ * this method will convert local executor as active in DB
+ * </pre>
+ *
+ * @throws ExecutorManagerException
+ */
+ public void setupExecutors() throws ExecutorManagerException;
+
+ /**
+ * Enable flow dispatching in QueueProcessor
+ *
+ * @throws ExecutorManagerException
+ */
+ public void enableQueueProcessorThread() throws ExecutorManagerException;
+
+ /**
+ * Disable flow dispatching in QueueProcessor
+ *
+ * @throws ExecutorManagerException
+ */
+ public void disableQueueProcessorThread() throws ExecutorManagerException;
}
diff --git a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
index bbd642a..d459ae9 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
@@ -78,4 +78,19 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
return manager.getQueueProcessorThreadState().toString();
}
+ @Override
+ public List<String> getAvailableExecutorComparatorNames() {
+ return new ArrayList<String>(manager.getAvailableExecutorComparatorNames());
+ }
+
+ @Override
+ public List<String> getAvailableExecutorFilterNames() {
+ return new ArrayList<String>(manager.getAvailableExecutorFilterNames());
+ }
+
+ @Override
+ public long getLastSuccessfulExecutorInfoRefresh() {
+ return manager.getLastSuccessfulExecutorInfoRefresh();
+ }
+
}
diff --git a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
index 94012e0..69e401c 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
@@ -49,4 +49,13 @@ public interface JmxExecutorManagerMBean {
@DisplayName("OPERATION: getQueueProcessorThreadState")
public String getQueueProcessorThreadState();
+ @DisplayName("OPERATION: getAvailableExecutorComparatorNames")
+ List<String> getAvailableExecutorComparatorNames();
+
+ @DisplayName("OPERATION: getAvailableExecutorFilterNames")
+ List<String> getAvailableExecutorFilterNames();
+
+ @DisplayName("OPERATION: getLastSuccessfulExecutorInfoRefresh")
+ long getLastSuccessfulExecutorInfoRefresh();
+
}
diff --git a/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java b/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
index 80eff74..fc159ec 100644
--- a/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
+++ b/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
@@ -25,9 +25,17 @@ import java.util.Map;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
+import org.apache.commons.lang.StringUtils;
+
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.ExecutorManagerException;
import azkaban.executor.mail.DefaultMailCreator;
+import azkaban.user.Permission;
+import azkaban.user.Permission.Type;
+import azkaban.user.Role;
+import azkaban.user.User;
+import azkaban.user.UserManager;
import azkaban.utils.JSONUtils;
public class HttpRequestUtils {
@@ -113,6 +121,68 @@ public class HttpRequestUtils {
}
/**
+ * <pre>
+ * Remove following flow param if submitting user is not an Azkaban admin
+ * FLOW_PRIORITY
+ * USE_EXECUTOR
+ * @param userManager
+ * @param options
+ * @param user
+ * </pre>
+ */
+ public static void filterAdminOnlyFlowParams(UserManager userManager,
+ ExecutionOptions options, User user) throws ExecutorManagerException {
+ if (options == null || options.getFlowParameters() == null)
+ return;
+
+ Map<String, String> params = options.getFlowParameters();
+ // is azkaban Admin
+ if (!hasPermission(userManager, user, Type.ADMIN)) {
+ params.remove(ExecutionOptions.FLOW_PRIORITY);
+ params.remove(ExecutionOptions.USE_EXECUTOR);
+ } else {
+ validateIntegerParam(params, ExecutionOptions.FLOW_PRIORITY);
+ validateIntegerParam(params, ExecutionOptions.USE_EXECUTOR);
+ }
+ }
+
+ /**
+ * parse a string as number and throws exception if parsed value is not a
+ * valid integer
+ * @param params
+ * @param paramName
+ * @throws ExecutorManagerException if paramName is not a valid integer
+ */
+ public static boolean validateIntegerParam(Map<String, String> params,
+ String paramName) throws ExecutorManagerException {
+ if (params != null && params.containsKey(paramName)
+ && !StringUtils.isNumeric(params.get(paramName))) {
+ throw new ExecutorManagerException(paramName + " should be an integer");
+ }
+ return true;
+ }
+
+ /**
+ * returns true if user has access of type
+ *
+ * @param userManager
+ * @param user
+ * @param type
+ * @return
+ */
+ public static boolean hasPermission(UserManager userManager, User user,
+ Permission.Type type) {
+ for (String roleName : user.getRoles()) {
+ Role role = userManager.getRole(roleName);
+ if (role.getPermission().isPermissionSet(type)
+ || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
* Checks for the existance of the parameter in the request
*
* @param request
diff --git a/azkaban-common/src/test/java/azkaban/server/HttpRequestUtilsTest.java b/azkaban-common/src/test/java/azkaban/server/HttpRequestUtilsTest.java
new file mode 100644
index 0000000..24f6ef7
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/server/HttpRequestUtilsTest.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.server;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.user.Permission.Type;
+import azkaban.user.User;
+import azkaban.user.UserManager;
+import azkaban.user.UserManagerException;
+import azkaban.utils.TestUtils;
+
+/**
+ * Test class for HttpRequestUtils
+ */
+public final class HttpRequestUtilsTest {
+ /* Helper method to get a test flow and add required properties */
+ public static ExecutableFlow createExecutableFlow() throws IOException {
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ flow.getExecutionOptions().getFlowParameters()
+ .put(ExecutionOptions.FLOW_PRIORITY, "1");
+ flow.getExecutionOptions().getFlowParameters()
+ .put(ExecutionOptions.USE_EXECUTOR, "2");
+ return flow;
+ }
+
+ /* Test that flow properties are removed for non-admin user */
+ @Test
+ public void TestFilterNonAdminOnlyFlowParams() throws IOException,
+ ExecutorManagerException, UserManagerException {
+ ExecutableFlow flow = createExecutableFlow();
+ UserManager manager = TestUtils.createTestXmlUserManager();
+ User user = manager.getUser("testUser", "testUser");
+
+ HttpRequestUtils.filterAdminOnlyFlowParams(manager,
+ flow.getExecutionOptions(), user);
+
+ Assert.assertFalse(flow.getExecutionOptions().getFlowParameters()
+ .containsKey(ExecutionOptions.FLOW_PRIORITY));
+ Assert.assertFalse(flow.getExecutionOptions().getFlowParameters()
+ .containsKey(ExecutionOptions.USE_EXECUTOR));
+ }
+
+ /* Test that flow properties are retained for admin user */
+ @Test
+ public void TestFilterAdminOnlyFlowParams() throws IOException,
+ ExecutorManagerException, UserManagerException {
+ ExecutableFlow flow = createExecutableFlow();
+ UserManager manager = TestUtils.createTestXmlUserManager();
+ User user = manager.getUser("testAdmin", "testAdmin");
+
+ HttpRequestUtils.filterAdminOnlyFlowParams(manager,
+ flow.getExecutionOptions(), user);
+
+ Assert.assertTrue(flow.getExecutionOptions().getFlowParameters()
+ .containsKey(ExecutionOptions.FLOW_PRIORITY));
+ Assert.assertTrue(flow.getExecutionOptions().getFlowParameters()
+ .containsKey(ExecutionOptions.USE_EXECUTOR));
+ }
+
+ /* Test exception, if param is a valid integer */
+ @Test
+ public void testvalidIntegerParam() throws ExecutorManagerException {
+ Map<String, String> params = new HashMap<String, String>();
+ params.put("param1", "123");
+ HttpRequestUtils.validateIntegerParam(params, "param1");
+ }
+
+ /* Test exception, if param is not a valid integer */
+ @Test(expected = ExecutorManagerException.class)
+ public void testInvalidIntegerParam() throws ExecutorManagerException {
+ Map<String, String> params = new HashMap<String, String>();
+ params.put("param1", "1dff2");
+ HttpRequestUtils.validateIntegerParam(params, "param1");
+ }
+
+ /* Verify permission for admin user */
+ @Test
+ public void testHasAdminPermission() throws UserManagerException {
+ UserManager manager = TestUtils.createTestXmlUserManager();
+ User adminUser = manager.getUser("testAdmin", "testAdmin");
+ Assert.assertTrue(HttpRequestUtils.hasPermission(manager, adminUser,
+ Type.ADMIN));
+ }
+
+ /* verify permission for non-admin user */
+ @Test
+ public void testHasOrdinaryPermission() throws UserManagerException {
+ UserManager manager = TestUtils.createTestXmlUserManager();
+ User testUser = manager.getUser("testUser", "testUser");
+ Assert.assertFalse(HttpRequestUtils.hasPermission(manager, testUser,
+ Type.ADMIN));
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
index e51b575..68b10ee 100644
--- a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
+++ b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
@@ -24,17 +24,22 @@ import azkaban.executor.ExecutableFlow;
import azkaban.flow.Flow;
import azkaban.project.Project;
import azkaban.user.User;
+import azkaban.user.UserManager;
+import azkaban.user.XmlUserManager;
/**
* Commonly used utils method for unit/integration tests
*/
public class TestUtils {
+ /* Base resource direcotyr for unit tests */
+ private static final String UNIT_RESOURCE_DIR =
+ "../azkaban-test/src/test/resources";
/* Directory with serialized description of test flows */
- private static final String UNIT_BASE_DIR =
- "../azkaban-test/src/test/resources/executions";
+ private static final String UNIT_EXECUTION_DIR =
+ UNIT_RESOURCE_DIR + "/executions";
public static File getFlowDir(String projectName, String flow) {
- return new File(String.format("%s/%s/%s.flow", UNIT_BASE_DIR, projectName,
+ return new File(String.format("%s/%s/%s.flow", UNIT_EXECUTION_DIR, projectName,
flow));
}
@@ -43,8 +48,8 @@ public class TestUtils {
}
/* Helper method to create an ExecutableFlow from serialized description */
- public static ExecutableFlow createExecutableFlow(String projectName, String flowName)
- throws IOException {
+ public static ExecutableFlow createExecutableFlow(String projectName,
+ String flowName) throws IOException {
File jsonFlowFile = getFlowDir(projectName, flowName);
@SuppressWarnings("unchecked")
HashMap<String, Object> flowObj =
@@ -59,4 +64,13 @@ public class TestUtils {
return execFlow;
}
+
+ /* Helper method to create an XmlUserManager from XML_FILE_PARAM file */
+ public static UserManager createTestXmlUserManager() {
+ Props props = new Props();
+ props.put(XmlUserManager.XML_FILE_PARAM, UNIT_RESOURCE_DIR
+ + "/azkaban-users.xml");
+ UserManager manager = new XmlUserManager(props);
+ return manager;
+ }
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 235989d..8f5df1a 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Constructor;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.TimeZone;
@@ -501,4 +502,18 @@ public class AzkabanExecutorServer {
return null;
}
}
+
+ /**
+ * Returns host:port combination for currently running executor
+ * @return
+ */
+ public String getExecutorHostPort() {
+ String host = "unkownHost";
+ try {
+ host = InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (Exception e) {
+ logger.error("Failed to fetch LocalHostName");
+ }
+ return host + ":" + props.getInt("executor.port", DEFAULT_PORT_NUMBER);
+ }
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
index da98b99..7eedee4 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
@@ -271,6 +271,7 @@ public class FlowRunner extends EventHandler implements Runnable {
this.watcher.setLogger(logger);
}
+ logger.info("Assigned executor : " + AzkabanExecutorServer.getApp().getExecutorHostPort());
logger.info("Running execid:" + execId + " flow:" + flowId + " project:"
+ projectId + " version:" + version);
if (pipelineExecId != null) {
@@ -840,7 +841,7 @@ public class FlowRunner extends EventHandler implements Runnable {
/**
* Configure Azkaban metrics tracking for a new jobRunner instance
- *
+ *
* @param jobRunner
*/
private void configureJobLevelMetrics(JobRunner jobRunner) {
diff --git a/azkaban-test/src/test/resources/azkaban-users.xml b/azkaban-test/src/test/resources/azkaban-users.xml
new file mode 100644
index 0000000..55941a7
--- /dev/null
+++ b/azkaban-test/src/test/resources/azkaban-users.xml
@@ -0,0 +1,5 @@
+<azkaban-users>
+ <user username="testAdmin" password="testAdmin" roles="admin" groups="azkaban" />
+ <user username="testUser" password="testUser" />
+ <role name="admin" permissions="ADMIN" />
+</azkaban-users>
azkaban-webserver/.gitignore 1(+1 -0)
diff --git a/azkaban-webserver/.gitignore b/azkaban-webserver/.gitignore
new file mode 100644
index 0000000..5e56e04
--- /dev/null
+++ b/azkaban-webserver/.gitignore
@@ -0,0 +1 @@
+/bin
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index f3e8c8f..4970fce 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -29,6 +29,7 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang.StringEscapeUtils;
+import azkaban.executor.ConnectorParams;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
@@ -135,6 +136,12 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ajaxFetchExecutableFlowInfo(req, resp, ret, session.getUser(), exFlow);
}
}
+ } else if (ajaxName.equals("reloadExecutors")) {
+ ajaxReloadExecutors(req, resp, ret, session.getUser());
+ } else if (ajaxName.equals("enableQueueProcessor")) {
+ ajaxUpdateQueueProcessor(req, resp, ret, session.getUser(), true);
+ } else if (ajaxName.equals("disableQueueProcessor")) {
+ ajaxUpdateQueueProcessor(req, resp, ret, session.getUser(), false);
} else if (ajaxName.equals("getRunning")) {
String projectName = getParam(req, "project");
String flowName = getParam(req, "flow");
@@ -158,6 +165,63 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
}
+ /**
+ * <pre>
+ * Enables queueProcessor if @param status is true
+ * disables queueProcessor if @param status is false.
+ * </pre>
+ */
+ private void ajaxUpdateQueueProcessor(HttpServletRequest req,
+ HttpServletResponse resp, HashMap<String, Object> returnMap, User user,
+ boolean enableQueue) {
+ boolean wasSuccess = false;
+ if (HttpRequestUtils.hasPermission(userManager, user, Type.ADMIN)) {
+ try {
+ if (enableQueue) {
+ executorManager.enableQueueProcessorThread();
+ } else {
+ executorManager.disableQueueProcessorThread();
+ }
+ returnMap.put(ConnectorParams.STATUS_PARAM,
+ ConnectorParams.RESPONSE_SUCCESS);
+ wasSuccess = true;
+ } catch (ExecutorManagerException e) {
+ returnMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
+ }
+ } else {
+ returnMap.put(ConnectorParams.RESPONSE_ERROR,
+ "Only Admins are allowed to update queue processor");
+ }
+ if (!wasSuccess) {
+ returnMap.put(ConnectorParams.STATUS_PARAM,
+ ConnectorParams.RESPONSE_ERROR);
+ }
+ }
+
+ /* Reloads executors from DB and azkaban.properties via executorManager */
+ private void ajaxReloadExecutors(HttpServletRequest req,
+ HttpServletResponse resp, HashMap<String, Object> returnMap, User user) {
+ boolean wasSuccess = false;
+ if (HttpRequestUtils.hasPermission(userManager, user, Type.ADMIN)) {
+ try {
+ executorManager.setupExecutors();
+ returnMap.put(ConnectorParams.STATUS_PARAM,
+ ConnectorParams.RESPONSE_SUCCESS);
+ wasSuccess = true;
+ } catch (ExecutorManagerException e) {
+ returnMap.put(ConnectorParams.RESPONSE_ERROR,
+ "Failed to refresh the executors " + e.getMessage());
+ }
+ } else {
+ returnMap.put(ConnectorParams.RESPONSE_ERROR,
+ "Only Admins are allowed to refresh the executors");
+ }
+ if (!wasSuccess) {
+ returnMap.put(ConnectorParams.STATUS_PARAM,
+ ConnectorParams.RESPONSE_ERROR);
+ }
+ }
+
@Override
protected void handlePost(HttpServletRequest req, HttpServletResponse resp,
Session session) throws ServletException, IOException {
@@ -813,14 +877,14 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
if (!options.isSuccessEmailsOverridden()) {
options.setSuccessEmails(flow.getSuccessEmails());
}
- fixFlowPriorityByPermission(options, user);
options.setMailCreator(flow.getMailCreator());
try {
+ HttpRequestUtils.filterAdminOnlyFlowParams(userManager, options, user);
String message =
executorManager.submitExecutableFlow(exflow, user.getUserId());
ret.put("message", message);
- } catch (ExecutorManagerException e) {
+ } catch (Exception e) {
e.printStackTrace();
ret.put("error",
"Error submitting flow " + exflow.getFlowId() + ". " + e.getMessage());
@@ -829,15 +893,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ret.put("execid", exflow.getExecutionId());
}
- /* Reset flow priority if submitting user is not a Azkaban admin */
- private void fixFlowPriorityByPermission(ExecutionOptions options, User user) {
- if (!(options.getFlowParameters().containsKey(
- ExecutionOptions.FLOW_PRIORITY) && hasPermission(user, Type.ADMIN))) {
- options.getFlowParameters().put(ExecutionOptions.FLOW_PRIORITY,
- String.valueOf(ExecutionOptions.DEFAULT_FLOW_PRIORITY));
- }
- }
-
public class ExecutorVelocityHelper {
public String getProjectName(int id) {
Project project = projectManager.getProject(id);
@@ -848,16 +903,4 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
return project.getName();
}
}
-
- /* returns true if user has access of type */
- protected boolean hasPermission(User user, Permission.Type type) {
- for (String roleName : user.getRoles()) {
- Role role = userManager.getRole(roleName);
- if (role.getPermission().isPermissionSet(type)
- || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
- return true;
- }
- }
- return false;
- }
}
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
index f688820..c6445c0 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -62,6 +62,7 @@ import azkaban.sla.SlaOption;
import azkaban.user.Permission;
import azkaban.user.Permission.Type;
import azkaban.user.User;
+import azkaban.user.UserManager;
import azkaban.utils.JSONUtils;
import azkaban.utils.SplitterOutputStream;
import azkaban.utils.Utils;
@@ -73,11 +74,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
private static final Logger logger = Logger.getLogger(ScheduleServlet.class);
private ProjectManager projectManager;
private ScheduleManager scheduleManager;
+ private UserManager userManager;
@Override
public void init(ServletConfig config) throws ServletException {
super.init(config);
AzkabanWebServer server = (AzkabanWebServer) getApplication();
+ userManager = server.getUserManager();
projectManager = server.getProjectManager();
scheduleManager = server.getScheduleManager();
}
@@ -656,6 +659,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
ExecutionOptions flowOptions = null;
try {
flowOptions = HttpRequestUtils.parseFlowOptions(req);
+ HttpRequestUtils.filterAdminOnlyFlowParams(userManager, flowOptions, user);
} catch (Exception e) {
ret.put("error", e.getMessage());
}
diff --git a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
index d77bc52..27be342 100644
--- a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
+++ b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
@@ -41,7 +41,7 @@
} else {
$('#metricName').empty();
for(var index = 0; index < responseData.metricList.length; index++) {
- $('#metricName').append($('<option value="1">' + responseData.metricList[index] + '</option>'));
+ $('#metricName').append($('<option value="' + responseData.metricList[index] +'">' + responseData.metricList[index] + '</option>'));
}
}
};