Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
index 8bc725f..8abbd61 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
@@ -17,6 +17,7 @@
package azkaban.executor;
public interface ConnectorParams {
+ public static final String EXECUTOR_ID_PARAM = "executorId";
public static final String ACTION_PARAM = "action";
public static final String EXECID_PARAM = "execid";
public static final String SHAREDTOKEN_PARAM = "token";
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java
new file mode 100644
index 0000000..3050a8d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.util.Comparator;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Pair;
+
+/**
+ * Comparator implicitly used in priority queue for QueuedExecutions.
+ */
+public final class ExecutableFlowPriorityComparator implements
+ Comparator<Pair<ExecutionReference, ExecutableFlow>> {
+ private static Logger logger = Logger
+ .getLogger(ExecutableFlowPriorityComparator.class);
+
+ /**
+ * <pre>
+ * Sorting order is determined by:-
+ * 1. descending order of priority
+ * 2. if same priority, ascending order of update time
+ * 3. if same priority and updateTime, ascending order of execution id
+ * </pre>
+ *
+ * {@inheritDoc}
+ *
+ * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
+ */
+ @Override
+ public int compare(Pair<ExecutionReference, ExecutableFlow> pair1,
+ Pair<ExecutionReference, ExecutableFlow> pair2) {
+ ExecutableFlow exflow1 = null, exflow2 = null;
+ if (pair1 != null && pair1.getSecond() != null) {
+ exflow1 = pair1.getSecond();
+ }
+ if (pair2 != null && pair2.getSecond() != null) {
+ exflow2 = pair2.getSecond();
+ }
+ if (exflow1 == null && exflow2 == null)
+ return 0;
+ else if (exflow1 == null)
+ return -1;
+ else if (exflow2 == null)
+ return 1;
+ else {
+ // descending order of priority
+ int diff = getPriority(exflow2) - getPriority(exflow1);
+ if (diff == 0) {
+ // ascending order of update time, if same priority
+ diff = (int) (exflow1.getUpdateTime() - exflow2.getUpdateTime());
+ }
+ if (diff == 0) {
+ // ascending order of execution id, if same priority and updateTime
+ diff = exflow1.getExecutionId() - exflow2.getExecutionId();
+ }
+ return diff;
+ }
+ }
+
+ /* Helper method to fetch flow priority from flow props */
+ private int getPriority(ExecutableFlow exflow) {
+ ExecutionOptions options = exflow.getExecutionOptions();
+ int priority = ExecutionOptions.DEFAULT_FLOW_PRIORITY;
+ if (options != null
+ && options.getFlowParameters() != null
+ && options.getFlowParameters()
+ .containsKey(ExecutionOptions.FLOW_PRIORITY)) {
+ try {
+ priority =
+ Integer.valueOf(options.getFlowParameters().get(
+ ExecutionOptions.FLOW_PRIORITY));
+ } catch (NumberFormatException ex) {
+ priority = ExecutionOptions.DEFAULT_FLOW_PRIORITY;
+ logger.error(
+ "Failed to parse flow priority for exec_id = "
+ + exflow.getExecutionId(), ex);
+ }
+ }
+ return priority;
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
index d4cb262..312be44 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
@@ -33,6 +33,8 @@ public class ExecutionOptions {
public static final String CONCURRENT_OPTION_SKIP = "skip";
public static final String CONCURRENT_OPTION_PIPELINE = "pipeline";
public static final String CONCURRENT_OPTION_IGNORE = "ignore";
+ public static final String FLOW_PRIORITY = "flowPriority";
+ public static final int DEFAULT_FLOW_PRIORITY = 5;
private static final String FLOW_PARAMETERS = "flowParameters";
private static final String NOTIFY_ON_FIRST_FAILURE = "notifyOnFirstFailure";
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
index e314206..9d93476 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
@@ -18,16 +18,23 @@ package azkaban.executor;
public class ExecutionReference {
private final int execId;
- private final String host;
- private final int port;
+ private Executor executor;
private long updateTime;
private long nextCheckTime = -1;
private int numErrors = 0;
- public ExecutionReference(int execId, String host, int port) {
+
+ public ExecutionReference(int execId) {
+ this.execId = execId;
+ }
+
+ public ExecutionReference(int execId, Executor executor) {
+ if (executor == null) {
+ throw new IllegalArgumentException(String.format(
+ "Executor cannot be null for exec id: %d ExecutionReference", execId));
+ }
this.execId = execId;
- this.host = host;
- this.port = port;
+ this.executor = executor;
}
public void setUpdateTime(long updateTime) {
@@ -51,11 +58,11 @@ public class ExecutionReference {
}
public String getHost() {
- return host;
+ return executor.getHost();
}
public int getPort() {
- return port;
+ return executor.getPort();
}
public int getNumErrors() {
@@ -65,4 +72,12 @@ public class ExecutionReference {
public void setNumErrors(int numErrors) {
this.numErrors = numErrors;
}
-}
+
+ public void setExecutor(Executor executor) {
+ this.executor = executor;
+ }
+
+ public Executor getExecutor() {
+ return executor;
+ }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 1bfa910..6120002 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -171,6 +171,50 @@ public interface ExecutorLoader {
public void removeActiveExecutableReference(int execId)
throws ExecutorManagerException;
+ /**
+ * <pre>
+ * Set an executor Id to an execution
+ * Note:-
+ * 1. throws an Exception in case of a SQL issue
+ * 2. throws an Exception in case executionId or executorId do not exist
+ * </pre>
+ *
+ * @param executorId
+ * @param execId
+ * @throws ExecutorManagerException
+ */
+ public void assignExecutor(int executorId, int execId)
+ throws ExecutorManagerException;
+
+ /**
+ * <pre>
+ * Fetches an executor corresponding to a given execution
+ * Note:-
+ * 1. throws an Exception in case of a SQL issue
+ * 2. return null when no executor is found with the given executionId
+ * </pre>
+ *
+ * @param executionId
+ * @return fetched Executor
+ * @throws ExecutorManagerException
+ */
+ public Executor fetchExecutorByExecutionId(int executionId)
+ throws ExecutorManagerException;
+
+ /**
+ * <pre>
+ * Fetch queued flows which have not yet dispatched
+ * Note:
+ * 1. throws an Exception in case of a SQL issue
+ * 2. return empty list when no queued execution is found
+ * </pre>
+ *
+ * @return List of queued flows and corresponding execution reference
+ * @throws ExecutorManagerException
+ */
+ public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
+ throws ExecutorManagerException;
+
public boolean updateExecutableReference(int execId, long updateTime)
throws ExecutorManagerException;
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index c86648e..c97ef00 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -22,13 +22,17 @@ import java.lang.Thread.State;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.PriorityBlockingQueue;
import org.apache.commons.lang.StringUtils;
import org.apache.http.client.HttpClient;
@@ -59,10 +63,19 @@ import azkaban.utils.Props;
*/
public class ExecutorManager extends EventHandler implements
ExecutorManagerAdapter {
+ static final String AZKABAN_QUEUEPROCESSING_ENABLED =
+ "azkaban.queueprocessing.enabled";
+ static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
+ "azkaban.use.multiple.executors";
+ private static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
+ "azkaban.webserver.queue.size";
+ private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS =
+ "azkaban.activeexecutor.refresh.milisecinterval";
+ private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_FLOW =
+ "azkaban.activeexecutor.refresh.flowinterval";
+
private static Logger logger = Logger.getLogger(ExecutorManager.class);
private ExecutorLoader executorLoader;
- private String executorHost;
- private int executorPort;
private CleanerThread cleanerThread;
@@ -71,8 +84,13 @@ public class ExecutorManager extends EventHandler implements
private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
new ConcurrentHashMap<Integer, ExecutableFlow>();
- private ExecutingManagerUpdaterThread executingManager;
+ QueuedExecutions queuedFlows;
+ final private Set<Executor> activeExecutors = new HashSet<Executor>();
+ private QueueProcessorThread queueProcessor;
+
+ private ExecutingManagerUpdaterThread executingManager;
+ // 12 weeks
private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
* 24 * 60 * 60 * 1000l;
private long lastCleanerThreadCheckTime = -1;
@@ -84,12 +102,16 @@ public class ExecutorManager extends EventHandler implements
File cacheDir;
+ final Props azkProps;
+
public ExecutorManager(Props props, ExecutorLoader loader,
Map<String, Alerter> alters) throws ExecutorManagerException {
+ azkProps = props;
+
this.executorLoader = loader;
+ this.setupExecutors();
this.loadRunningFlows();
- executorHost = props.getString("executor.host", "localhost");
- executorPort = props.getInt("executor.port");
+ this.loadQueuedFlows();
alerters = alters;
@@ -98,14 +120,139 @@ public class ExecutorManager extends EventHandler implements
executingManager = new ExecutingManagerUpdaterThread();
executingManager.start();
+ if(isMultiExecutorMode()) {
+ queueProcessor =
+ new QueueProcessorThread(azkProps.getBoolean(
+ AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
+ AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS, 1000),
+ azkProps
+ .getInt(AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_FLOW, 1000));
+ queueProcessor.start();
+ }
+
long executionLogsRetentionMs =
- props.getLong("execution.logs.retention.ms",
- DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+ props.getLong("execution.logs.retention.ms",
+ DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+
+ queuedFlows =
+ new QueuedExecutions(props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
+
cleanerThread = new CleanerThread(executionLogsRetentionMs);
cleanerThread.start();
}
+ /**
+ * <pre>
+ * Setup activeExecutors using azkaban.properties and database executors
+ * Note:
+ * 1. If azkaban.use.multiple.executors is set true, this method will
+ * load all active executors
+ * 2. In local mode, If a local executor is specified and it is missing from db,
+ * this method add local executor as active in DB
+ * 3. In local mode, If a local executor is specified and it is marked inactive in db,
+ * this method will convert local executor as active in DB
+ * </pre>
+ *
+ * @throws ExecutorManagerException
+ */
+ public void setupExecutors() throws ExecutorManagerException {
+ Set<Executor> newExecutors = new HashSet<Executor>();
+
+ if (isMultiExecutorMode()) {
+ logger.info("Initializing multi executors from database");
+ newExecutors.addAll(executorLoader.fetchActiveExecutors());
+ } else if (azkProps.containsKey("executor.port")) {
+ // Add local executor, if specified as per properties
+ String executorHost = azkProps.getString("executor.host", "localhost");
+ int executorPort = azkProps.getInt("executor.port");
+ logger.info(String.format("Initializing local executor %s:%d",
+ executorHost, executorPort));
+ Executor executor =
+ executorLoader.fetchExecutor(executorHost, executorPort);
+ if (executor == null) {
+ executor = executorLoader.addExecutor(executorHost, executorPort);
+ } else if (!executor.isActive()) {
+ executor.setActive(true);
+ executorLoader.updateExecutor(executor);
+ }
+ newExecutors.add(new Executor(executor.getId(), executorHost,
+ executorPort, true));
+ }
+
+ if (newExecutors.isEmpty()) {
+ throw new ExecutorManagerException("No active executor found");
+ } else if(newExecutors.size() > 1 && !isMultiExecutorMode()) {
+ throw new ExecutorManagerException("Multiple local executors specified");
+ } else {
+ // clear all active executors, only if we have at least one new active
+ // executors
+ activeExecutors.clear();
+ activeExecutors.addAll(newExecutors);
+ }
+ }
+
+ private boolean isMultiExecutorMode() {
+ return azkProps.getBoolean(AZKABAN_USE_MULTIPLE_EXECUTORS, false);
+ }
+
+ /**
+ * Refresh Executor stats for all the actie executors in this executorManager
+ */
+ private void refreshExecutors() {
+ synchronized (activeExecutors) {
+ // TODO: rest api call to refresh executor stats
+ }
+ }
+
+ /**
+ * Disable flow dispatching in QueueProcessor
+ *
+ * @throws ExecutorManagerException
+ */
+ public void disableQueueProcessorThread() throws ExecutorManagerException {
+ if (isMultiExecutorMode()) {
+ queueProcessor.setActive(false);
+ } else {
+ throw new ExecutorManagerException(
+ "Cannot disable QueueProcessor in local mode");
+ }
+ }
+
+ /**
+ * Enable flow dispatching in QueueProcessor
+ *
+ * @throws ExecutorManagerException
+ */
+ public void enableQueueProcessorThread() throws ExecutorManagerException {
+ if (isMultiExecutorMode()) {
+ queueProcessor.setActive(true);
+ } else {
+ throw new ExecutorManagerException(
+ "Cannot enable QueueProcessor in local mode");
+ }
+ }
+
+ public State getQueueProcessorThreadState() {
+ if (isMultiExecutorMode())
+ return queueProcessor.getState();
+ else
+ return State.NEW; // not started in local mode
+ }
+
+ /**
+ * Returns state of QueueProcessor False, no flow is being dispatched True ,
+ * flows are being dispatched as expected
+ *
+ * @return
+ */
+ public boolean isQueueProcessorThreadActive() {
+ if (isMultiExecutorMode())
+ return queueProcessor.isActive();
+ else
+ return false;
+ }
+
@Override
public State getExecutorManagerThreadState() {
return executingManager.getState();
@@ -130,10 +277,33 @@ public class ExecutorManager extends EventHandler implements
}
@Override
+ public Collection<Executor> getAllActiveExecutors() {
+ return Collections.unmodifiableCollection(activeExecutors);
+ }
+
+ /**
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#fetchExecutor(int)
+ */
+ @Override
+ public Executor fetchExecutor(int executorId) throws ExecutorManagerException {
+ for (Executor executor : activeExecutors) {
+ if (executor.getId() == executorId) {
+ return executor;
+ }
+ }
+ return executorLoader.fetchExecutor(executorId);
+ }
+
+ @Override
public Set<String> getPrimaryServerHosts() {
// Only one for now. More probably later.
HashSet<String> ports = new HashSet<String>();
- ports.add(executorHost + ":" + executorPort);
+ for (Executor executor : activeExecutors) {
+ ports.add(executor.getHost() + ":" + executor.getPort());
+ }
return ports;
}
@@ -141,11 +311,8 @@ public class ExecutorManager extends EventHandler implements
public Set<String> getAllActiveExecutorServerHosts() {
// Includes non primary server/hosts
HashSet<String> ports = new HashSet<String>();
- ports.add(executorHost + ":" + executorPort);
- for (Pair<ExecutionReference, ExecutableFlow> running : runningFlows
- .values()) {
- ExecutionReference ref = running.getFirst();
- ports.add(ref.getHost() + ":" + ref.getPort());
+ for (Executor executor : activeExecutors) {
+ ports.add(executor.getHost() + ":" + executor.getPort());
}
return ports;
@@ -153,59 +320,194 @@ public class ExecutorManager extends EventHandler implements
private void loadRunningFlows() throws ExecutorManagerException {
runningFlows.putAll(executorLoader.fetchActiveFlows());
+ // Finalize all flows which were running on an executor which is now
+ // inactive
+ for (Pair<ExecutionReference, ExecutableFlow> pair : runningFlows.values()) {
+ if (!activeExecutors.contains(pair.getFirst().getExecutor())) {
+ finalizeFlows(pair.getSecond());
+ }
+ }
+ }
+
+ /*
+ * load queued flows i.e with active_execution_reference and not assigned to
+ * any executor
+ */
+ private void loadQueuedFlows() throws ExecutorManagerException {
+ for (Pair<ExecutionReference, ExecutableFlow> pair : executorLoader
+ .fetchQueuedFlows()) {
+ queuedFlows.enqueue(pair.getSecond(), pair.getFirst());
+ }
}
+ /**
+ * Gets a list of all the active (running flows and non-dispatched flows)
+ * executions for a given project and flow {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows(int,
+ * java.lang.String)
+ */
@Override
public List<Integer> getRunningFlows(int projectId, String flowId) {
- ArrayList<Integer> executionIds = new ArrayList<Integer>();
- for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+ List<Integer> executionIds = new ArrayList<Integer>();
+ executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
+ queuedFlows.getAllEntries()));
+ executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
+ runningFlows.values()));
+ return executionIds;
+ }
+
+ /* Helper method for getRunningFlows */
+ private List<Integer> getRunningFlowsHelper(int projectId, String flowId,
+ Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ List<Integer> executionIds = new ArrayList<Integer>();
+ for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
if (ref.getSecond().getFlowId().equals(flowId)
- && ref.getSecond().getProjectId() == projectId) {
+ && ref.getSecond().getProjectId() == projectId) {
executionIds.add(ref.getFirst().getExecId());
}
}
return executionIds;
}
+ /**
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#getActiveFlowsWithExecutor()
+ */
+ @Override
+ public List<Pair<ExecutableFlow, Executor>> getActiveFlowsWithExecutor()
+ throws IOException {
+ List<Pair<ExecutableFlow, Executor>> flows =
+ new ArrayList<Pair<ExecutableFlow, Executor>>();
+ getActiveFlowsWithExecutorHelper(flows, queuedFlows.getAllEntries());
+ getActiveFlowsWithExecutorHelper(flows, runningFlows.values());
+ return flows;
+ }
+
+ /* Helper method for getActiveFlowsWithExecutor */
+ private void getActiveFlowsWithExecutorHelper(
+ List<Pair<ExecutableFlow, Executor>> flows,
+ Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+ flows.add(new Pair<ExecutableFlow, Executor>(ref.getSecond(), ref
+ .getFirst().getExecutor()));
+ }
+ }
+
+ /**
+ * Checks whether the given flow has an active (running, non-dispatched)
+ * executions {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#isFlowRunning(int,
+ * java.lang.String)
+ */
@Override
public boolean isFlowRunning(int projectId, String flowId) {
- for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+ boolean isRunning = false;
+ isRunning =
+ isRunning
+ || isFlowRunningHelper(projectId, flowId, queuedFlows.getAllEntries());
+ isRunning =
+ isRunning
+ || isFlowRunningHelper(projectId, flowId, runningFlows.values());
+ return isRunning;
+ }
+
+ /* Search a running flow in a collection */
+ private boolean isFlowRunningHelper(int projectId, String flowId,
+ Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
if (ref.getSecond().getProjectId() == projectId
- && ref.getSecond().getFlowId().equals(flowId)) {
+ && ref.getSecond().getFlowId().equals(flowId)) {
return true;
}
}
return false;
}
+ /**
+ * Fetch ExecutableFlow from an active (running, non-dispatched) or from
+ * database {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#getExecutableFlow(int)
+ */
@Override
public ExecutableFlow getExecutableFlow(int execId)
- throws ExecutorManagerException {
- Pair<ExecutionReference, ExecutableFlow> active = runningFlows.get(execId);
- if (active == null) {
+ throws ExecutorManagerException {
+ if (runningFlows.containsKey(execId)) {
+ return runningFlows.get(execId).getSecond();
+ } else if (queuedFlows.hasExecution(execId)) {
+ return queuedFlows.getFlow(execId);
+ } else {
return executorLoader.fetchExecutableFlow(execId);
}
- return active.getSecond();
}
+ /**
+ * Get all active (running, non-dispatched) flows
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
+ */
@Override
public List<ExecutableFlow> getRunningFlows() {
ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>();
- for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+ getActiveFlowHelper(flows, queuedFlows.getAllEntries());
+ getActiveFlowHelper(flows, runningFlows.values());
+ return flows;
+ }
+
+ /*
+ * Helper method to get all running flows from a Pair<ExecutionReference,
+ * ExecutableFlow collection
+ */
+ private void getActiveFlowHelper(ArrayList<ExecutableFlow> flows,
+ Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
flows.add(ref.getSecond());
}
- return flows;
}
+ /**
+ * Get execution Ids of all active (running, non-dispatched) flows
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
+ */
public String getRunningFlowIds() {
List<Integer> allIds = new ArrayList<Integer>();
- for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
- allIds.add(ref.getSecond().getExecutionId());
- }
+ getRunningFlowsIdsHelper(allIds, queuedFlows.getAllEntries());
+ getRunningFlowsIdsHelper(allIds, runningFlows.values());
+ Collections.sort(allIds);
+ return allIds.toString();
+ }
+
+ /**
+ * Get execution Ids of all non-dispatched flows
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
+ */
+ public String getQueuedFlowIds() {
+ List<Integer> allIds = new ArrayList<Integer>();
+ getRunningFlowsIdsHelper(allIds, queuedFlows.getAllEntries());
Collections.sort(allIds);
return allIds.toString();
}
+ /* Helper method to flow ids of all running flows */
+ private void getRunningFlowsIdsHelper(List<Integer> allIds,
+ Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+ allIds.add(ref.getSecond().getExecutionId());
+ }
+ }
+
public List<ExecutableFlow> getRecentlyFinishedFlows() {
return new ArrayList<ExecutableFlow>(recentlyFinished.values());
}
@@ -371,18 +673,30 @@ public class ExecutorManager extends EventHandler implements
}
}
+ /**
+ * if flows was dispatched to an executor, cancel by calling Executor else if
+ * flow is still in queue, remove from queue and finalize {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#cancelFlow(azkaban.executor.ExecutableFlow,
+ * java.lang.String)
+ */
@Override
public void cancelFlow(ExecutableFlow exFlow, String userId)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
synchronized (exFlow) {
- Pair<ExecutionReference, ExecutableFlow> pair =
+ if (runningFlows.containsKey(exFlow.getExecutionId())) {
+ Pair<ExecutionReference, ExecutableFlow> pair =
runningFlows.get(exFlow.getExecutionId());
- if (pair == null) {
+ callExecutorServer(pair.getFirst(), ConnectorParams.CANCEL_ACTION,
+ userId);
+ } else if (queuedFlows.hasExecution(exFlow.getExecutionId())) {
+ queuedFlows.dequeue(exFlow.getExecutionId());
+ finalizeFlows(exFlow);
+ } else {
throw new ExecutorManagerException("Execution "
- + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
- + " isn't running.");
+ + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+ + " isn't running.");
}
- callExecutorServer(pair.getFirst(), ConnectorParams.CANCEL_ACTION, userId);
}
}
@@ -539,80 +853,97 @@ public class ExecutorManager extends EventHandler implements
@Override
public String submitExecutableFlow(ExecutableFlow exflow, String userId)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
synchronized (exflow) {
- logger.info("Submitting execution flow " + exflow.getFlowId() + " by "
- + userId);
-
- int projectId = exflow.getProjectId();
String flowId = exflow.getFlowId();
- exflow.setSubmitUser(userId);
- exflow.setSubmitTime(System.currentTimeMillis());
-
- List<Integer> running = getRunningFlows(projectId, flowId);
- ExecutionOptions options = exflow.getExecutionOptions();
- if (options == null) {
- options = new ExecutionOptions();
- }
+ logger.info("Submitting execution flow " + flowId + " by " + userId);
String message = "";
- if (options.getDisabledJobs() != null) {
- applyDisabledJobs(options.getDisabledJobs(), exflow);
- }
+ if (queuedFlows.isFull()) {
+ message =
+ String
+ .format(
+ "Failed to submit %s for project %s. Azkaban has overrun its webserver queue capacity",
+ flowId, exflow.getProjectName());
+ logger.error(message);
+ } else {
+ int projectId = exflow.getProjectId();
+ exflow.setSubmitUser(userId);
+ exflow.setSubmitTime(System.currentTimeMillis());
+
+ List<Integer> running = getRunningFlows(projectId, flowId);
+
+ ExecutionOptions options = exflow.getExecutionOptions();
+ if (options == null) {
+ options = new ExecutionOptions();
+ }
+
+ if (options.getDisabledJobs() != null) {
+ applyDisabledJobs(options.getDisabledJobs(), exflow);
+ }
- if (!running.isEmpty()) {
- if (options.getConcurrentOption().equals(
+ if (!running.isEmpty()) {
+ if (options.getConcurrentOption().equals(
ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
- Collections.sort(running);
- Integer runningExecId = running.get(running.size() - 1);
+ Collections.sort(running);
+ Integer runningExecId = running.get(running.size() - 1);
- options.setPipelineExecutionId(runningExecId);
- message =
+ options.setPipelineExecutionId(runningExecId);
+ message =
"Flow " + flowId + " is already running with exec id "
- + runningExecId + ". Pipelining level "
- + options.getPipelineLevel() + ". \n";
- } else if (options.getConcurrentOption().equals(
+ + runningExecId + ". Pipelining level "
+ + options.getPipelineLevel() + ". \n";
+ } else if (options.getConcurrentOption().equals(
ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
- throw new ExecutorManagerException("Flow " + flowId
+ throw new ExecutorManagerException("Flow " + flowId
+ " is already running. Skipping execution.",
ExecutorManagerException.Reason.SkippedExecution);
- } else {
- // The settings is to run anyways.
- message =
+ } else {
+ // The settings is to run anyways.
+ message =
"Flow " + flowId + " is already running with exec id "
- + StringUtils.join(running, ",")
- + ". Will execute concurrently. \n";
+ + StringUtils.join(running, ",")
+ + ". Will execute concurrently. \n";
+ }
}
- }
- boolean memoryCheck = !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
- ProjectWhitelist.WhitelistType.MemoryCheck);
- options.setMemoryCheck(memoryCheck);
-
- // The exflow id is set by the loader. So it's unavailable until after
- // this call.
- executorLoader.uploadExecutableFlow(exflow);
-
- // We create an active flow reference in the datastore. If the upload
- // fails, we remove the reference.
- ExecutionReference reference =
- new ExecutionReference(exflow.getExecutionId(), executorHost,
- executorPort);
- executorLoader.addActiveExecutableReference(reference);
- try {
- callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
- runningFlows.put(exflow.getExecutionId(),
- new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+ boolean memoryCheck =
+ !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
+ ProjectWhitelist.WhitelistType.MemoryCheck);
+ options.setMemoryCheck(memoryCheck);
+
+ // The exflow id is set by the loader. So it's unavailable until after
+ // this call.
+ executorLoader.uploadExecutableFlow(exflow);
+ // We create an active flow reference in the datastore. If the upload
+ // fails, we remove the reference.
+ ExecutionReference reference =
+ new ExecutionReference(exflow.getExecutionId());
+
+ if (isMultiExecutorMode()) {
+ //Take MultiExecutor route
+ executorLoader.addActiveExecutableReference(reference);
+ queuedFlows.enqueue(exflow, reference);
+ } else {
+ // assign only local executor we have
+ reference.setExecutor(activeExecutors.iterator().next());
+ executorLoader.addActiveExecutableReference(reference);
+ try {
+ callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
+ runningFlows.put(exflow.getExecutionId(),
+ new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+ } catch (ExecutorManagerException e) {
+ executorLoader.removeActiveExecutableReference(reference
+ .getExecId());
+ throw e;
+ }
+ }
message +=
- "Execution submitted successfully with exec id "
- + exflow.getExecutionId();
- } catch (ExecutorManagerException e) {
- executorLoader.removeActiveExecutableReference(reference.getExecId());
- throw e;
+ "Execution submitted successfully with exec id "
+ + exflow.getExecutionId();
}
-
return message;
}
}
@@ -725,13 +1056,20 @@ public class ExecutorManager extends EventHandler implements
/**
* Manage servlet call for stats servlet in Azkaban execution server
* {@inheritDoc}
- * @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(java.lang.String, azkaban.utils.Pair[])
+ *
+ * @throws ExecutorManagerException
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(java.lang.String,
+ * azkaban.utils.Pair[])
*/
@Override
- public Map<String, Object> callExecutorStats(String action, Pair<String, String>... params) throws IOException {
+ public Map<String, Object> callExecutorStats(int executorId, String action,
+ Pair<String, String>... params) throws IOException, ExecutorManagerException {
URIBuilder builder = new URIBuilder();
- builder.setScheme("http").setHost(executorHost).setPort(executorPort).setPath("/stats");
+ Executor executor = fetchExecutor(executorId);
+ builder.setScheme("http").setHost(executor.getHost())
+ .setPort(executor.getPort()).setPath("/stats");
builder.setParameter(ConnectorParams.ACTION_PARAM, action);
@@ -763,7 +1101,7 @@ public class ExecutorManager extends EventHandler implements
@SuppressWarnings("unchecked")
Map<String, Object> jsonResponse =
- (Map<String, Object>) JSONUtils.parseJSONFromString(response);
+ (Map<String, Object>) JSONUtils.parseJSONFromString(response);
return jsonResponse;
}
@@ -815,6 +1153,9 @@ public class ExecutorManager extends EventHandler implements
@Override
public void shutdown() {
+ if (isMultiExecutorMode()) {
+ queueProcessor.shutdown();
+ }
executingManager.shutdown();
}
@@ -846,7 +1187,7 @@ public class ExecutorManager extends EventHandler implements
lastThreadCheckTime = System.currentTimeMillis();
updaterStage = "Starting update all flows.";
- Map<ConnectionInfo, List<ExecutableFlow>> exFlowMap =
+ Map<Executor, List<ExecutableFlow>> exFlowMap =
getFlowToExecutorMap();
ArrayList<ExecutableFlow> finishedFlows =
new ArrayList<ExecutableFlow>();
@@ -854,16 +1195,16 @@ public class ExecutorManager extends EventHandler implements
new ArrayList<ExecutableFlow>();
if (exFlowMap.size() > 0) {
- for (Map.Entry<ConnectionInfo, List<ExecutableFlow>> entry : exFlowMap
+ for (Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
.entrySet()) {
List<Long> updateTimesList = new ArrayList<Long>();
List<Integer> executionIdsList = new ArrayList<Integer>();
- ConnectionInfo connection = entry.getKey();
+ Executor executor = entry.getKey();
updaterStage =
- "Starting update flows on " + connection.getHost() + ":"
- + connection.getPort();
+ "Starting update flows on " + executor.getHost() + ":"
+ + executor.getPort();
// We pack the parameters of the same host together before we
// query.
@@ -881,8 +1222,8 @@ public class ExecutorManager extends EventHandler implements
Map<String, Object> results = null;
try {
results =
- callExecutorServer(connection.getHost(),
- connection.getPort(), ConnectorParams.UPDATE_ACTION,
+ callExecutorServer(executor.getHost(),
+ executor.getPort(), ConnectorParams.UPDATE_ACTION,
null, null, executionIds, updateTimes);
} catch (IOException e) {
logger.error(e);
@@ -1222,15 +1563,16 @@ public class ExecutorManager extends EventHandler implements
}
}
- private Map<ConnectionInfo, List<ExecutableFlow>> getFlowToExecutorMap() {
- HashMap<ConnectionInfo, List<ExecutableFlow>> exFlowMap =
- new HashMap<ConnectionInfo, List<ExecutableFlow>>();
+ /* Group Executable flow by Executors to reduce number of REST calls */
+ private Map<Executor, List<ExecutableFlow>> getFlowToExecutorMap() {
+ HashMap<Executor, List<ExecutableFlow>> exFlowMap =
+ new HashMap<Executor, List<ExecutableFlow>>();
- ConnectionInfo lastPort = new ConnectionInfo(executorHost, executorPort);
for (Pair<ExecutionReference, ExecutableFlow> runningFlow : runningFlows
- .values()) {
+ .values()) {
ExecutionReference ref = runningFlow.getFirst();
ExecutableFlow flow = runningFlow.getSecond();
+ Executor executor = ref.getExecutor();
// We can set the next check time to prevent the checking of certain
// flows.
@@ -1238,16 +1580,10 @@ public class ExecutorManager extends EventHandler implements
continue;
}
- // Just a silly way to reduce object creation construction of objects
- // since it's most likely that the values will be the same.
- if (!lastPort.isEqual(ref.getHost(), ref.getPort())) {
- lastPort = new ConnectionInfo(ref.getHost(), ref.getPort());
- }
-
- List<ExecutableFlow> flows = exFlowMap.get(lastPort);
+ List<ExecutableFlow> flows = exFlowMap.get(executor);
if (flows == null) {
flows = new ArrayList<ExecutableFlow>();
- exFlowMap.put(lastPort, flows);
+ exFlowMap.put(executor, flows);
}
flows.add(flow);
@@ -1256,61 +1592,6 @@ public class ExecutorManager extends EventHandler implements
return exFlowMap;
}
- private static class ConnectionInfo {
- private String host;
- private int port;
-
- public ConnectionInfo(String host, int port) {
- this.host = host;
- this.port = port;
- }
-
- @SuppressWarnings("unused")
- private ConnectionInfo getOuterType() {
- return ConnectionInfo.this;
- }
-
- public boolean isEqual(String host, int port) {
- return this.port == port && this.host.equals(host);
- }
-
- public String getHost() {
- return host;
- }
-
- public int getPort() {
- return port;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((host == null) ? 0 : host.hashCode());
- result = prime * result + port;
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- ConnectionInfo other = (ConnectionInfo) obj;
- if (host == null) {
- if (other.host != null)
- return false;
- } else if (!host.equals(other.host))
- return false;
- if (port != other.port)
- return false;
- return true;
- }
- }
-
@Override
public int getExecutableFlows(int projectId, String flowId, int from,
int length, List<ExecutableFlow> outputList)
@@ -1384,4 +1665,182 @@ public class ExecutorManager extends EventHandler implements
- executionLogsRetentionMs);
}
}
-}
+
+ /*
+ * This thread is responsible for processing queued flows using dispatcher and
+ * making rest api calls to executor server
+ */
+ private class QueueProcessorThread extends Thread {
+ private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
+ private static final int MAX_DISPATCHING_ERRORS_PERMITTED = 5;
+ private final long activeExecutorRefreshWindowInMilisec;
+ private final int activeExecutorRefreshWindowInFlows;
+
+ private volatile boolean shutdown = false;
+ private volatile boolean isActive = true;
+
+ public QueueProcessorThread(boolean isActive,
+ long activeExecutorRefreshWindowInTime,
+ int activeExecutorRefreshWindowInFlows) {
+ setActive(isActive);
+ this.activeExecutorRefreshWindowInFlows =
+ activeExecutorRefreshWindowInFlows;
+ this.activeExecutorRefreshWindowInMilisec =
+ activeExecutorRefreshWindowInTime;
+ this.setName("AzkabanWebServer-QueueProcessor-Thread");
+ }
+
+ public void setActive(boolean isActive) {
+ this.isActive = isActive;
+ logger.info("QueueProcessorThread active turned " + this.isActive);
+ }
+
+ public boolean isActive() {
+ return isActive;
+ }
+
+ public void shutdown() {
+ shutdown = true;
+ this.interrupt();
+ }
+
+ public void run() {
+ // Loops till QueueProcessorThread is shutdown
+ while (!shutdown) {
+ synchronized (this) {
+ try {
+ // start processing queue if active, other wait for sometime
+ if (isActive) {
+ processQueuedFlows(activeExecutorRefreshWindowInMilisec,
+ activeExecutorRefreshWindowInFlows);
+ }
+ wait(QUEUE_PROCESSOR_WAIT_IN_MS);
+ } catch (Exception e) {
+ logger.error(
+ "QueueProcessorThread Interrupted. Probably to shut down.", e);
+ }
+ }
+ }
+ }
+
+ /* Method responsible for processing the non-dispatched flows */
+ private void processQueuedFlows(long activeExecutorsRefreshWindow,
+ int maxContinuousFlowProcessed) throws InterruptedException,
+ ExecutorManagerException {
+ long lastExecutorRefreshTime = System.currentTimeMillis();
+ Pair<ExecutionReference, ExecutableFlow> runningCandidate;
+ int currentContinuousFlowProcessed = 0;
+
+ while (isActive() && (runningCandidate = queuedFlows.fetchHead()) != null) {
+ ExecutionReference reference = runningCandidate.getFirst();
+ ExecutableFlow exflow = runningCandidate.getSecond();
+
+ long currentTime = System.currentTimeMillis();
+
+ // if we have dispatched more than maxContinuousFlowProcessed or
+ // It has been more then activeExecutorsRefreshWindow millisec since we
+ // refreshed
+ if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
+ || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
+ // Refresh executorInfo for all activeExecutors
+ refreshExecutors();
+ lastExecutorRefreshTime = currentTime;
+ currentContinuousFlowProcessed = 0;
+ }
+
+ // process flow with current snapshot of activeExecutors
+ processFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
+ currentContinuousFlowProcessed++;
+ }
+ }
+
+ /* process flow with a snapshot of available Executors */
+ private void processFlow(ExecutionReference reference,
+ ExecutableFlow exflow, Set<Executor> availableExecutors)
+ throws ExecutorManagerException {
+ synchronized (exflow) {
+ Executor selectedExecutor = selectExecutor(exflow, availableExecutors);
+ if (selectedExecutor != null) {
+ try {
+ dispatch(reference, exflow, selectedExecutor);
+ } catch (ExecutorManagerException e) {
+ logger.warn(String.format(
+ "Executor %s responded with exception for exec: %d",
+ selectedExecutor, exflow.getExecutionId()), e);
+ handleDispatchExceptionCase(reference, exflow, selectedExecutor,
+ availableExecutors);
+ }
+ } else {
+ handleNoExecutorSelectedCase(reference, exflow);
+ }
+ }
+ }
+
+ /* Choose Executor for exflow among the available executors */
+ private Executor selectExecutor(ExecutableFlow exflow,
+ Set<Executor> availableExecutors) {
+ Executor choosenExecutor;
+ // TODO: use dispatcher
+ choosenExecutor = availableExecutors.iterator().next();
+ return choosenExecutor;
+ }
+
+ private void handleDispatchExceptionCase(ExecutionReference reference,
+ ExecutableFlow exflow, Executor lastSelectedExecutor,
+ Set<Executor> remainingExecutors) throws ExecutorManagerException {
+ logger
+ .info(String
+ .format(
+ "Reached handleDispatchExceptionCase stage for exec %d with error count %d",
+ exflow.getExecutionId(), reference.getNumErrors()));
+ reference.setNumErrors(reference.getNumErrors() + 1);
+ reference.setExecutor(null);
+ if (reference.getNumErrors() >= MAX_DISPATCHING_ERRORS_PERMITTED
+ || remainingExecutors.size() <= 1) {
+ logger.error("Failed to process queued flow");
+ finalizeFlows(exflow);
+ } else {
+ remainingExecutors.remove(lastSelectedExecutor);
+ // try other executors except chosenExecutor
+ processFlow(reference, exflow, remainingExecutors);
+ }
+ }
+
+ private void handleNoExecutorSelectedCase(ExecutionReference reference,
+ ExecutableFlow exflow) throws ExecutorManagerException {
+ logger
+ .info(String
+ .format(
+ "Reached handleNoExecutorSelectedCase stage for exec %d with error count %d",
+ exflow.getExecutionId(), reference.getNumErrors()));
+ reference.setNumErrors(reference.getNumErrors() + 1);
+ // Scenario: when dispatcher didn't assigned any executor
+ if (reference.getNumErrors() >= MAX_DISPATCHING_ERRORS_PERMITTED) {
+ finalizeFlows(exflow);
+ } else {
+ // again queue this flow
+ queuedFlows.enqueue(exflow, reference);
+ }
+ }
+
+ private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
+ Executor choosenExecutor) throws ExecutorManagerException {
+ exflow.setUpdateTime(System.currentTimeMillis());
+
+ // to be moved after db update once we integrate rest api changes
+ reference.setExecutor(choosenExecutor);
+ // TODO: ADD rest call to do an actual dispatch
+ callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
+ executorLoader.assignExecutor(exflow.getExecutionId(),
+ choosenExecutor.getId());
+
+ // move from flow to running flows
+ runningFlows.put(exflow.getExecutionId(),
+ new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+
+ logger.info(String.format(
+ "Successfully dispatched exec %d with error count %d",
+ exflow.getExecutionId(), reference.getNumErrors()));
+ }
+ }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
index 10379ec..2b47293 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -18,6 +18,7 @@ package azkaban.executor;
import java.io.IOException;
import java.lang.Thread.State;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -88,6 +89,18 @@ public interface ExecutorManagerAdapter {
public List<ExecutableFlow> getRunningFlows() throws IOException;
+ /**
+ * <pre>
+ * Returns All running with executors and queued flows
+ * Note, returns empty list if there isn't any running or queued flows
+ * </pre>
+ *
+ * @return
+ * @throws IOException
+ */
+ public List<Pair<ExecutableFlow, Executor>> getActiveFlowsWithExecutor()
+ throws IOException;
+
public List<ExecutableFlow> getRecentlyFinishedFlows();
public List<ExecutableFlow> getExecutableFlows(Project project,
@@ -177,9 +190,10 @@ public interface ExecutorManagerAdapter {
* <li>{@link azkaban.executor.ConnectorParams#STATS_SET_ENABLEMETRICS}<li>
* <li>{@link azkaban.executor.ConnectorParams#STATS_SET_DISABLEMETRICS}<li>
* </ul>
+ * @throws ExecutorManagerException
*/
- public Map<String, Object> callExecutorStats(String action,
- Pair<String, String>... params) throws IOException;
+ public Map<String, Object> callExecutorStats(int executorId, String action,
+ Pair<String, String>... param) throws IOException, ExecutorManagerException;
public Map<String, Object> callExecutorJMX(String hostPort, String action,
String mBean) throws IOException;
@@ -196,4 +210,25 @@ public interface ExecutorManagerAdapter {
public Set<? extends String> getPrimaryServerHosts();
+ /**
+ * Returns a collection of all the active executors maintained by active
+ * executors
+ *
+ * @return
+ */
+ public Collection<Executor> getAllActiveExecutors();
+
+ /**
+ * <pre>
+ * Fetch executor from executors with a given executorId
+ * Note:
+ * 1. throws an Exception in case of a SQL issue
+ * 2. return null when no executor is found with the given executorId
+ * </pre>
+ *
+ * @throws ExecutorManagerException
+ *
+ */
+ public Executor fetchExecutor(int executorId) throws ExecutorManagerException;
+
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 2a1cf26..7740163 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -180,6 +180,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
}
+ /**
+ *
+ * {@inheritDoc}
+ * @see azkaban.executor.ExecutorLoader#fetchQueuedFlows()
+ */
+ @Override
+ public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
+ throws ExecutorManagerException {
+ QueryRunner runner = createQueryRunner();
+ FetchQueuedExecutableFlows flowHandler = new FetchQueuedExecutableFlows();
+
+ try {
+ List<Pair<ExecutionReference, ExecutableFlow>> flows =
+ runner.query(FetchQueuedExecutableFlows.FETCH_QUEUED_EXECUTABLE_FLOW,
+ flowHandler);
+ return flows;
+ } catch (SQLException e) {
+ throw new ExecutorManagerException("Error fetching active flows", e);
+ }
+ }
+
@Override
public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
throws ExecutorManagerException {
@@ -383,12 +404,11 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
throws ExecutorManagerException {
final String INSERT =
"INSERT INTO active_executing_flows "
- + "(exec_id, host, port, update_time) values (?,?,?,?)";
+ + "(exec_id, update_time) values (?,?)";
QueryRunner runner = createQueryRunner();
try {
- runner.update(INSERT, reference.getExecId(), reference.getHost(),
- reference.getPort(), reference.getUpdateTime());
+ runner.update(INSERT, reference.getExecId(), reference.getUpdateTime());
} catch (SQLException e) {
throw new ExecutorManagerException(
"Error updating active flow reference " + reference.getExecId(), e);
@@ -976,6 +996,65 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
return events;
}
+ /**
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorLoader#assignExecutor(int, int)
+ */
+ @Override
+ public void assignExecutor(int executorId, int executionId)
+ throws ExecutorManagerException {
+ final String UPDATE =
+ "UPDATE execution_flows SET executor_id=? where exec_id=?";
+
+ QueryRunner runner = createQueryRunner();
+ try {
+ Executor executor = fetchExecutor(executorId);
+ if (executor == null) {
+ throw new ExecutorManagerException(String.format(
+ "Failed to assign non-existent executor Id: %d to execution : %d ",
+ executorId, executionId));
+ }
+
+ int rows = runner.update(UPDATE, executorId, executionId);
+ if (rows == 0) {
+ throw new ExecutorManagerException(String.format(
+ "Failed to assign executor Id: %d to non-existent execution : %d ",
+ executorId, executionId));
+ }
+ } catch (SQLException e) {
+ throw new ExecutorManagerException("Error updating executor id "
+ + executorId, e);
+ }
+ }
+
+ /**
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorLoader#fetchExecutorByExecutionId(int)
+ */
+ @Override
+ public Executor fetchExecutorByExecutionId(int executionId)
+ throws ExecutorManagerException {
+ QueryRunner runner = createQueryRunner();
+ FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+ Executor executor = null;
+ try {
+ List<Executor> executors =
+ runner.query(FetchExecutorHandler.FETCH_EXECUTION_EXECUTOR,
+ executorHandler, executionId);
+ if (executors.size() > 0) {
+ executor = executors.get(0);
+ }
+ } catch (SQLException e) {
+ throw new ExecutorManagerException(
+ "Error fetching executor for exec_id : " + executionId, e);
+ }
+ return executor;
+ }
+
private static class LastInsertID implements ResultSetHandler<Long> {
private static String LAST_INSERT_ID = "SELECT LAST_INSERT_ID()";
@@ -1178,13 +1257,80 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
}
+ /**
+ * JDBC ResultSetHandler to fetch queued executions
+ */
+ private static class FetchQueuedExecutableFlows implements
+ ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
+ // Select queued unassigned flows
+ private static String FETCH_QUEUED_EXECUTABLE_FLOW =
+ "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, "
+ + " ax.update_time axUpdateTime FROM execution_flows ex"
+ + " INNER JOIN"
+ + " active_executing_flows ax ON ex.exec_id = ax.exec_id"
+ + " Where ex.executor_id is NULL";
+
+ @Override
+ public List<Pair<ExecutionReference, ExecutableFlow>> handle(ResultSet rs)
+ throws SQLException {
+ if (!rs.next()) {
+ return Collections
+ .<Pair<ExecutionReference, ExecutableFlow>> emptyList();
+ }
+
+ List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
+ new ArrayList<Pair<ExecutionReference, ExecutableFlow>>();
+ do {
+ int id = rs.getInt(1);
+ int encodingType = rs.getInt(2);
+ byte[] data = rs.getBytes(3);
+ long updateTime = rs.getLong(4);
+
+ if (data == null) {
+ logger.error("Found a flow with empty data blob exec_id: " + id);
+ } else {
+ EncodingType encType = EncodingType.fromInteger(encodingType);
+ Object flowObj;
+ try {
+ // Convoluted way to inflate strings. Should find common package or
+ // helper function.
+ if (encType == EncodingType.GZIP) {
+ // Decompress the sucker.
+ String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+ flowObj = JSONUtils.parseJSONFromString(jsonString);
+ } else {
+ String jsonString = new String(data, "UTF-8");
+ flowObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+
+ ExecutableFlow exFlow =
+ ExecutableFlow.createExecutableFlowFromObject(flowObj);
+ ExecutionReference ref = new ExecutionReference(id);
+ ref.setUpdateTime(updateTime);
+
+ execFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref,
+ exFlow));
+ } catch (IOException e) {
+ throw new SQLException("Error retrieving flow data " + id, e);
+ }
+ }
+ } while (rs.next());
+
+ return execFlows;
+ }
+ }
+
private static class FetchActiveExecutableFlows implements
ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
+ // Select running and executor assigned flows
private static String FETCH_ACTIVE_EXECUTABLE_FLOW =
- "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data "
- + "flow_data, ax.host host, ax.port port, ax.update_time "
- + "axUpdateTime " + "FROM execution_flows ex "
- + "INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";
+ "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
+ + "et.port port, ax.update_time axUpdateTime, et.id executorId, et.active executorStatus"
+ + " FROM execution_flows ex"
+ + " INNER JOIN "
+ + " active_executing_flows ax ON ex.exec_id = ax.exec_id"
+ + " INNER JOIN "
+ + " executors et ON ex.executor_id = et.id";
@Override
public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> handle(
@@ -1203,6 +1349,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
String host = rs.getString(4);
int port = rs.getInt(5);
long updateTime = rs.getLong(6);
+ int executorId = rs.getInt(7);
+ boolean executorStatus = rs.getBoolean(8);
if (data == null) {
execFlows.put(id, null);
@@ -1223,7 +1371,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
ExecutableFlow exFlow =
ExecutableFlow.createExecutableFlowFromObject(flowObj);
- ExecutionReference ref = new ExecutionReference(id, host, port);
+ Executor executor = new Executor(executorId, host, port, executorStatus);
+ ExecutionReference ref = new ExecutionReference(id, executor);
ref.setUpdateTime(updateTime);
execFlows.put(id, new Pair<ExecutionReference, ExecutableFlow>(ref,
@@ -1309,6 +1458,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
"SELECT COUNT(1) FROM execution_flows WHERE project_id=? AND flow_id=?";
private static String NUM_JOB_EXECUTIONS =
"SELECT COUNT(1) FROM execution_jobs WHERE project_id=? AND job_id=?";
+ private static String FETCH_EXECUTOR_ID =
+ "SELECT executor_id FROM execution_flows WHERE exec_id=?";
@Override
public Integer handle(ResultSet rs) throws SQLException {
@@ -1351,6 +1502,10 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
"SELECT id, host, port, active FROM executors where id=?";
private static String FETCH_EXECUTOR_BY_HOST_PORT =
"SELECT id, host, port, active FROM executors where host=? AND port=?";
+ private static String FETCH_EXECUTION_EXECUTOR =
+ "SELECT ex.id, ex.host, ex.port, ex.active FROM "
+ + " executors ex INNER JOIN execution_flows ef "
+ + "on ex.id = ef.executor_id where exec_id=?";
@Override
public List<Executor> handle(ResultSet rs) throws SQLException {
diff --git a/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java b/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java
new file mode 100644
index 0000000..641ffae
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java
@@ -0,0 +1,201 @@
+package azkaban.executor;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Pair;
+
+/**
+ * <pre>
+ * Composite data structure to represent non-dispatched flows in webserver.
+ * This data structure wraps a blocking queue and a concurrent hashmap.
+ * </pre>
+ */
+public class QueuedExecutions {
+ private static Logger logger = Logger.getLogger(QueuedExecutions.class);
+ final long capacity;
+
+ /* map to easily access queued flows */
+ final private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> queuedFlowMap;
+ /* actual queue */
+ final private BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queuedFlowList;
+
+ public QueuedExecutions(long capacity) {
+ this.capacity = capacity;
+ queuedFlowMap =
+ new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
+ queuedFlowList =
+ new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+ new ExecutableFlowPriorityComparator());
+ }
+
+ /**
+ * Wraps BoundedQueue Take method to have a corresponding update in
+ * queuedFlowMap lookup table
+ *
+ * @return
+ * @throws InterruptedException
+ */
+ public Pair<ExecutionReference, ExecutableFlow> fetchHead()
+ throws InterruptedException {
+ Pair<ExecutionReference, ExecutableFlow> pair = queuedFlowList.take();
+ if (pair != null && pair.getFirst() != null) {
+ queuedFlowMap.remove(pair.getFirst().getExecId());
+ }
+ return pair;
+ }
+
+ /**
+ * Helper method to have a single point of deletion in the queued flows
+ *
+ * @param executionId
+ */
+ public void dequeue(int executionId) {
+ if (queuedFlowMap.containsKey(executionId)) {
+ queuedFlowList.remove(queuedFlowMap.get(executionId));
+ queuedFlowMap.remove(executionId);
+ }
+ }
+
+ /**
+ * <pre>
+ * Helper method to have a single point of insertion in the queued flows
+ *
+ * @param exflow
+ * flow to be enqueued
+ * @param ref
+ * reference to be enqueued
+ * @throws ExecutorManagerException
+ * case 1: if blocking queue put method fails due to
+ * InterruptedException
+ * case 2: if there already an element with
+ * same execution Id
+ * </pre>
+ */
+ public void enqueue(ExecutableFlow exflow, ExecutionReference ref)
+ throws ExecutorManagerException {
+ if (hasExecution(exflow.getExecutionId())) {
+ String errMsg = "Flow already in queue " + exflow.getExecutionId();
+ throw new ExecutorManagerException(errMsg);
+ }
+
+ Pair<ExecutionReference, ExecutableFlow> pair =
+ new Pair<ExecutionReference, ExecutableFlow>(ref, exflow);
+ try {
+ queuedFlowMap.put(exflow.getExecutionId(), pair);
+ queuedFlowList.put(pair);
+ } catch (InterruptedException e) {
+ String errMsg = "Failed to insert flow " + exflow.getExecutionId();
+ logger.error(errMsg, e);
+ throw new ExecutorManagerException(errMsg);
+ }
+ }
+
+ /**
+ * <pre>
+ * Enqueues all the elements of a collection
+ *
+ * @param collection
+ *
+ * @throws ExecutorManagerException
+ * case 1: if blocking queue put method fails due to
+ * InterruptedException
+ * case 2: if there already an element with
+ * same execution Id
+ * </pre>
+ */
+ public void enqueueAll(
+ Collection<Pair<ExecutionReference, ExecutableFlow>> collection)
+ throws ExecutorManagerException {
+ for (Pair<ExecutionReference, ExecutableFlow> pair : collection) {
+ enqueue(pair.getSecond(), pair.getFirst());
+ }
+ }
+
+ /**
+ * Returns a read only collection of all the queued (flows, reference) pairs
+ *
+ * @return
+ */
+ public Collection<Pair<ExecutionReference, ExecutableFlow>> getAllEntries() {
+ return Collections.unmodifiableCollection(queuedFlowMap.values());
+ }
+
+ /**
+ * Checks if an execution is queued or not
+ *
+ * @param executionId
+ * @return
+ */
+ public boolean hasExecution(int executionId) {
+ return queuedFlowMap.containsKey(executionId);
+ }
+
+ /**
+ * Fetch flow for an execution. Returns null, if execution not in queue
+ *
+ * @param executionId
+ * @return
+ */
+ public ExecutableFlow getFlow(int executionId) {
+ if (hasExecution(executionId)) {
+ return queuedFlowMap.get(executionId).getSecond();
+ }
+ return null;
+ }
+
+ /**
+ * Fetch Activereference for an execution. Returns null, if execution not in
+ * queue
+ *
+ * @param executionId
+ * @return
+ */
+ public ExecutionReference getReference(int executionId) {
+ if (hasExecution(executionId)) {
+ return queuedFlowMap.get(executionId).getFirst();
+ }
+ return null;
+ }
+
+ /**
+ * Size of the queue
+ *
+ * @return
+ */
+ public long size() {
+ return queuedFlowList.size();
+ }
+
+ /**
+ * Verify, if queue is full as per initialized capacity
+ *
+ * @return
+ */
+ public boolean isFull() {
+ return size() >= capacity;
+ }
+
+ /**
+ * Verify, if queue is empty or not
+ *
+ * @return
+ */
+ public boolean isEmpty() {
+ return queuedFlowList.isEmpty() && queuedFlowMap.isEmpty();
+ }
+
+ /**
+ * Empties queue by dequeuing all the elements
+ */
+ public void clear() {
+ for (Pair<ExecutionReference, ExecutableFlow> pair : queuedFlowMap.values()) {
+ dequeue(pair.getFirst().getExecId());
+ }
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
index 08e5534..bbd642a 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
@@ -62,4 +62,20 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
public String getRunningFlows() {
return manager.getRunningFlowIds();
}
+
+ @Override
+ public boolean isQueueProcessorActive() {
+ return manager.isQueueProcessorThreadActive();
+ }
+
+ @Override
+ public String getQueuedFlows() {
+ return manager.getQueuedFlowIds();
+ }
+
+ @Override
+ public String getQueueProcessorThreadState() {
+ return manager.getQueueProcessorThreadState().toString();
+ }
+
}
diff --git a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
index 9bc1175..94012e0 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
@@ -39,4 +39,14 @@ public interface JmxExecutorManagerMBean {
@DisplayName("OPERATION: getPrimaryExecutorHostPorts")
public List<String> getPrimaryExecutorHostPorts();
+
+ @DisplayName("OPERATION: isQueueProcessorActive")
+ public boolean isQueueProcessorActive();
+
+ @DisplayName("OPERATION: getQueuedFlows")
+ public String getQueuedFlows();
+
+ @DisplayName("OPERATION: getQueueProcessorThreadState")
+ public String getQueueProcessorThreadState();
+
}
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
index bde18ec..12b72dd 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -141,7 +141,7 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
runner.query(connection, ProjectResultHandler.SELECT_PROJECT_BY_ID,
handler, id);
if (projects.isEmpty()) {
- throw new ProjectManagerException("No active project with id " + id
+ throw new ProjectManagerException("No project with id " + id
+ " exists in db.");
}
@@ -169,6 +169,67 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
return project;
}
+ /**
+ * Fetch first project with a given name {@inheritDoc}
+ *
+ * @see azkaban.project.ProjectLoader#fetchProjectByName(java.lang.String)
+ */
+ @Override
+ public Project fetchProjectByName(String name)
+ throws ProjectManagerException {
+ Connection connection = getConnection();
+
+ Project project = null;
+ try {
+ project = fetchProjectByName(connection, name);
+ } finally {
+ DbUtils.closeQuietly(connection);
+ }
+
+ return project;
+ }
+
+ private Project fetchProjectByName(Connection connection, String name)
+ throws ProjectManagerException {
+ QueryRunner runner = new QueryRunner();
+ // Fetch the project
+ Project project = null;
+ ProjectResultHandler handler = new ProjectResultHandler();
+ try {
+ List<Project> projects =
+ runner.query(connection,
+ ProjectResultHandler.SELECT_PROJECT_BY_NAME, handler, name);
+ if (projects.isEmpty()) {
+ throw new ProjectManagerException(
+ "No project with name " + name + " exists in db.");
+ }
+
+ project = projects.get(0);
+ } catch (SQLException e) {
+ logger.error(ProjectResultHandler.SELECT_PROJECT_BY_NAME
+ + " failed.");
+ throw new ProjectManagerException(
+ "Query for existing project failed. Project " + name, e);
+ }
+
+ // Fetch the user permissions
+ List<Triple<String, Boolean, Permission>> permissions =
+ fetchPermissionsForProject(connection, project);
+
+ for (Triple<String, Boolean, Permission> perm : permissions) {
+ if (perm.getThird().toFlags() != 0) {
+ if (perm.getSecond()) {
+ project
+ .setGroupPermission(perm.getFirst(), perm.getThird());
+ } else {
+ project.setUserPermission(perm.getFirst(), perm.getThird());
+ }
+ }
+ }
+
+ return project;
+ }
+
private List<Triple<String, Boolean, Permission>> fetchPermissionsForProject(
Connection connection, Project project) throws ProjectManagerException {
ProjectPermissionsResultHandler permHander =
@@ -1136,6 +1197,9 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
private static class ProjectResultHandler implements
ResultSetHandler<List<Project>> {
+ private static String SELECT_PROJECT_BY_NAME =
+ "SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE name=?";
+
private static String SELECT_PROJECT_BY_ID =
"SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE id=?";
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index 371599f..aa886d6 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -48,6 +48,14 @@ public interface ProjectLoader {
public Project fetchProjectById(int id) throws ProjectManagerException;
/**
+ * Loads whole project, including permissions, by the project name.
+ * @param name
+ * @return
+ * @throws ProjectManagerException
+ */
+ public Project fetchProjectByName(String name) throws ProjectManagerException;
+
+ /**
* Should create an empty project with the given name and user and adds it to
* the data store. It will auto assign a unique id for this project if
* successful.
@@ -269,5 +277,4 @@ public interface ProjectLoader {
throws ProjectManagerException;
void updateProjectSettings(Project project) throws ProjectManagerException;
-
}
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLogEvent.java b/azkaban-common/src/main/java/azkaban/project/ProjectLogEvent.java
index a19d013..5512291 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLogEvent.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLogEvent.java
@@ -32,7 +32,8 @@ public class ProjectLogEvent {
UPLOADED(6),
SCHEDULE(7),
SLA(8),
- PROXY_USER(9);
+ PROXY_USER(9),
+ PURGE(10);
private int numVal;
@@ -64,6 +65,8 @@ public class ProjectLogEvent {
return SLA;
case 9:
return PROXY_USER;
+ case 10:
+ return PURGE;
case 128:
return ERROR;
default:
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
index 89c6539..0f09b9c 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
@@ -196,13 +196,65 @@ public class ProjectManager {
return allProjects;
}
- public Project getProject(String name) {
- return projectsByName.get(name);
- }
+ /**
+ * Checks if a project is active using project_name
+ *
+ * @param name
+ */
+ public Boolean isActiveProject(String name) {
+ return projectsByName.containsKey(name);
+ }
- public Project getProject(int id) {
- return projectsById.get(id);
- }
+ /**
+ * Checks if a project is active using project_id
+ *
+ * @param name
+ */
+ public Boolean isActiveProject(int id) {
+ return projectsById.containsKey(id);
+ }
+
+ /**
+ * fetch active project from cache and inactive projects from db by
+ * project_name
+ *
+ * @param name
+ * @return
+ */
+ public Project getProject(String name) {
+ Project fetchedProject = null;
+ if (isActiveProject(name)) {
+ fetchedProject = projectsByName.get(name);
+ } else {
+ try {
+ fetchedProject = projectLoader.fetchProjectByName(name);
+ } catch (ProjectManagerException e) {
+ logger.error("Could not load project from store.", e);
+ }
+ }
+ return fetchedProject;
+ }
+
+ /**
+ * fetch active project from cache and inactive projects from db by
+ * project_id
+ *
+ * @param id
+ * @return
+ */
+ public Project getProject(int id) {
+ Project fetchedProject = null;
+ if (isActiveProject(id)) {
+ fetchedProject = projectsById.get(id);
+ } else {
+ try {
+ fetchedProject = projectLoader.fetchProjectById(id);
+ } catch (ProjectManagerException e) {
+ logger.error("Could not load project from store.", e);
+ }
+ }
+ return fetchedProject;
+ }
public Project createProject(String projectName, String description,
User creator) throws ProjectManagerException {
@@ -249,6 +301,25 @@ public class ProjectManager {
return newProject;
}
+ /**
+ * Permanently delete all project files and properties data for all versions
+ * of a project and log event in project_events table
+ *
+ * @param project
+ * @param deleter
+ * @return
+ * @throws ProjectManagerException
+ */
+ public synchronized Project purgeProject(Project project, User deleter)
+ throws ProjectManagerException {
+ projectLoader.cleanOlderProjectVersion(project.getId(),
+ project.getVersion() + 1);
+ projectLoader
+ .postEvent(project, EventType.PURGE, deleter.getUserId(), String
+ .format("Purged versions before %d", project.getVersion() + 1));
+ return project;
+ }
+
public synchronized Project removeProject(Project project, User deleter)
throws ProjectManagerException {
projectLoader.removeProject(project, deleter.getUserId());
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
new file mode 100644
index 0000000..ed6543f
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.alert.Alerter;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.user.User;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
+
+/**
+ * Test class for ExecutableFlowPriorityComparator
+ * */
+
+public class ExecutableFlowPriorityComparatorTest {
+
+ /* Helper method to create an ExecutableFlow from serialized description */
+ private ExecutableFlow createExecutableFlow(String flowName, int priority,
+ long updateTime, int executionId) throws IOException {
+ ExecutableFlow execFlow =
+ TestUtils.createExecutableFlow("exectest1", flowName);
+
+ execFlow.setUpdateTime(updateTime);
+ execFlow.setExecutionId(executionId);
+ if (priority > 0) {
+ execFlow.getExecutionOptions().getFlowParameters()
+ .put(ExecutionOptions.FLOW_PRIORITY, String.valueOf(priority));
+ }
+ return execFlow;
+ }
+
+ /* priority queue order when all priorities are explicitly specified */
+ @Test
+ public void testExplicitlySpecifiedPriorities() throws IOException,
+ InterruptedException {
+ ExecutableFlow flow1 = createExecutableFlow("exec1", 5, 3, 1);
+ ExecutableFlow flow2 = createExecutableFlow("exec2", 6, 3, 2);
+ ExecutableFlow flow3 = createExecutableFlow("exec3", 2, 3, 3);
+ ExecutionReference dummyRef = new ExecutionReference(0);
+
+ BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+ new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+ new ExecutableFlowPriorityComparator());
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+ Assert.assertEquals(flow2, queue.take().getSecond());
+ Assert.assertEquals(flow1, queue.take().getSecond());
+ Assert.assertEquals(flow3, queue.take().getSecond());
+ }
+
+ /* priority queue order when some priorities are implicitly specified */
+ @Test
+ public void testMixedSpecifiedPriorities() throws IOException,
+ InterruptedException {
+ ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 3, 1);
+ ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 3, 2);
+ ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 3, 3);
+ ExecutionReference dummyRef = new ExecutionReference(0);
+
+ BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+ new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+ new ExecutableFlowPriorityComparator());
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+ Assert.assertEquals(flow3, queue.take().getSecond());
+ Assert.assertEquals(flow1, queue.take().getSecond());
+ Assert.assertEquals(flow2, queue.take().getSecond());
+ }
+
+ /*
+ * priority queue order when some priorities are equal, updatetime is used in
+ * this case
+ */
+ @Test
+ public void testEqualPriorities() throws IOException, InterruptedException {
+ ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 1, 1);
+ ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 2, 2);
+ ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 3, 3);
+ ExecutableFlow flow4 = createExecutableFlow("exec3", 3, 4, 4);
+ ExecutionReference dummyRef = new ExecutionReference(0);
+
+ BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+ new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+ new ExecutableFlowPriorityComparator());
+
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow4));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+ Assert.assertEquals(flow3, queue.take().getSecond());
+ Assert.assertEquals(flow1, queue.take().getSecond());
+ Assert.assertEquals(flow4, queue.take().getSecond());
+ Assert.assertEquals(flow2, queue.take().getSecond());
+ }
+
+ /*
+ * priority queue order when some priorities and updatetime are equal,
+ * execution Id is used in this case
+ */
+ @Test
+ public void testEqualUpdateTimeAndPriority() throws IOException,
+ InterruptedException {
+ ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 1, 1);
+ ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 2, 2);
+ ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 2, 3);
+ ExecutableFlow flow4 = createExecutableFlow("exec3", 3, 4, 4);
+ ExecutionReference dummyRef = new ExecutionReference(0);
+
+ BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+ new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+ new ExecutableFlowPriorityComparator());
+
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow4));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+ Assert.assertEquals(flow3, queue.take().getSecond());
+ Assert.assertEquals(flow1, queue.take().getSecond());
+ Assert.assertEquals(flow4, queue.take().getSecond());
+ Assert.assertEquals(flow2, queue.take().getSecond());
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
new file mode 100644
index 0000000..62d187b
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -0,0 +1,258 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.alert.Alerter;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.user.User;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
+
+/**
+ * Test class for executor manager
+ */
+public class ExecutorManagerTest {
+
+ /* Helper method to create a ExecutorManager Instance */
+ private ExecutorManager createMultiExecutorManagerInstance()
+ throws ExecutorManagerException {
+ return createMultiExecutorManagerInstance(new MockExecutorLoader());
+ }
+
+ /*
+ * Helper method to create a ExecutorManager Instance with the given
+ * ExecutorLoader
+ */
+ private ExecutorManager createMultiExecutorManagerInstance(
+ ExecutorLoader loader) throws ExecutorManagerException {
+ Props props = new Props();
+ props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "false");
+
+ loader.addExecutor("localhost", 12345);
+ loader.addExecutor("localhost", 12346);
+ return new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+ }
+
+ /*
+ * Test create an executor manager instance without any executor local or
+ * remote
+ */
+ @Test(expected = ExecutorManagerException.class)
+ public void testNoExecutorScenario() throws ExecutorManagerException {
+ Props props = new Props();
+ props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ ExecutorLoader loader = new MockExecutorLoader();
+ @SuppressWarnings("unused")
+ ExecutorManager manager =
+ new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+ }
+
+ /*
+ * Test backward compatibility with just local executor
+ */
+ @Test
+ public void testLocalExecutorScenario() throws ExecutorManagerException {
+ Props props = new Props();
+ props.put("executor.port", 12345);
+
+ ExecutorLoader loader = new MockExecutorLoader();
+ ExecutorManager manager =
+ new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+ Set<Executor> activeExecutors =
+ new HashSet(manager.getAllActiveExecutors());
+
+ Assert.assertEquals(activeExecutors.size(), 1);
+ Executor executor = activeExecutors.iterator().next();
+ Assert.assertEquals(executor.getHost(), "localhost");
+ Assert.assertEquals(executor.getPort(), 12345);
+ Assert.assertArrayEquals(activeExecutors.toArray(), loader
+ .fetchActiveExecutors().toArray());
+ }
+
+ /*
+ * Test executor manager initialization with multiple executors
+ */
+ @Test
+ public void testMultipleExecutorScenario() throws ExecutorManagerException {
+ Props props = new Props();
+ props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ ExecutorLoader loader = new MockExecutorLoader();
+ Executor executor1 = loader.addExecutor("localhost", 12345);
+ Executor executor2 = loader.addExecutor("localhost", 12346);
+
+ ExecutorManager manager =
+ new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+ Set<Executor> activeExecutors =
+ new HashSet(manager.getAllActiveExecutors());
+ Assert.assertArrayEquals(activeExecutors.toArray(), new Executor[] {
+ executor1, executor2 });
+ }
+
+ /*
+ * Test executor manager active executor reload
+ */
+ @Test
+ public void testSetupExecutorsSucess() throws ExecutorManagerException {
+ Props props = new Props();
+ props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ ExecutorLoader loader = new MockExecutorLoader();
+ Executor executor1 = loader.addExecutor("localhost", 12345);
+
+ ExecutorManager manager =
+ new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+ Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
+ new Executor[] { executor1 });
+
+ // mark older executor as inactive
+ executor1.setActive(false);
+ loader.updateExecutor(executor1);
+ Executor executor2 = loader.addExecutor("localhost", 12346);
+ Executor executor3 = loader.addExecutor("localhost", 12347);
+ manager.setupExecutors();
+
+ Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
+ new Executor[] { executor2, executor3 });
+ }
+
+ /*
+ * Test executor manager active executor reload and resulting in no active
+ * executors
+ */
+ @Test(expected = ExecutorManagerException.class)
+ public void testSetupExecutorsException() throws ExecutorManagerException {
+ Props props = new Props();
+ props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ ExecutorLoader loader = new MockExecutorLoader();
+ Executor executor1 = loader.addExecutor("localhost", 12345);
+
+ ExecutorManager manager =
+ new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+ Set<Executor> activeExecutors =
+ new HashSet(manager.getAllActiveExecutors());
+ Assert.assertArrayEquals(activeExecutors.toArray(),
+ new Executor[] { executor1 });
+
+ // mark older executor as inactive
+ executor1.setActive(false);
+ loader.updateExecutor(executor1);
+ manager.setupExecutors();
+ }
+
+ /* Test disabling queue process thread to pause dispatching */
+ @Test
+ public void testDisablingQueueProcessThread() throws ExecutorManagerException {
+ ExecutorManager manager = createMultiExecutorManagerInstance();
+ manager.enableQueueProcessorThread();
+ Assert.assertEquals(manager.isQueueProcessorThreadActive(), true);
+ manager.disableQueueProcessorThread();
+ Assert.assertEquals(manager.isQueueProcessorThreadActive(), false);
+ }
+
+ /* Test renabling queue process thread to pause restart dispatching */
+ @Test
+ public void testEnablingQueueProcessThread() throws ExecutorManagerException {
+ ExecutorManager manager = createMultiExecutorManagerInstance();
+ Assert.assertEquals(manager.isQueueProcessorThreadActive(), false);
+ manager.enableQueueProcessorThread();
+ Assert.assertEquals(manager.isQueueProcessorThreadActive(), true);
+ }
+
+ /* Test submit a non-dispatched flow */
+ @Test
+ public void testQueuedFlows() throws ExecutorManagerException, IOException {
+ ExecutorLoader loader = new MockExecutorLoader();
+ ExecutorManager manager = createMultiExecutorManagerInstance(loader);
+ ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ flow1.setExecutionId(1);
+ ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+ flow2.setExecutionId(2);
+
+ User testUser = TestUtils.getTestUser();
+ manager.submitExecutableFlow(flow1, testUser.getUserId());
+ manager.submitExecutableFlow(flow2, testUser.getUserId());
+
+ List<ExecutableFlow> testFlows = new LinkedList<ExecutableFlow>();
+ testFlows.add(flow1);
+ testFlows.add(flow2);
+
+ List<Pair<ExecutionReference, ExecutableFlow>> queuedFlowsDB =
+ loader.fetchQueuedFlows();
+ Assert.assertEquals(queuedFlowsDB.size(), testFlows.size());
+ // Verify things are correctly setup in db
+ for (Pair<ExecutionReference, ExecutableFlow> pair : queuedFlowsDB) {
+ Assert.assertTrue(testFlows.contains(pair.getSecond()));
+ }
+
+ // Verify running flows using old definition of "running" flows i.e. a
+ // non-dispatched flow is also considered running
+ List<ExecutableFlow> managerActiveFlows = manager.getRunningFlows();
+ Assert.assertTrue(managerActiveFlows.containsAll(testFlows)
+ && testFlows.containsAll(managerActiveFlows));
+
+ // Verify getQueuedFlowIds method
+ Assert.assertEquals("[1, 2]", manager.getQueuedFlowIds());
+ }
+
+ /* Test submit duplicate flow when previous instance is not dispatched */
+ @Test(expected = ExecutorManagerException.class)
+ public void testDuplicateQueuedFlows() throws ExecutorManagerException,
+ IOException {
+ ExecutorManager manager = createMultiExecutorManagerInstance();
+ ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ flow1.getExecutionOptions().setConcurrentOption(
+ ExecutionOptions.CONCURRENT_OPTION_SKIP);
+
+ User testUser = TestUtils.getTestUser();
+ manager.submitExecutableFlow(flow1, testUser.getUserId());
+ manager.submitExecutableFlow(flow1, testUser.getUserId());
+ }
+
+ /*
+ * Test killing a job in preparation stage at webserver side i.e. a
+ * non-dispatched flow
+ */
+ @Test
+ public void testKillQueuedFlow() throws ExecutorManagerException, IOException {
+ ExecutorLoader loader = new MockExecutorLoader();
+ ExecutorManager manager = createMultiExecutorManagerInstance(loader);
+ ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ User testUser = TestUtils.getTestUser();
+ manager.submitExecutableFlow(flow1, testUser.getUserId());
+
+ manager.cancelFlow(flow1, testUser.getUserId());
+ ExecutableFlow fetchedFlow =
+ loader.fetchExecutableFlow(flow1.getExecutionId());
+ Assert.assertEquals(fetchedFlow.getStatus(), Status.FAILED);
+
+ Assert.assertFalse(manager.getRunningFlows().contains(flow1));
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index 71be50f..a7e2b5f 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -24,6 +24,7 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -48,19 +49,21 @@ import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
public class JdbcExecutorLoaderTest {
private static boolean testDBExists;
+ /* Directory with serialized description of test flows */
+ private static final String UNIT_BASE_DIR =
+ "../azkaban-test/src/test/resources/executions";
// @TODO remove this and turn into local host.
- private static final String host = "cyu-ld.linkedin.biz";
+ private static final String host = "localhost";
private static final int port = 3306;
private static final String database = "azkaban2";
private static final String user = "azkaban";
private static final String password = "azkaban";
private static final int numConnections = 10;
- private File flowDir = new File("unit/executions/exectest1");
-
@BeforeClass
public static void setupDB() {
DataSource dataSource =
@@ -226,10 +229,8 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
- Assert.assertEquals(1, 0);
-
ExecutorLoader loader = createLoader();
- ExecutableFlow flow = createExecutableFlow("exec1");
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow);
@@ -258,7 +259,7 @@ public class JdbcExecutorLoaderTest {
}
ExecutorLoader loader = createLoader();
- ExecutableFlow flow = createExecutableFlow("exec1");
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow);
@@ -297,7 +298,7 @@ public class JdbcExecutorLoaderTest {
ExecutableFlow flow = createExecutableFlow(10, "exec1");
flow.setExecutionId(10);
- File jobFile = new File(flowDir, "job10.job");
+ File jobFile = new File(UNIT_BASE_DIR + "/exectest1", "job10.job");
Props props = new Props(null, jobFile);
props.put("test", "test2");
ExecutableNode oldNode = flow.getExecutableNode("job10");
@@ -334,6 +335,149 @@ public class JdbcExecutorLoaderTest {
}
+ /* Test exception when assigning a non-existent executor to a flow */
+ @Test
+ public void testAssignExecutorInvalidExecutor()
+ throws ExecutorManagerException, IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ loader.uploadExecutableFlow(flow);
+ try {
+ loader.assignExecutor(flow.getExecutionId(), 1);
+ Assert.fail("Expecting exception, but didn't get one");
+ } catch (ExecutorManagerException ex) {
+ System.out.println("Test true");
+ }
+ }
+
+ /* Test exception when assigning an executor to a non-existent flow execution */
+ @Test
+ public void testAssignExecutorInvalidExecution()
+ throws ExecutorManagerException, IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ String host = "localhost";
+ int port = 12345;
+ Executor executor = loader.addExecutor(host, port);
+ try {
+ loader.assignExecutor(2, executor.getId());
+ Assert.fail("Expecting exception, but didn't get one");
+ } catch (ExecutorManagerException ex) {
+ System.out.println("Test true");
+ }
+ }
+
+ /* Test null return when an invalid execution flows */
+ @Test
+ public void testFetchMissingExecutorByExecution()
+ throws ExecutorManagerException, IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ Assert.assertEquals(loader.fetchExecutorByExecutionId(1), null);
+ }
+
+ /* Test null return when for a non-dispatched execution */
+ @Test
+ public void testFetchExecutorByQueuedExecution()
+ throws ExecutorManagerException, IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ loader.uploadExecutableFlow(flow);
+ Assert.assertEquals(loader.fetchExecutorByExecutionId(flow.getExecutionId()),
+ null);
+ }
+
+ /* Test happy case when assigning and fetching an executor to a flow execution */
+ @Test
+ public void testAssignAndFetchExecutor() throws ExecutorManagerException,
+ IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ String host = "localhost";
+ int port = 12345;
+ Executor executor = loader.addExecutor(host, port);
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ loader.uploadExecutableFlow(flow);
+ loader.assignExecutor(executor.getId(), flow.getExecutionId());
+ Assert.assertEquals(loader.fetchExecutorByExecutionId(flow.getExecutionId()),
+ executor);
+ }
+
+ /* Test fetchQueuedFlows when there are no queued flows */
+ @Test
+ public void testFetchNoQueuedFlows() throws ExecutorManagerException,
+ IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+
+ ExecutorLoader loader = createLoader();
+ List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+ loader.fetchQueuedFlows();
+
+ // no execution flows at all i.e. no running, completed or queued flows
+ Assert.assertTrue(queuedFlows.isEmpty());
+
+ String host = "lcoalhost";
+ int port = 12345;
+ Executor executor = loader.addExecutor(host, port);
+
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ loader.uploadExecutableFlow(flow);
+ loader.assignExecutor(executor.getId(), flow.getExecutionId());
+ // only completed flows
+ Assert.assertTrue(queuedFlows.isEmpty());
+
+ ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+ loader.uploadExecutableFlow(flow);
+ loader.assignExecutor(executor.getId(), flow.getExecutionId());
+ ExecutionReference ref = new ExecutionReference(flow2.getExecutionId());
+ loader.addActiveExecutableReference(ref);
+ // only running and completed flows
+ Assert.assertTrue(queuedFlows.isEmpty());
+ }
+
+ /* Test fetchQueuedFlows happy case */
+ @Test
+ public void testFetchQueuedFlows() throws ExecutorManagerException,
+ IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+
+ ExecutorLoader loader = createLoader();
+ List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+ new LinkedList<Pair<ExecutionReference, ExecutableFlow>>();
+
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ loader.uploadExecutableFlow(flow);
+ ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+ loader.uploadExecutableFlow(flow);
+
+ ExecutionReference ref2 = new ExecutionReference(flow2.getExecutionId());
+ loader.addActiveExecutableReference(ref2);
+ ExecutionReference ref = new ExecutionReference(flow.getExecutionId());
+ loader.addActiveExecutableReference(ref);
+
+ queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref, flow));
+ queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref2, flow2));
+
+ // only running and completed flows
+ Assert.assertArrayEquals(loader.fetchQueuedFlows().toArray(),
+ queuedFlows.toArray());
+ }
/* Test all executors fetch from empty executors */
@Test
@@ -434,7 +578,7 @@ public class JdbcExecutorLoaderTest {
ExecutorLoader loader = createLoader();
try {
String host = "localhost";
- int port = 123456;
+ int port = 12345;
loader.addExecutor(host, port);
loader.addExecutor(host, port);
Assert.fail("Expecting exception, but didn't get one");
@@ -457,6 +601,7 @@ public class JdbcExecutorLoaderTest {
} catch (ExecutorManagerException ex) {
System.out.println("Test true");
}
+ clearDB();
}
/* Test add & fetch by Id Executors */
@@ -587,19 +732,20 @@ public class JdbcExecutorLoaderTest {
}
ExecutorLoader loader = createLoader();
- ExecutableFlow flow1 = createExecutableFlow("exec1");
+ ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow1);
+ Executor executor = new Executor(2, "test", 1, true);
ExecutionReference ref1 =
- new ExecutionReference(flow1.getExecutionId(), "test", 1);
+ new ExecutionReference(flow1.getExecutionId(), executor);
loader.addActiveExecutableReference(ref1);
- ExecutableFlow flow2 = createExecutableFlow("exec1");
+ ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow2);
ExecutionReference ref2 =
- new ExecutionReference(flow2.getExecutionId(), "test", 1);
+ new ExecutionReference(flow2.getExecutionId(), executor);
loader.addActiveExecutableReference(ref2);
- ExecutableFlow flow3 = createExecutableFlow("exec1");
+ ExecutableFlow flow3 = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow3);
Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
@@ -643,7 +789,7 @@ public class JdbcExecutorLoaderTest {
@Ignore @Test
public void testSmallUploadLog() throws ExecutorManagerException {
- File logDir = new File("unit/executions/logtest");
+ File logDir = new File(UNIT_BASE_DIR + "logtest");
File[] smalllog =
{ new File(logDir, "log1.log"), new File(logDir, "log2.log"),
new File(logDir, "log3.log") };
@@ -668,7 +814,7 @@ public class JdbcExecutorLoaderTest {
@Ignore @Test
public void testLargeUploadLog() throws ExecutorManagerException {
- File logDir = new File("unit/executions/logtest");
+ File logDir = new File(UNIT_BASE_DIR + "logtest");
// Multiple of 255 for Henry the Eigth
File[] largelog =
@@ -714,7 +860,7 @@ public class JdbcExecutorLoaderTest {
ExecutorLoader loader = createLoader();
- File logDir = new File("unit/executions/logtest");
+ File logDir = new File(UNIT_BASE_DIR + "logtest");
// Multiple of 255 for Henry the Eigth
File[] largelog =
@@ -738,37 +884,10 @@ public class JdbcExecutorLoaderTest {
}
private ExecutableFlow createExecutableFlow(int executionId, String flowName)
- throws IOException {
- File jsonFlowFile = new File(flowDir, flowName + ".flow");
- @SuppressWarnings("unchecked")
- HashMap<String, Object> flowObj =
- (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
-
- Flow flow = Flow.flowFromObject(flowObj);
- Project project = new Project(1, "flow");
- HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
- flowMap.put(flow.getId(), flow);
- project.setFlows(flowMap);
- ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+ throws IOException {
+ ExecutableFlow execFlow =
+ TestUtils.createExecutableFlow("exectest1", flowName);
execFlow.setExecutionId(executionId);
-
- return execFlow;
- }
-
- private ExecutableFlow createExecutableFlow(String flowName)
- throws IOException {
- File jsonFlowFile = new File(flowDir, flowName + ".flow");
- @SuppressWarnings("unchecked")
- HashMap<String, Object> flowObj =
- (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
-
- Flow flow = Flow.flowFromObject(flowObj);
- Project project = new Project(1, "flow");
- HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
- flowMap.put(flow.getId(), flow);
- project.setFlows(flowMap);
- ExecutableFlow execFlow = new ExecutableFlow(project, flow);
-
return execFlow;
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index f2ffee8..833e0c6 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -30,6 +30,8 @@ import azkaban.utils.Props;
public class MockExecutorLoader implements ExecutorLoader {
+ HashMap<Integer, Integer> executionExecutorMapping =
+ new HashMap<Integer, Integer>();
HashMap<Integer, ExecutableFlow> flows =
new HashMap<Integer, ExecutableFlow>();
HashMap<String, ExecutableNode> nodes = new HashMap<String, ExecutableNode>();
@@ -291,11 +293,12 @@ public class MockExecutorLoader implements ExecutorLoader {
@Override
public Executor addExecutor(String host, int port)
throws ExecutorManagerException {
- if (fetchExecutor(host, port) != null) {
-
+ Executor executor = null;
+ if (fetchExecutor(host, port) == null) {
+ executorIdCounter++;
+ executor = new Executor(executorIdCounter, host, port, true);
+ executors.add(executor);
}
- executorIdCounter++;
- Executor executor = new Executor(executorIdCounter, host, port, true);
return executor;
}
@@ -334,4 +337,35 @@ public class MockExecutorLoader implements ExecutorLoader {
return executors;
}
+ @Override
+ public void assignExecutor(int executorId, int execId)
+ throws ExecutorManagerException {
+ ExecutionReference ref = refs.get(execId);
+ ref.setExecutor(fetchExecutor(executorId));
+ executionExecutorMapping.put(execId, executorId);
+ }
+
+ @Override
+ public Executor fetchExecutorByExecutionId(int execId) throws ExecutorManagerException {
+ if (executionExecutorMapping.containsKey(execId)) {
+ return fetchExecutor(executionExecutorMapping.get(execId));
+ } else {
+ throw new ExecutorManagerException(
+ "Failed to find executor with execution : " + execId);
+ }
+ }
+
+ @Override
+ public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
+ throws ExecutorManagerException {
+ List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+ new ArrayList<Pair<ExecutionReference, ExecutableFlow>>();
+ for (int execId : refs.keySet()) {
+ if (!executionExecutorMapping.containsKey(execId)) {
+ queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(refs
+ .get(execId), flows.get(execId)));
+ }
+ }
+ return queuedFlows;
+ }
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java b/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
new file mode 100644
index 0000000..94bbd7e
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
@@ -0,0 +1,208 @@
+package azkaban.executor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+
+public class QueuedExecutionsTest {
+ /* Directory with serialized description of test flows */
+ private static final String UNIT_BASE_DIR =
+ "../azkaban-test/src/test/resources/executions/exectest1/";
+
+ private File getFlowDir(String flow) {
+ return new File(UNIT_BASE_DIR + flow + ".flow");
+ }
+
+ /*
+ * Helper method to create an (ExecutionReference, ExecutableFlow) from
+ * serialized description
+ */
+ private Pair<ExecutionReference, ExecutableFlow> createExecutablePair(
+ String flowName, int execId) throws IOException {
+ File jsonFlowFile = getFlowDir(flowName);
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> flowObj =
+ (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+
+ Flow flow = Flow.flowFromObject(flowObj);
+ Project project = new Project(1, "flow");
+ HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
+ flowMap.put(flow.getId(), flow);
+ project.setFlows(flowMap);
+ ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+ execFlow.setExecutionId(execId);
+ ExecutionReference ref = new ExecutionReference(execId);
+ return new Pair<ExecutionReference, ExecutableFlow>(ref, execFlow);
+ }
+
+ public List<Pair<ExecutionReference, ExecutableFlow>> getDummyData()
+ throws IOException {
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList =
+ new ArrayList<Pair<ExecutionReference, ExecutableFlow>>();
+ dataList.add(createExecutablePair("exec1", 1));
+ dataList.add(createExecutablePair("exec2", 2));
+ return dataList;
+ }
+
+ /* Test enqueue method happy case */
+ @Test
+ public void testEnqueueHappyCase() throws IOException,
+ ExecutorManagerException {
+ QueuedExecutions queue = new QueuedExecutions(5);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+ queue.enqueue(pair.getSecond(), pair.getFirst());
+ }
+
+ Assert.assertTrue(queue.getAllEntries().containsAll(dataList));
+ Assert.assertTrue(dataList.containsAll(queue.getAllEntries()));
+ }
+
+ /* Test enqueue duplicate execution ids */
+ @Test(expected = ExecutorManagerException.class)
+ public void testEnqueueDuplicateExecution() throws IOException,
+ ExecutorManagerException {
+ Pair<ExecutionReference, ExecutableFlow> pair1 =
+ createExecutablePair("exec1", 1);
+ QueuedExecutions queue = new QueuedExecutions(5);
+ queue.enqueue(pair1.getSecond(), pair1.getFirst());
+ queue.enqueue(pair1.getSecond(), pair1.getFirst());
+ }
+
+ /* Test enqueue more than capacity */
+ @Test(expected = ExecutorManagerException.class)
+ public void testEnqueueOverflow() throws IOException,
+ ExecutorManagerException {
+ Pair<ExecutionReference, ExecutableFlow> pair1 =
+ createExecutablePair("exec1", 1);
+ QueuedExecutions queue = new QueuedExecutions(1);
+ queue.enqueue(pair1.getSecond(), pair1.getFirst());
+ queue.enqueue(pair1.getSecond(), pair1.getFirst());
+ }
+
+ /* Test EnqueueAll method */
+ @Test
+ public void testEnqueueAll() throws IOException, ExecutorManagerException {
+ QueuedExecutions queue = new QueuedExecutions(5);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ Assert.assertTrue(queue.getAllEntries().containsAll(dataList));
+ Assert.assertTrue(dataList.containsAll(queue.getAllEntries()));
+ }
+
+ /* Test size method */
+ @Test
+ public void testSize() throws IOException, ExecutorManagerException {
+ QueuedExecutions queue = new QueuedExecutions(5);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ Assert.assertEquals(queue.size(), 2);
+ }
+
+ /* Test dequeue method */
+ @Test
+ public void testDequeue() throws IOException, ExecutorManagerException {
+ QueuedExecutions queue = new QueuedExecutions(5);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ queue.dequeue(dataList.get(0).getFirst().getExecId());
+ Assert.assertEquals(queue.size(), 1);
+ Assert.assertTrue(queue.getAllEntries().contains(dataList.get(1)));
+ }
+
+ /* Test clear method */
+ @Test
+ public void testClear() throws IOException, ExecutorManagerException {
+ QueuedExecutions queue = new QueuedExecutions(5);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ Assert.assertEquals(queue.size(), 2);
+ queue.clear();
+ Assert.assertEquals(queue.size(), 0);
+ }
+
+ /* Test isEmpty method */
+ @Test
+ public void testIsEmpty() throws IOException, ExecutorManagerException {
+ QueuedExecutions queue = new QueuedExecutions(5);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ Assert.assertTrue(queue.isEmpty());
+ queue.enqueueAll(dataList);
+ Assert.assertEquals(queue.size(), 2);
+ queue.clear();
+ Assert.assertTrue(queue.isEmpty());
+ }
+
+ /* Test fetchHead method */
+ @Test
+ public void testFetchHead() throws IOException, ExecutorManagerException,
+ InterruptedException {
+ QueuedExecutions queue = new QueuedExecutions(5);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ Assert.assertTrue(queue.isEmpty());
+ queue.enqueueAll(dataList);
+ Assert.assertEquals(queue.fetchHead(), dataList.get(0));
+ Assert.assertEquals(queue.fetchHead(), dataList.get(1));
+ }
+
+ /* Test isFull method */
+ @Test
+ public void testIsFull() throws IOException, ExecutorManagerException,
+ InterruptedException {
+ QueuedExecutions queue = new QueuedExecutions(2);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ Assert.assertTrue(queue.isFull());
+ }
+
+ /* Test hasExecution method */
+ @Test
+ public void testHasExecution() throws IOException, ExecutorManagerException,
+ InterruptedException {
+ QueuedExecutions queue = new QueuedExecutions(2);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+ Assert.assertTrue(queue.hasExecution(pair.getFirst().getExecId()));
+ }
+ Assert.assertFalse(queue.hasExecution(5));
+ Assert.assertFalse(queue.hasExecution(7));
+ Assert.assertFalse(queue.hasExecution(15));
+ }
+
+ /* Test getFlow method */
+ @Test
+ public void testGetFlow() throws IOException, ExecutorManagerException,
+ InterruptedException {
+ QueuedExecutions queue = new QueuedExecutions(2);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+ Assert.assertEquals(pair.getSecond(),
+ queue.getFlow(pair.getFirst().getExecId()));
+ }
+ }
+
+ /* Test getReferences method */
+ @Test
+ public void testGetReferences() throws IOException, ExecutorManagerException,
+ InterruptedException {
+ QueuedExecutions queue = new QueuedExecutions(2);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+ Assert.assertEquals(pair.getFirst(),
+ queue.getReference(pair.getFirst().getExecId()));
+ }
+ }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
index a7ef5f3..67efed9 100644
--- a/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
@@ -28,7 +28,6 @@ import javax.sql.DataSource;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
-
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
@@ -214,6 +213,96 @@ public class JdbcProjectLoaderTest {
DbUtils.closeQuietly(connection);
}
+ /** Test case to validated permissions for fetchProjectByName **/
+ @Test
+ public void testPermissionRetrivalByFetchProjectByName()
+ throws ProjectManagerException {
+ if (!isTestSetup()) {
+ return;
+ }
+
+ ProjectLoader loader = createLoader();
+ String projectName = "mytestProject";
+ String projectDescription = "This is my new project";
+ User user = new User("testUser");
+
+ Project project =
+ loader.createNewProject(projectName, projectDescription, user);
+
+ Permission perm = new Permission(0x2);
+ loader.updatePermission(project, user.getUserId(), perm, false);
+ loader.updatePermission(project, "group", perm, true);
+
+ Permission permOverride = new Permission(0x6);
+ loader.updatePermission(project, user.getUserId(), permOverride, false);
+
+ Project fetchedProject = loader.fetchProjectByName(project.getName());
+ assertProjectMemberEquals(project, fetchedProject);
+ Assert.assertEquals(permOverride,
+ fetchedProject.getUserPermission(user.getUserId()));
+ }
+
+ /** Default Test case for fetchProjectByName **/
+ @Test
+ public void testProjectRetrievalByFetchProjectByName()
+ throws ProjectManagerException {
+ if (!isTestSetup()) {
+ return;
+ }
+
+ ProjectLoader loader = createLoader();
+ String projectName = "mytestProject";
+ String projectDescription = "This is my new project";
+ User user = new User("testUser");
+
+ Project project =
+ loader.createNewProject(projectName, projectDescription, user);
+
+ Project fetchedProject = loader.fetchProjectByName(project.getName());
+ assertProjectMemberEquals(project, fetchedProject);
+ }
+
+ /** Default Test case for fetchProjectByName **/
+ @Test
+ public void testDuplicateRetrivalByFetchProjectByName()
+ throws ProjectManagerException {
+ if (!isTestSetup()) {
+ return;
+ }
+
+ ProjectLoader loader = createLoader();
+ String projectName = "mytestProject";
+ String projectDescription = "This is my new project";
+ User user = new User("testUser");
+
+ Project project =
+ loader.createNewProject(projectName, projectDescription, user);
+
+ loader.removeProject(project, user.getUserId());
+
+ Project newProject =
+ loader.createNewProject(projectName, projectDescription, user);
+
+ Project fetchedProject = loader.fetchProjectByName(project.getName());
+ Assert.assertEquals(newProject.getId(), fetchedProject.getId());
+
+ }
+
+ /** Test case for NonExistantProject project fetch **/
+ @Test
+ public void testInvalidProjectByFetchProjectByName() {
+ if (!isTestSetup()) {
+ return;
+ }
+ ProjectLoader loader = createLoader();
+ try {
+ loader.fetchProjectByName("NonExistantProject");
+ } catch (ProjectManagerException ex) {
+ System.out.println("Test true");
+ }
+ Assert.fail("Expecting exception, but didn't get one");
+ }
+
@Test
public void testCreateProject() throws ProjectManagerException {
if (!isTestSetup()) {
diff --git a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
index 909b82e..42d57b7 100644
--- a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
+++ b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
@@ -243,4 +243,10 @@ public class MockProjectLoader implements ProjectLoader {
// TODO Auto-generated method stub
}
+
+@Override
+public Project fetchProjectByName(String name) throws ProjectManagerException {
+ // TODO Auto-generated method stub
+ return null;
+}
}
diff --git a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
new file mode 100644
index 0000000..e51b575
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.user.User;
+
+/**
+ * Commonly used utils method for unit/integration tests
+ */
+public class TestUtils {
+ /* Directory with serialized description of test flows */
+ private static final String UNIT_BASE_DIR =
+ "../azkaban-test/src/test/resources/executions";
+
+ public static File getFlowDir(String projectName, String flow) {
+ return new File(String.format("%s/%s/%s.flow", UNIT_BASE_DIR, projectName,
+ flow));
+ }
+
+ public static User getTestUser() {
+ return new User("testUser");
+ }
+
+ /* Helper method to create an ExecutableFlow from serialized description */
+ public static ExecutableFlow createExecutableFlow(String projectName, String flowName)
+ throws IOException {
+ File jsonFlowFile = getFlowDir(projectName, flowName);
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> flowObj =
+ (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+
+ Flow flow = Flow.flowFromObject(flowObj);
+ Project project = new Project(1, "flow");
+ HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
+ flowMap.put(flow.getId(), flow);
+ project.setFlows(flowMap);
+ ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+
+ return execFlow;
+ }
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
index e66a586..1d9af6d 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
@@ -72,7 +72,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
/**
* Handle all get request to Stats Servlet {@inheritDoc}
- *
+ *
* @see javax.servlet.http.HttpServlet#doGet(javax.servlet.http.HttpServletRequest,
* javax.servlet.http.HttpServletResponse)
*/
@@ -176,7 +176,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
/**
* Get metric snapshots for a metric and date specification
- *
+ *
* @throws ServletException
*/
private void handleGetMetricHistory(HttpServletRequest req,
@@ -252,7 +252,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
/**
* Update tracking interval for a given metrics
- *
+ *
* @throws ServletException
*/
private void handleChangeMetricInterval(HttpServletRequest req,
diff --git a/azkaban-sql/src/sql/update.active_executing_flows.3.0.sql b/azkaban-sql/src/sql/update.active_executing_flows.3.0.sql
new file mode 100644
index 0000000..dcb4ec5
--- /dev/null
+++ b/azkaban-sql/src/sql/update.active_executing_flows.3.0.sql
@@ -0,0 +1,2 @@
+ALTER TABLE active_executing_flows DROP COLUMN host;
+ALTER TABLE active_executing_flows DROP COLUMN port;
\ No newline at end of file
diff --git a/azkaban-sql/src/sql/update.execution_flows.3.0.sql b/azkaban-sql/src/sql/update.execution_flows.3.0.sql
new file mode 100644
index 0000000..2935810
--- /dev/null
+++ b/azkaban-sql/src/sql/update.execution_flows.3.0.sql
@@ -0,0 +1,2 @@
+ALTER TABLE execution_flows ADD COLUMN executor_id INT DEFAULT NULL;
+CREATE INDEX executor_id ON execution_flows(executor_id);
\ No newline at end of file
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 2923d5e..f3e8c8f 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -34,6 +34,7 @@ import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.Executor;
import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
@@ -47,8 +48,11 @@ import azkaban.server.HttpRequestUtils;
import azkaban.server.session.Session;
import azkaban.user.Permission;
import azkaban.user.Permission.Type;
+import azkaban.user.Role;
import azkaban.user.User;
+import azkaban.user.UserManager;
import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.Pair;
import azkaban.webapp.AzkabanWebServer;
import azkaban.webapp.plugin.PluginRegistry;
import azkaban.webapp.plugin.ViewerPlugin;
@@ -59,11 +63,13 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
private ExecutorManagerAdapter executorManager;
private ScheduleManager scheduleManager;
private ExecutorVelocityHelper velocityHelper;
+ private UserManager userManager;
@Override
public void init(ServletConfig config) throws ServletException {
super.init(config);
AzkabanWebServer server = (AzkabanWebServer) getApplication();
+ userManager = server.getUserManager();
projectManager = server.getProjectManager();
executorManager = server.getExecutorManager();
scheduleManager = server.getScheduleManager();
@@ -225,7 +231,8 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
newPage(req, resp, session,
"azkaban/webapp/servlet/velocity/executionspage.vm");
- List<ExecutableFlow> runningFlows = executorManager.getRunningFlows();
+ List<Pair<ExecutableFlow, Executor>> runningFlows =
+ executorManager.getActiveFlowsWithExecutor();
page.add("runningFlows", runningFlows.isEmpty() ? null : runningFlows);
List<ExecutableFlow> finishedFlows =
@@ -806,6 +813,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
if (!options.isSuccessEmailsOverridden()) {
options.setSuccessEmails(flow.getSuccessEmails());
}
+ fixFlowPriorityByPermission(options, user);
options.setMailCreator(flow.getMailCreator());
try {
@@ -821,6 +829,15 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ret.put("execid", exflow.getExecutionId());
}
+ /* Reset flow priority if submitting user is not a Azkaban admin */
+ private void fixFlowPriorityByPermission(ExecutionOptions options, User user) {
+ if (!(options.getFlowParameters().containsKey(
+ ExecutionOptions.FLOW_PRIORITY) && hasPermission(user, Type.ADMIN))) {
+ options.getFlowParameters().put(ExecutionOptions.FLOW_PRIORITY,
+ String.valueOf(ExecutionOptions.DEFAULT_FLOW_PRIORITY));
+ }
+ }
+
public class ExecutorVelocityHelper {
public String getProjectName(int id) {
Project project = projectManager.getProject(id);
@@ -831,4 +848,16 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
return project.getName();
}
}
+
+ /* returns true if user has access of type */
+ protected boolean hasPermission(User user, Permission.Type type) {
+ for (String roleName : user.getRoles()) {
+ Role role = userManager.getRole(roleName);
+ if (role.getPermission().isPermissionSet(type)
+ || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 435e1f9..dc0ef87 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -148,6 +148,8 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
handleFlowPage(req, resp, session);
} else if (hasParam(req, "delete")) {
handleRemoveProject(req, resp, session);
+ } else if (hasParam(req, "purge")) {
+ handlePurgeProject(req, resp, session);
} else if (hasParam(req, "download")) {
handleDownloadProject(req, resp, session);
} else {
@@ -518,6 +520,56 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
}
+ /**
+ * validate readiness of a project and user permission and use
+ * projectManager to purge the project if things looks good
+ **/
+ private void handlePurgeProject(HttpServletRequest req,
+ HttpServletResponse resp, Session session) throws ServletException,
+ IOException {
+ User user = session.getUser();
+ String projectName = getParam(req, "project");
+ HashMap<String, Object> ret = new HashMap<String, Object>();
+ boolean isOperationSuccessful = true;
+
+ if (projectManager.isActiveProject(projectName)) {
+ ret.put("error", "Project " + projectName
+ + " should be deleted before purging");
+ isOperationSuccessful = false;
+ }
+
+ try {
+ Project project = null;
+
+ // project is already deleted
+ if (isOperationSuccessful) {
+ project = projectManager.getProject(projectName);
+ if (project == null) {
+ ret.put("error", "no project with name : " + projectName);
+ isOperationSuccessful = false;
+ }
+ }
+
+ // only eligible users can purge a project
+ if (isOperationSuccessful
+ && !hasPermission(project, user, Type.ADMIN)) {
+ ret.put("error", "Cannot purge. User '" + user.getUserId()
+ + "' is not an ADMIN.");
+ isOperationSuccessful = false;
+ }
+
+ if (isOperationSuccessful) {
+ projectManager.purgeProject(project, user);
+ }
+ } catch (Exception e) {
+ ret.put("error", e.getMessage());
+ isOperationSuccessful = false;
+ }
+
+ ret.put("success", isOperationSuccessful);
+ this.writeJSON(resp, ret);
+ }
+
private void handleRemoveProject(HttpServletRequest req,
HttpServletResponse resp, Session session) throws ServletException,
IOException {
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
index 6d14839..b2b3487 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
@@ -17,6 +17,7 @@
package azkaban.webapp.servlet;
import java.io.IOException;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -29,7 +30,9 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import azkaban.executor.ConnectorParams;
+import azkaban.executor.Executor;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
import azkaban.server.session.Session;
import azkaban.user.Permission;
import azkaban.user.Role;
@@ -67,55 +70,105 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session)
throws ServletException, IOException {
HashMap<String, Object> ret = new HashMap<String, Object>();
+ int executorId = getIntParam(req, ConnectorParams.EXECUTOR_ID_PARAM);
String actionName = getParam(req, ConnectorParams.ACTION_PARAM);
if (actionName.equals(ConnectorParams.STATS_GET_METRICHISTORY)) {
- handleGetMetricHistory(req, ret, session.getUser());
+ handleGetMetricHistory(executorId, req, ret, session.getUser());
+ } else if (actionName.equals(ConnectorParams.STATS_GET_ALLMETRICSNAME)) {
+ handleGetAllMetricName(executorId, req, ret);
} else if (actionName.equals(ConnectorParams.STATS_SET_REPORTINGINTERVAL)) {
- handleChangeConfigurationRequest(ConnectorParams.STATS_SET_REPORTINGINTERVAL, req, ret);
+ handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_REPORTINGINTERVAL, req, ret);
} else if (actionName.equals(ConnectorParams.STATS_SET_CLEANINGINTERVAL)) {
- handleChangeConfigurationRequest(ConnectorParams.STATS_SET_CLEANINGINTERVAL, req, ret);
+ handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_CLEANINGINTERVAL, req, ret);
} else if (actionName.equals(ConnectorParams.STATS_SET_MAXREPORTERPOINTS)) {
- handleChangeConfigurationRequest(ConnectorParams.STATS_SET_MAXREPORTERPOINTS, req, ret);
+ handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_MAXREPORTERPOINTS, req, ret);
} else if (actionName.equals(ConnectorParams.STATS_SET_ENABLEMETRICS)) {
- handleChangeConfigurationRequest(ConnectorParams.STATS_SET_ENABLEMETRICS, req, ret);
+ handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_ENABLEMETRICS, req, ret);
} else if (actionName.equals(ConnectorParams.STATS_SET_DISABLEMETRICS)) {
- handleChangeConfigurationRequest(ConnectorParams.STATS_SET_DISABLEMETRICS, req, ret);
+ handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_DISABLEMETRICS, req, ret);
}
writeJSON(resp, ret);
}
/**
+ * Get all metrics tracked by the given executor
+ *
+ * @param executorId
+ * @param req
+ * @param ret
+ */
+ private void handleGetAllMetricName(int executorId, HttpServletRequest req,
+ HashMap<String, Object> ret) throws IOException {
+ Map<String, Object> result;
+ try {
+ result =
+ execManager.callExecutorStats(executorId,
+ ConnectorParams.STATS_GET_ALLMETRICSNAME,
+ (Pair<String, String>[]) null);
+
+ if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+ ret.put("error", result.get(ConnectorParams.RESPONSE_ERROR).toString());
+ } else {
+ ret.put("metricList", result.get("data"));
+ }
+ } catch (ExecutorManagerException e) {
+ ret.put("error", "Failed to fetch metric names for executor : "
+ + executorId);
+ }
+ }
+
+ /**
* Generic method to facilitate actionName action using Azkaban exec server
+ * @param executorId
* @param actionName Name of the action
+ * @throws ExecutorManagerException
*/
- private void handleChangeConfigurationRequest(String actionName, HttpServletRequest req, HashMap<String, Object> ret)
+ private void handleChangeConfigurationRequest(int executorId, String actionName, HttpServletRequest req, HashMap<String, Object> ret)
throws ServletException, IOException {
- Map<String, Object> result = execManager.callExecutorStats(actionName, getAllParams(req));
- if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
- ret.put(ConnectorParams.RESPONSE_ERROR, result.get(ConnectorParams.RESPONSE_ERROR).toString());
- } else {
- ret.put(ConnectorParams.STATUS_PARAM, result.get(ConnectorParams.STATUS_PARAM));
+ try {
+ Map<String, Object> result =
+ execManager
+ .callExecutorStats(executorId, actionName, getAllParams(req));
+ if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+ ret.put(ConnectorParams.RESPONSE_ERROR,
+ result.get(ConnectorParams.RESPONSE_ERROR).toString());
+ } else {
+ ret.put(ConnectorParams.STATUS_PARAM,
+ result.get(ConnectorParams.STATUS_PARAM));
+ }
+ } catch (ExecutorManagerException ex) {
+ ret.put("error", "Failed to change config change");
}
}
/**
* Get metric snapshots for a metric and date specification
+ * @param executorId
* @throws ServletException
+ * @throws ExecutorManagerException
*/
- private void handleGetMetricHistory(HttpServletRequest req, HashMap<String, Object> ret, User user)
- throws IOException, ServletException {
- Map<String, Object> result =
- execManager.callExecutorStats(ConnectorParams.STATS_GET_METRICHISTORY, getAllParams(req));
- if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
- ret.put(ConnectorParams.RESPONSE_ERROR, result.get(ConnectorParams.RESPONSE_ERROR).toString());
- } else {
- ret.put("data", result.get("data"));
+ private void handleGetMetricHistory(int executorId, HttpServletRequest req,
+ HashMap<String, Object> ret, User user) throws IOException,
+ ServletException {
+ try {
+ Map<String, Object> result =
+ execManager.callExecutorStats(executorId,
+ ConnectorParams.STATS_GET_METRICHISTORY, getAllParams(req));
+ if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+ ret.put(ConnectorParams.RESPONSE_ERROR,
+ result.get(ConnectorParams.RESPONSE_ERROR).toString());
+ } else {
+ ret.put("data", result.get("data"));
+ }
+ } catch (ExecutorManagerException ex) {
+ ret.put("error", "Failed to fetch metric history");
}
}
/**
+ * @throws ExecutorManagerException
*
*/
private void handleStatePageLoad(HttpServletRequest req, HttpServletResponse resp, Session session)
@@ -128,14 +181,20 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
}
try {
+ Collection<Executor> executors = execManager.getAllActiveExecutors();
+ page.add("executorList", executors);
+
Map<String, Object> result =
- execManager.callExecutorStats(ConnectorParams.STATS_GET_ALLMETRICSNAME, (Pair<String, String>[]) null);
+ execManager.callExecutorStats(executors.iterator().next().getId(),
+ ConnectorParams.STATS_GET_ALLMETRICSNAME,
+ (Pair<String, String>[]) null);
if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
- page.add("errorMsg", result.get(ConnectorParams.RESPONSE_ERROR).toString());
+ page.add("errorMsg", result.get(ConnectorParams.RESPONSE_ERROR)
+ .toString());
} else {
page.add("metricList", result.get("data"));
}
- } catch (IOException e) {
+ } catch (Exception e) {
page.add("errorMsg", "Failed to get a response from Azkaban exec server");
}
diff --git a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm
index 49d1085..9c2a6fb 100644
--- a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm
+++ b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm
@@ -68,6 +68,7 @@
<tr>
<th>#</th>
<th class="execid">Execution Id</th>
+ <th >Executor Id</th>
<th>Flow</th>
<th>Project</th>
<th class="user">User</th>
@@ -80,25 +81,33 @@
</tr>
</thead>
<tbody>
-#if ($runningFlows)
+
+#if ( !$null.isNull(${runningFlows}))
#foreach ($flow in $runningFlows)
<tr>
<td class="tb-name">
$velocityCount
</td>
<td class="tb-name">
- <a href="${context}/executor?execid=${flow.executionId}">${flow.executionId}</a>
+ <a href="${context}/executor?execid=${flow.getFirst().executionId}">${flow.getFirst().executionId}</a>
</td>
- <td><a href="${context}/manager?project=$vmutils.getProjectName(${flow.projectId})&flow=${flow.flowId}">${flow.flowId}</a></td>
<td>
- <a href="${context}/manager?project=$vmutils.getProjectName(${flow.projectId})">$vmutils.getProjectName(${flow.projectId})</a>
+ #if (${flow.getSecond()})
+ ${flow.getSecond().getId()}
+ #else
+ -
+ #end
</td>
- <td>${flow.submitUser}</td>
- <td>${flow.proxyUsers}</td>
- <td>$utils.formatDate(${flow.startTime})</td>
- <td>$utils.formatDate(${flow.endTime})</td>
- <td>$utils.formatDuration(${flow.startTime}, ${flow.endTime})</td>
- <td><div class="status ${flow.status}">$utils.formatStatus(${flow.status})</div></td>
+ <td><a href="${context}/manager?project=$vmutils.getProjectName(${flow.getFirst().projectId})&flow=${flow.getFirst().flowId}">${flow.getFirst().flowId}</a></td>
+ <td>
+ <a href="${context}/manager?project=$vmutils.getProjectName(${flow.getFirst().projectId})">$vmutils.getProjectName(${flow.getFirst().projectId})</a>
+ </td>
+ <td>${flow.getFirst().submitUser}</td>
+ <td>${flow.getFirst().proxyUsers}</td>
+ <td>$utils.formatDate(${flow.getFirst().startTime})</td>
+ <td>$utils.formatDate(${flow.getFirst().endTime})</td>
+ <td>$utils.formatDuration(${flow.getFirst().startTime}, ${flow.getFirst().endTime})</td>
+ <td><div class="status ${flow.getFirst().status}">$utils.formatStatus(${flow.getFirst().status})</div></td>
<td></td>
</tr>
#end
diff --git a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
index 4596ac2..d77bc52 100644
--- a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
+++ b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
@@ -29,6 +29,25 @@
var currentTime = ${currentTime};
var timezone = "${timezone}";
+ function refreshMetricList() {
+ var requestURL = '/stats';
+ var requestData = {
+ 'action': 'getAllMetricNames',
+ 'executorId': $('#executorName').val()
+ };
+ var successHandler = function(responseData) {
+ if(responseData.error != null) {
+ $('#reportedMetric').html(responseData.error);
+ } else {
+ $('#metricName').empty();
+ for(var index = 0; index < responseData.metricList.length; index++) {
+ $('#metricName').append($('<option value="1">' + responseData.metricList[index] + '</option>'));
+ }
+ }
+ };
+ $.get(requestURL, requestData, successHandler, 'json');
+ }
+
function refreshMetricChart() {
var requestURL = '/stats';
var requestData = {
@@ -36,7 +55,8 @@
'from': new Date($('#datetimebegin').val()).toUTCString(),
'to' : new Date($('#datetimeend').val()).toUTCString(),
'metricName': $('#metricName').val(),
- 'useStats': $("#useStats").is(':checked')
+ 'useStats': $("#useStats").is(':checked'),
+ 'executorId': $('#executorName').val()
};
var successHandler = function(responseData) {
if(responseData.error != null) {
@@ -67,6 +87,7 @@
$('#datetimebegin').data('DateTimePicker').setEndDate(e.date);
});
$('#retrieve').click(refreshMetricChart);
+ $('#executorName').click(refreshMetricList);
});
</script>
@@ -84,8 +105,16 @@
<div class="header-title" style="width: 17%;">
<h1><a href="${context}/stats">Statistics</a></h1>
</div>
- <div class="header-control" style="width: 900px; padding-top: 5px;">
+ <div class="header-control" style="width: 1300px; padding-top: 5px;">
<form id="metric-form" method="get">
+ <label for="executorLabel" >Executor</label>
+ #if (!$executorList.isEmpty())
+ <select id="executorName" name="executorName" style="width:200px">
+ #foreach ($executor in $executorList)
+ <option value="${executor.getId()}" style="width:200px">${executor.getHost()}:${executor.getPort()}</option>
+ #end
+ </select>
+ #end
<label for="metricLabel" >Metric</label>
#if (!$metricList.isEmpty())
<select id="metricName" name="metricName" style="width:200px">
@@ -119,4 +148,4 @@
<!-- /container-full -->
#end
</body>
- <html>
\ No newline at end of file
+ <html>
diff --git a/azkaban-webserver/src/web/js/azkaban/view/exflow.js b/azkaban-webserver/src/web/js/azkaban/view/exflow.js
index dbb8ae4..026a946 100644
--- a/azkaban-webserver/src/web/js/azkaban/view/exflow.js
+++ b/azkaban-webserver/src/web/js/azkaban/view/exflow.js
@@ -178,8 +178,11 @@ azkaban.FlowTabView = Backbone.View.extend({
$("#retrybtn").hide();
if (data.status == "SUCCEEDED") {
- $("#executebtn").show();
+ $("#executebtn").show();
}
+ else if (data.status == "PREPARING") {
+ $("#cancelbtn").show();
+ }
else if (data.status == "FAILED") {
$("#executebtn").show();
}