Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
index 32176e9..a81b95d 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Executor.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -108,7 +108,8 @@ public class Executor {
this.isActive = isActive;
}
+ @Override
public String toString() {
- return String.format("%s:%d", host, port);
+ return String.format("[%d]%s:%d", id, host, port);
}
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index ed64364..ab9859e 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -69,6 +69,10 @@ public class ExecutorManager extends EventHandler implements
"azkaban.use.multiple.executors";
private static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
"azkaban.webserver.queue.size";
+ private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS =
+ "azkaban.activeexecutor.refresh.milisecinterval";
+ private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_FLOW =
+ "azkaban.activeexecutor.refresh.flowinterval";
private static Logger logger = Logger.getLogger(ExecutorManager.class);
private ExecutorLoader executorLoader;
@@ -80,16 +84,9 @@ public class ExecutorManager extends EventHandler implements
private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
new ConcurrentHashMap<Integer, ExecutableFlow>();
- /* map to easily access queued flows */
- final private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> queuedFlowMap =
- new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
- /* web server side queue */
- final private BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queuedFlowList =
- new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
- new ExecutableFlowPriorityComparator());
+ QueuedExecutions queuedFlows;
final private Set<Executor> activeExecutors = new HashSet<Executor>();
- final private long webserverQueueCapacity;
private QueueProcessorThread queueProcessor;
private ExecutingManagerUpdaterThread executingManager;
@@ -126,15 +123,19 @@ public class ExecutorManager extends EventHandler implements
if(isMultiExecutorMode()) {
queueProcessor =
new QueueProcessorThread(azkProps.getBoolean(
- AZKABAN_QUEUEPROCESSING_ENABLED, true));
+ AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
+ AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS, 1000),
+ azkProps
+ .getInt(AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_FLOW, 1000));
queueProcessor.start();
}
long executionLogsRetentionMs =
- props.getLong("execution.logs.retention.ms",
- DEFAULT_EXECUTION_LOGS_RETENTION_MS);
- webserverQueueCapacity =
- props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000);
+ props.getLong("execution.logs.retention.ms",
+ DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+
+ queuedFlows =
+ new QueuedExecutions(props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
cleanerThread = new CleanerThread(executionLogsRetentionMs);
cleanerThread.start();
@@ -335,7 +336,7 @@ public class ExecutorManager extends EventHandler implements
private void loadQueuedFlows() throws ExecutorManagerException {
for (Pair<ExecutionReference, ExecutableFlow> pair : executorLoader
.fetchQueuedFlows()) {
- enqueueFlow(pair.getSecond(), pair.getFirst());
+ queuedFlows.enqueue(pair.getSecond(), pair.getFirst());
}
}
@@ -350,7 +351,7 @@ public class ExecutorManager extends EventHandler implements
public List<Integer> getRunningFlows(int projectId, String flowId) {
List<Integer> executionIds = new ArrayList<Integer>();
executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
- queuedFlowMap.values()));
+ queuedFlows.getAllEntries()));
executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
runningFlows.values()));
return executionIds;
@@ -380,7 +381,7 @@ public class ExecutorManager extends EventHandler implements
throws IOException {
List<Pair<ExecutableFlow, Executor>> flows =
new ArrayList<Pair<ExecutableFlow, Executor>>();
- getActiveFlowsWithExecutorHelper(flows, queuedFlowMap.values());
+ getActiveFlowsWithExecutorHelper(flows, queuedFlows.getAllEntries());
getActiveFlowsWithExecutorHelper(flows, runningFlows.values());
return flows;
}
@@ -407,11 +408,11 @@ public class ExecutorManager extends EventHandler implements
boolean isRunning = false;
isRunning =
isRunning
- || isFlowRunningHelper(projectId, flowId, queuedFlowMap.values());
+ || isFlowRunningHelper(projectId, flowId, queuedFlows.getAllEntries());
isRunning =
isRunning
|| isFlowRunningHelper(projectId, flowId, runningFlows.values());
- return false;
+ return isRunning;
}
/* Search a running flow in a collection */
@@ -437,8 +438,8 @@ public class ExecutorManager extends EventHandler implements
throws ExecutorManagerException {
if (runningFlows.containsKey(execId)) {
return runningFlows.get(execId).getSecond();
- } else if (queuedFlowMap.containsKey(execId)) {
- return queuedFlowMap.get(execId).getSecond();
+ } else if (queuedFlows.hasExecution(execId)) {
+ return queuedFlows.getFlow(execId);
} else {
return executorLoader.fetchExecutableFlow(execId);
}
@@ -454,7 +455,7 @@ public class ExecutorManager extends EventHandler implements
@Override
public List<ExecutableFlow> getRunningFlows() {
ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>();
- getActiveFlowHelper(flows, queuedFlowMap.values());
+ getActiveFlowHelper(flows, queuedFlows.getAllEntries());
getActiveFlowHelper(flows, runningFlows.values());
return flows;
}
@@ -479,7 +480,7 @@ public class ExecutorManager extends EventHandler implements
*/
public String getRunningFlowIds() {
List<Integer> allIds = new ArrayList<Integer>();
- getRunningFlowsIdsHelper(allIds, queuedFlowMap.values());
+ getRunningFlowsIdsHelper(allIds, queuedFlows.getAllEntries());
getRunningFlowsIdsHelper(allIds, runningFlows.values());
Collections.sort(allIds);
return allIds.toString();
@@ -494,7 +495,7 @@ public class ExecutorManager extends EventHandler implements
*/
public String getQueuedFlowIds() {
List<Integer> allIds = new ArrayList<Integer>();
- getRunningFlowsIdsHelper(allIds, queuedFlowMap.values());
+ getRunningFlowsIdsHelper(allIds, queuedFlows.getAllEntries());
Collections.sort(allIds);
return allIds.toString();
}
@@ -688,13 +689,9 @@ public class ExecutorManager extends EventHandler implements
runningFlows.get(exFlow.getExecutionId());
callExecutorServer(pair.getFirst(), ConnectorParams.CANCEL_ACTION,
userId);
- } else if (queuedFlowMap.containsKey(exFlow.getExecutionId())) {
- Pair<ExecutionReference, ExecutableFlow> pair =
- queuedFlowMap.get(exFlow.getExecutionId());
- synchronized (pair) {
- dequeueFlow(exFlow.getExecutionId());
- finalizeFlows(exFlow);
- }
+ } else if (queuedFlows.hasExecution(exFlow.getExecutionId())) {
+ queuedFlows.dequeue(exFlow.getExecutionId());
+ finalizeFlows(exFlow);
} else {
throw new ExecutorManagerException("Execution "
+ exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
@@ -863,7 +860,7 @@ public class ExecutorManager extends EventHandler implements
logger.info("Submitting execution flow " + flowId + " by " + userId);
String message = "";
- if (queuedFlowList.size() >= webserverQueueCapacity) {
+ if (queuedFlows.isFull()) {
message =
String
.format(
@@ -928,7 +925,7 @@ public class ExecutorManager extends EventHandler implements
if (isMultiExecutorMode()) {
//Take MultiExecutor route
executorLoader.addActiveExecutableReference(reference);
- enqueueFlow(exflow, reference);
+ queuedFlows.enqueue(exflow, reference);
} else {
// assign only local executor we have
reference.setExecutor(activeExecutors.iterator().next());
@@ -951,47 +948,6 @@ public class ExecutorManager extends EventHandler implements
}
}
-
- /**
- * Wraps BoundedQueue Take method to have a corresponding update in
- * queuedFlowMap lookup table
- *
- * @return
- * @throws InterruptedException
- */
- private Pair<ExecutionReference, ExecutableFlow> waitAndFetchQueueHead()
- throws InterruptedException {
- Pair<ExecutionReference, ExecutableFlow> pair = queuedFlowList.take();
- if (pair != null && pair.getFirst() != null) {
- queuedFlowMap.remove(pair.getFirst().getExecId());
- }
- return pair;
- }
-
- /* Helper method to have a single point of deletion in the queued flows */
- private void dequeueFlow(int executionId) {
- if (queuedFlowMap.containsKey(executionId)) {
- queuedFlowList.remove(queuedFlowMap.get(executionId));
- queuedFlowMap.remove(executionId);
- }
- }
-
- /* Helper method to have a single point of insertion in the queued flows */
- private void enqueueFlow(ExecutableFlow exflow, ExecutionReference ref)
- throws ExecutorManagerException {
- Pair<ExecutionReference, ExecutableFlow> pair =
- new Pair<ExecutionReference, ExecutableFlow>(ref, exflow);
- try {
- queuedFlowMap.put(exflow.getExecutionId(), pair);
- queuedFlowList.put(pair);
- } catch (InterruptedException e) {
- String errMsg = "Failed to queue flow " + exflow.getExecutionId();
- logger.error(errMsg, e);
- finalizeFlows(exflow);
- throw new ExecutorManagerException(errMsg);
- }
- }
-
private void cleanOldExecutionLogs(long millis) {
try {
int count = executorLoader.removeExecutionLogsByTime(millis);
@@ -1710,21 +1666,26 @@ public class ExecutorManager extends EventHandler implements
}
}
-
/*
* This thread is responsible for processing queued flows.
*/
private class QueueProcessorThread extends Thread {
private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
- private static final long ACTIVE_EXECUTOR_REFRESH_WINDOW_IN_MS = 1000;
private static final int MAX_DISPATCHING_ERRORS_PERMITTED = 5;
- private static final int MAX_CONTINUOUS_FLOW_PROCESSED = 10;
+ private final long activeExecutorRefreshWindowInMilisec;
+ private final int activeExecutorRefreshWindowInFlows;
private boolean shutdown = false;
private boolean isActive = true;
- public QueueProcessorThread(boolean isActive) {
+ public QueueProcessorThread(boolean isActive,
+ long activeExecutorRefreshWindowInTime,
+ int activeExecutorRefreshWindowInFlows) {
setActive(isActive);
+ this.activeExecutorRefreshWindowInFlows =
+ activeExecutorRefreshWindowInFlows;
+ this.activeExecutorRefreshWindowInMilisec =
+ activeExecutorRefreshWindowInTime;
this.setName("AzkabanWebServer-QueueProcessor-Thread");
}
@@ -1748,8 +1709,8 @@ public class ExecutorManager extends EventHandler implements
try {
// start processing queue if active, other wait for sometime
if (isActive) {
- processQueuedFlows(ACTIVE_EXECUTOR_REFRESH_WINDOW_IN_MS,
- MAX_CONTINUOUS_FLOW_PROCESSED);
+ processQueuedFlows(activeExecutorRefreshWindowInMilisec,
+ activeExecutorRefreshWindowInFlows);
}
wait(QUEUE_PROCESSOR_WAIT_IN_MS);
} catch (Exception e) {
@@ -1764,19 +1725,19 @@ public class ExecutorManager extends EventHandler implements
private void processQueuedFlows(long activeExecutorsRefreshWindow,
int maxContinuousFlowProcessed) throws InterruptedException,
ExecutorManagerException {
- long lastProcessingTime = System.currentTimeMillis();
+ long lastExecutorRefreshTime = System.currentTimeMillis();
Pair<ExecutionReference, ExecutableFlow> runningCandidate;
int currentContinuousFlowProcessed = 0;
- while (isActive() && (runningCandidate = waitAndFetchQueueHead()) != null) {
+ while (isActive() && (runningCandidate = queuedFlows.fetchHead()) != null) {
ExecutionReference reference = runningCandidate.getFirst();
ExecutableFlow exflow = runningCandidate.getSecond();
long currentTime = System.currentTimeMillis();
- if (currentTime - lastProcessingTime > activeExecutorsRefreshWindow
- || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
+ if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
+ || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
refreshExecutors(); // Refresh executor stats to be used by selector
- lastProcessingTime = currentTime;
+ lastExecutorRefreshTime = currentTime;
currentContinuousFlowProcessed = 0;
}
@@ -1841,7 +1802,7 @@ public class ExecutorManager extends EventHandler implements
finalizeFlows(exflow);
} else {
// again queue this flow
- enqueueFlow(exflow, reference);
+ queuedFlows.enqueue(exflow, reference);
}
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java b/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java
new file mode 100644
index 0000000..641ffae
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java
@@ -0,0 +1,201 @@
+package azkaban.executor;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Pair;
+
+/**
+ * <pre>
+ * Composite data structure to represent non-dispatched flows in webserver.
+ * This data structure wraps a blocking queue and a concurrent hashmap.
+ * </pre>
+ */
+public class QueuedExecutions {
+ private static Logger logger = Logger.getLogger(QueuedExecutions.class);
+ final long capacity;
+
+ /* map to easily access queued flows */
+ final private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> queuedFlowMap;
+ /* actual queue */
+ final private BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queuedFlowList;
+
+ public QueuedExecutions(long capacity) {
+ this.capacity = capacity;
+ queuedFlowMap =
+ new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
+ queuedFlowList =
+ new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+ new ExecutableFlowPriorityComparator());
+ }
+
+ /**
+ * Wraps BoundedQueue Take method to have a corresponding update in
+ * queuedFlowMap lookup table
+ *
+ * @return
+ * @throws InterruptedException
+ */
+ public Pair<ExecutionReference, ExecutableFlow> fetchHead()
+ throws InterruptedException {
+ Pair<ExecutionReference, ExecutableFlow> pair = queuedFlowList.take();
+ if (pair != null && pair.getFirst() != null) {
+ queuedFlowMap.remove(pair.getFirst().getExecId());
+ }
+ return pair;
+ }
+
+ /**
+ * Helper method to have a single point of deletion in the queued flows
+ *
+ * @param executionId
+ */
+ public void dequeue(int executionId) {
+ if (queuedFlowMap.containsKey(executionId)) {
+ queuedFlowList.remove(queuedFlowMap.get(executionId));
+ queuedFlowMap.remove(executionId);
+ }
+ }
+
+ /**
+ * <pre>
+ * Helper method to have a single point of insertion in the queued flows
+ *
+ * @param exflow
+ * flow to be enqueued
+ * @param ref
+ * reference to be enqueued
+ * @throws ExecutorManagerException
+ * case 1: if blocking queue put method fails due to
+ * InterruptedException
+ * case 2: if there already an element with
+ * same execution Id
+ * </pre>
+ */
+ public void enqueue(ExecutableFlow exflow, ExecutionReference ref)
+ throws ExecutorManagerException {
+ if (hasExecution(exflow.getExecutionId())) {
+ String errMsg = "Flow already in queue " + exflow.getExecutionId();
+ throw new ExecutorManagerException(errMsg);
+ }
+
+ Pair<ExecutionReference, ExecutableFlow> pair =
+ new Pair<ExecutionReference, ExecutableFlow>(ref, exflow);
+ try {
+ queuedFlowMap.put(exflow.getExecutionId(), pair);
+ queuedFlowList.put(pair);
+ } catch (InterruptedException e) {
+ String errMsg = "Failed to insert flow " + exflow.getExecutionId();
+ logger.error(errMsg, e);
+ throw new ExecutorManagerException(errMsg);
+ }
+ }
+
+ /**
+ * <pre>
+ * Enqueues all the elements of a collection
+ *
+ * @param collection
+ *
+ * @throws ExecutorManagerException
+ * case 1: if blocking queue put method fails due to
+ * InterruptedException
+ * case 2: if there already an element with
+ * same execution Id
+ * </pre>
+ */
+ public void enqueueAll(
+ Collection<Pair<ExecutionReference, ExecutableFlow>> collection)
+ throws ExecutorManagerException {
+ for (Pair<ExecutionReference, ExecutableFlow> pair : collection) {
+ enqueue(pair.getSecond(), pair.getFirst());
+ }
+ }
+
+ /**
+ * Returns a read only collection of all the queued (flows, reference) pairs
+ *
+ * @return
+ */
+ public Collection<Pair<ExecutionReference, ExecutableFlow>> getAllEntries() {
+ return Collections.unmodifiableCollection(queuedFlowMap.values());
+ }
+
+ /**
+ * Checks if an execution is queued or not
+ *
+ * @param executionId
+ * @return
+ */
+ public boolean hasExecution(int executionId) {
+ return queuedFlowMap.containsKey(executionId);
+ }
+
+ /**
+ * Fetch flow for an execution. Returns null, if execution not in queue
+ *
+ * @param executionId
+ * @return
+ */
+ public ExecutableFlow getFlow(int executionId) {
+ if (hasExecution(executionId)) {
+ return queuedFlowMap.get(executionId).getSecond();
+ }
+ return null;
+ }
+
+ /**
+ * Fetch Activereference for an execution. Returns null, if execution not in
+ * queue
+ *
+ * @param executionId
+ * @return
+ */
+ public ExecutionReference getReference(int executionId) {
+ if (hasExecution(executionId)) {
+ return queuedFlowMap.get(executionId).getFirst();
+ }
+ return null;
+ }
+
+ /**
+ * Size of the queue
+ *
+ * @return
+ */
+ public long size() {
+ return queuedFlowList.size();
+ }
+
+ /**
+ * Verify, if queue is full as per initialized capacity
+ *
+ * @return
+ */
+ public boolean isFull() {
+ return size() >= capacity;
+ }
+
+ /**
+ * Verify, if queue is empty or not
+ *
+ * @return
+ */
+ public boolean isEmpty() {
+ return queuedFlowList.isEmpty() && queuedFlowMap.isEmpty();
+ }
+
+ /**
+ * Empties queue by dequeuing all the elements
+ */
+ public void clear() {
+ for (Pair<ExecutionReference, ExecutableFlow> pair : queuedFlowMap.values()) {
+ dequeue(pair.getFirst().getExecId());
+ }
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
index 287e971..bbd642a 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
@@ -69,7 +69,7 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
}
@Override
- public String getUndispatchedFlows() {
+ public String getQueuedFlows() {
return manager.getQueuedFlowIds();
}
diff --git a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
index babb2a0..94012e0 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
@@ -43,8 +43,8 @@ public interface JmxExecutorManagerMBean {
@DisplayName("OPERATION: isQueueProcessorActive")
public boolean isQueueProcessorActive();
- @DisplayName("OPERATION: getUndispatchedFlows")
- public String getUndispatchedFlows();
+ @DisplayName("OPERATION: getQueuedFlows")
+ public String getQueuedFlows();
@DisplayName("OPERATION: getQueueProcessorThreadState")
public String getQueueProcessorThreadState();
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
index 319f314..f9850d6 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
@@ -35,34 +35,19 @@ import azkaban.user.User;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
/**
* Test class for ExecutableFlowPriorityComparator
* */
public class ExecutableFlowPriorityComparatorTest {
- /* Directory with serialized description of test flows */
- private static final String UNIT_BASE_DIR =
- "../azkaban-test/src/test/resources/executions/exectest1/";
-
- private File getFlowDir(String flow) {
- return new File(UNIT_BASE_DIR + flow + ".flow");
- }
/* Helper method to create an ExecutableFlow from serialized description */
private ExecutableFlow createExecutableFlow(String flowName, int priority,
long updateTime) throws IOException {
- File jsonFlowFile = getFlowDir(flowName);
- @SuppressWarnings("unchecked")
- HashMap<String, Object> flowObj =
- (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
-
- Flow flow = Flow.flowFromObject(flowObj);
- Project project = new Project(1, "flow");
- HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
- flowMap.put(flow.getId(), flow);
- project.setFlows(flowMap);
- ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+ ExecutableFlow execFlow =
+ TestUtils.createExecutableFlow("exectest1", flowName);
execFlow.setUpdateTime(updateTime);
if (priority > 0) {
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index d77b734..62d187b 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -34,22 +34,12 @@ import azkaban.user.User;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
/**
* Test class for executor manager
*/
public class ExecutorManagerTest {
- /* Directory with serialized description of test flows */
- private static final String UNIT_BASE_DIR =
- "../azkaban-test/src/test/resources/executions/exectest1/";
-
- private File getFlowDir(String flow) {
- return new File(UNIT_BASE_DIR + flow + ".flow");
- }
-
- private User getTestUser() {
- return new User("testUser");
- }
/* Helper method to create a ExecutorManager Instance */
private ExecutorManager createMultiExecutorManagerInstance()
@@ -72,41 +62,18 @@ public class ExecutorManagerTest {
return new ExecutorManager(props, loader, new HashMap<String, Alerter>());
}
- /* Helper method to create an ExecutableFlow from serialized description */
- private ExecutableFlow createExecutableFlow(String flowName)
- throws IOException {
- File jsonFlowFile = getFlowDir(flowName);
- @SuppressWarnings("unchecked")
- HashMap<String, Object> flowObj =
- (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
-
- Flow flow = Flow.flowFromObject(flowObj);
- Project project = new Project(1, "flow");
- HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
- flowMap.put(flow.getId(), flow);
- project.setFlows(flowMap);
- ExecutableFlow execFlow = new ExecutableFlow(project, flow);
-
- return execFlow;
- }
-
/*
* Test create an executor manager instance without any executor local or
* remote
*/
- @Test
- public void testNoExecutorScenario() {
- try {
- Props props = new Props();
- props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
- ExecutorLoader loader = new MockExecutorLoader();
- @SuppressWarnings("unused")
- ExecutorManager manager =
- new ExecutorManager(props, loader, new HashMap<String, Alerter>());
- Assert.fail("Expecting exception, but didn't get one");
- } catch (ExecutorManagerException ex) {
- System.out.println("Test true");
- }
+ @Test(expected = ExecutorManagerException.class)
+ public void testNoExecutorScenario() throws ExecutorManagerException {
+ Props props = new Props();
+ props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ ExecutorLoader loader = new MockExecutorLoader();
+ @SuppressWarnings("unused")
+ ExecutorManager manager =
+ new ExecutorManager(props, loader, new HashMap<String, Alerter>());
}
/*
@@ -180,29 +147,24 @@ public class ExecutorManagerTest {
* Test executor manager active executor reload and resulting in no active
* executors
*/
- @Test
- public void testSetupExecutorsException() {
- try {
- Props props = new Props();
- props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
- ExecutorLoader loader = new MockExecutorLoader();
- Executor executor1 = loader.addExecutor("localhost", 12345);
-
- ExecutorManager manager =
- new ExecutorManager(props, loader, new HashMap<String, Alerter>());
- Set<Executor> activeExecutors =
- new HashSet(manager.getAllActiveExecutors());
- Assert.assertArrayEquals(activeExecutors.toArray(),
- new Executor[] { executor1 });
-
- // mark older executor as inactive
- executor1.setActive(false);
- loader.updateExecutor(executor1);
- manager.setupExecutors();
- Assert.fail("Expecting exception, but didn't get one");
- } catch (ExecutorManagerException ex) {
- System.out.println("Test true");
- }
+ @Test(expected = ExecutorManagerException.class)
+ public void testSetupExecutorsException() throws ExecutorManagerException {
+ Props props = new Props();
+ props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ ExecutorLoader loader = new MockExecutorLoader();
+ Executor executor1 = loader.addExecutor("localhost", 12345);
+
+ ExecutorManager manager =
+ new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+ Set<Executor> activeExecutors =
+ new HashSet(manager.getAllActiveExecutors());
+ Assert.assertArrayEquals(activeExecutors.toArray(),
+ new Executor[] { executor1 });
+
+ // mark older executor as inactive
+ executor1.setActive(false);
+ loader.updateExecutor(executor1);
+ manager.setupExecutors();
}
/* Test disabling queue process thread to pause dispatching */
@@ -229,12 +191,12 @@ public class ExecutorManagerTest {
public void testQueuedFlows() throws ExecutorManagerException, IOException {
ExecutorLoader loader = new MockExecutorLoader();
ExecutorManager manager = createMultiExecutorManagerInstance(loader);
- ExecutableFlow flow1 = createExecutableFlow("exec1");
+ ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
flow1.setExecutionId(1);
- ExecutableFlow flow2 = createExecutableFlow("exec2");
+ ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
flow2.setExecutionId(2);
- User testUser = getTestUser();
+ User testUser = TestUtils.getTestUser();
manager.submitExecutableFlow(flow1, testUser.getUserId());
manager.submitExecutableFlow(flow2, testUser.getUserId());
@@ -261,24 +223,17 @@ public class ExecutorManagerTest {
}
/* Test submit duplicate flow when previous instance is not dispatched */
- @Test
+ @Test(expected = ExecutorManagerException.class)
public void testDuplicateQueuedFlows() throws ExecutorManagerException,
IOException {
- try {
- ExecutorManager manager = createMultiExecutorManagerInstance();
- ExecutableFlow flow1 = createExecutableFlow("exec1");
- flow1.getExecutionOptions().setConcurrentOption(
- ExecutionOptions.CONCURRENT_OPTION_SKIP);
-
- User testUser = getTestUser();
- manager.submitExecutableFlow(flow1, testUser.getUserId());
- manager.submitExecutableFlow(flow1, testUser.getUserId());
- manager.enableQueueProcessorThread();
- manager.submitExecutableFlow(flow1, testUser.getUserId());
- Assert.fail("Expecting exception, but didn't get one");
- } catch (ExecutorManagerException ex) {
- System.out.println("Test true");
- }
+ ExecutorManager manager = createMultiExecutorManagerInstance();
+ ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ flow1.getExecutionOptions().setConcurrentOption(
+ ExecutionOptions.CONCURRENT_OPTION_SKIP);
+
+ User testUser = TestUtils.getTestUser();
+ manager.submitExecutableFlow(flow1, testUser.getUserId());
+ manager.submitExecutableFlow(flow1, testUser.getUserId());
}
/*
@@ -289,8 +244,8 @@ public class ExecutorManagerTest {
public void testKillQueuedFlow() throws ExecutorManagerException, IOException {
ExecutorLoader loader = new MockExecutorLoader();
ExecutorManager manager = createMultiExecutorManagerInstance(loader);
- ExecutableFlow flow1 = createExecutableFlow("exec1");
- User testUser = getTestUser();
+ ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ User testUser = TestUtils.getTestUser();
manager.submitExecutableFlow(flow1, testUser.getUserId());
manager.cancelFlow(flow1, testUser.getUserId());
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index e725a8d..a7e2b5f 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -49,9 +49,13 @@ import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
public class JdbcExecutorLoaderTest {
private static boolean testDBExists;
+ /* Directory with serialized description of test flows */
+ private static final String UNIT_BASE_DIR =
+ "../azkaban-test/src/test/resources/executions";
// @TODO remove this and turn into local host.
private static final String host = "localhost";
private static final int port = 3306;
@@ -59,10 +63,6 @@ public class JdbcExecutorLoaderTest {
private static final String user = "azkaban";
private static final String password = "azkaban";
private static final int numConnections = 10;
- private static final String UNIT_BASE_DIR =
- "../azkaban-test/src/test/resources/executions/";
-
- private File flowDir = new File(UNIT_BASE_DIR + "/exectest1");
@BeforeClass
public static void setupDB() {
@@ -230,7 +230,7 @@ public class JdbcExecutorLoaderTest {
return;
}
ExecutorLoader loader = createLoader();
- ExecutableFlow flow = createExecutableFlow("exec1");
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow);
@@ -259,7 +259,7 @@ public class JdbcExecutorLoaderTest {
}
ExecutorLoader loader = createLoader();
- ExecutableFlow flow = createExecutableFlow("exec1");
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow);
@@ -298,7 +298,7 @@ public class JdbcExecutorLoaderTest {
ExecutableFlow flow = createExecutableFlow(10, "exec1");
flow.setExecutionId(10);
- File jobFile = new File(flowDir, "job10.job");
+ File jobFile = new File(UNIT_BASE_DIR + "/exectest1", "job10.job");
Props props = new Props(null, jobFile);
props.put("test", "test2");
ExecutableNode oldNode = flow.getExecutableNode("job10");
@@ -343,7 +343,7 @@ public class JdbcExecutorLoaderTest {
return;
}
ExecutorLoader loader = createLoader();
- ExecutableFlow flow = createExecutableFlow("exec1");
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow);
try {
loader.assignExecutor(flow.getExecutionId(), 1);
@@ -391,7 +391,7 @@ public class JdbcExecutorLoaderTest {
return;
}
ExecutorLoader loader = createLoader();
- ExecutableFlow flow = createExecutableFlow("exec1");
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow);
Assert.assertEquals(loader.fetchExecutorByExecutionId(flow.getExecutionId()),
null);
@@ -408,7 +408,7 @@ public class JdbcExecutorLoaderTest {
String host = "localhost";
int port = 12345;
Executor executor = loader.addExecutor(host, port);
- ExecutableFlow flow = createExecutableFlow("exec1");
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow);
loader.assignExecutor(executor.getId(), flow.getExecutionId());
Assert.assertEquals(loader.fetchExecutorByExecutionId(flow.getExecutionId()),
@@ -434,13 +434,13 @@ public class JdbcExecutorLoaderTest {
int port = 12345;
Executor executor = loader.addExecutor(host, port);
- ExecutableFlow flow = createExecutableFlow("exec1");
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow);
loader.assignExecutor(executor.getId(), flow.getExecutionId());
// only completed flows
Assert.assertTrue(queuedFlows.isEmpty());
- ExecutableFlow flow2 = createExecutableFlow("exec2");
+ ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
loader.uploadExecutableFlow(flow);
loader.assignExecutor(executor.getId(), flow.getExecutionId());
ExecutionReference ref = new ExecutionReference(flow2.getExecutionId());
@@ -461,9 +461,9 @@ public class JdbcExecutorLoaderTest {
List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
new LinkedList<Pair<ExecutionReference, ExecutableFlow>>();
- ExecutableFlow flow = createExecutableFlow("exec1");
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow);
- ExecutableFlow flow2 = createExecutableFlow("exec2");
+ ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
loader.uploadExecutableFlow(flow);
ExecutionReference ref2 = new ExecutionReference(flow2.getExecutionId());
@@ -732,20 +732,20 @@ public class JdbcExecutorLoaderTest {
}
ExecutorLoader loader = createLoader();
- ExecutableFlow flow1 = createExecutableFlow("exec1");
+ ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow1);
Executor executor = new Executor(2, "test", 1, true);
ExecutionReference ref1 =
new ExecutionReference(flow1.getExecutionId(), executor);
loader.addActiveExecutableReference(ref1);
- ExecutableFlow flow2 = createExecutableFlow("exec1");
+ ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow2);
ExecutionReference ref2 =
new ExecutionReference(flow2.getExecutionId(), executor);
loader.addActiveExecutableReference(ref2);
- ExecutableFlow flow3 = createExecutableFlow("exec1");
+ ExecutableFlow flow3 = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow3);
Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
@@ -884,37 +884,10 @@ public class JdbcExecutorLoaderTest {
}
private ExecutableFlow createExecutableFlow(int executionId, String flowName)
- throws IOException {
- File jsonFlowFile = new File(flowDir, flowName + ".flow");
- @SuppressWarnings("unchecked")
- HashMap<String, Object> flowObj =
- (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
-
- Flow flow = Flow.flowFromObject(flowObj);
- Project project = new Project(1, "flow");
- HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
- flowMap.put(flow.getId(), flow);
- project.setFlows(flowMap);
- ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+ throws IOException {
+ ExecutableFlow execFlow =
+ TestUtils.createExecutableFlow("exectest1", flowName);
execFlow.setExecutionId(executionId);
-
- return execFlow;
- }
-
- private ExecutableFlow createExecutableFlow(String flowName)
- throws IOException {
- File jsonFlowFile = new File(flowDir, flowName + ".flow");
- @SuppressWarnings("unchecked")
- HashMap<String, Object> flowObj =
- (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
-
- Flow flow = Flow.flowFromObject(flowObj);
- Project project = new Project(1, "flow");
- HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
- flowMap.put(flow.getId(), flow);
- project.setFlows(flowMap);
- ExecutableFlow execFlow = new ExecutableFlow(project, flow);
-
return execFlow;
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java b/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
new file mode 100644
index 0000000..94bbd7e
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
@@ -0,0 +1,208 @@
+package azkaban.executor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+
+public class QueuedExecutionsTest {
+ /* Directory with serialized description of test flows */
+ private static final String UNIT_BASE_DIR =
+ "../azkaban-test/src/test/resources/executions/exectest1/";
+
+ private File getFlowDir(String flow) {
+ return new File(UNIT_BASE_DIR + flow + ".flow");
+ }
+
+ /*
+ * Helper method to create an (ExecutionReference, ExecutableFlow) from
+ * serialized description
+ */
+ private Pair<ExecutionReference, ExecutableFlow> createExecutablePair(
+ String flowName, int execId) throws IOException {
+ File jsonFlowFile = getFlowDir(flowName);
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> flowObj =
+ (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+
+ Flow flow = Flow.flowFromObject(flowObj);
+ Project project = new Project(1, "flow");
+ HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
+ flowMap.put(flow.getId(), flow);
+ project.setFlows(flowMap);
+ ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+ execFlow.setExecutionId(execId);
+ ExecutionReference ref = new ExecutionReference(execId);
+ return new Pair<ExecutionReference, ExecutableFlow>(ref, execFlow);
+ }
+
+ public List<Pair<ExecutionReference, ExecutableFlow>> getDummyData()
+ throws IOException {
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList =
+ new ArrayList<Pair<ExecutionReference, ExecutableFlow>>();
+ dataList.add(createExecutablePair("exec1", 1));
+ dataList.add(createExecutablePair("exec2", 2));
+ return dataList;
+ }
+
+ /* Test enqueue method happy case */
+ @Test
+ public void testEnqueueHappyCase() throws IOException,
+ ExecutorManagerException {
+ QueuedExecutions queue = new QueuedExecutions(5);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+ queue.enqueue(pair.getSecond(), pair.getFirst());
+ }
+
+ Assert.assertTrue(queue.getAllEntries().containsAll(dataList));
+ Assert.assertTrue(dataList.containsAll(queue.getAllEntries()));
+ }
+
+ /* Test enqueue duplicate execution ids */
+ @Test(expected = ExecutorManagerException.class)
+ public void testEnqueueDuplicateExecution() throws IOException,
+ ExecutorManagerException {
+ Pair<ExecutionReference, ExecutableFlow> pair1 =
+ createExecutablePair("exec1", 1);
+ QueuedExecutions queue = new QueuedExecutions(5);
+ queue.enqueue(pair1.getSecond(), pair1.getFirst());
+ queue.enqueue(pair1.getSecond(), pair1.getFirst());
+ }
+
+ /* Test enqueue more than capacity */
+ @Test(expected = ExecutorManagerException.class)
+ public void testEnqueueOverflow() throws IOException,
+ ExecutorManagerException {
+ Pair<ExecutionReference, ExecutableFlow> pair1 =
+ createExecutablePair("exec1", 1);
+ QueuedExecutions queue = new QueuedExecutions(1);
+ queue.enqueue(pair1.getSecond(), pair1.getFirst());
+ queue.enqueue(pair1.getSecond(), pair1.getFirst());
+ }
+
+ /* Test EnqueueAll method */
+ @Test
+ public void testEnqueueAll() throws IOException, ExecutorManagerException {
+ QueuedExecutions queue = new QueuedExecutions(5);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ Assert.assertTrue(queue.getAllEntries().containsAll(dataList));
+ Assert.assertTrue(dataList.containsAll(queue.getAllEntries()));
+ }
+
+ /* Test size method */
+ @Test
+ public void testSize() throws IOException, ExecutorManagerException {
+ QueuedExecutions queue = new QueuedExecutions(5);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ Assert.assertEquals(queue.size(), 2);
+ }
+
+ /* Test dequeue method */
+ @Test
+ public void testDequeue() throws IOException, ExecutorManagerException {
+ QueuedExecutions queue = new QueuedExecutions(5);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ queue.dequeue(dataList.get(0).getFirst().getExecId());
+ Assert.assertEquals(queue.size(), 1);
+ Assert.assertTrue(queue.getAllEntries().contains(dataList.get(1)));
+ }
+
+ /* Test clear method */
+ @Test
+ public void testClear() throws IOException, ExecutorManagerException {
+ QueuedExecutions queue = new QueuedExecutions(5);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ Assert.assertEquals(queue.size(), 2);
+ queue.clear();
+ Assert.assertEquals(queue.size(), 0);
+ }
+
+ /* Test isEmpty method */
+ @Test
+ public void testIsEmpty() throws IOException, ExecutorManagerException {
+ QueuedExecutions queue = new QueuedExecutions(5);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ Assert.assertTrue(queue.isEmpty());
+ queue.enqueueAll(dataList);
+ Assert.assertEquals(queue.size(), 2);
+ queue.clear();
+ Assert.assertTrue(queue.isEmpty());
+ }
+
+ /* Test fetchHead method */
+ @Test
+ public void testFetchHead() throws IOException, ExecutorManagerException,
+ InterruptedException {
+ QueuedExecutions queue = new QueuedExecutions(5);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ Assert.assertTrue(queue.isEmpty());
+ queue.enqueueAll(dataList);
+ Assert.assertEquals(queue.fetchHead(), dataList.get(0));
+ Assert.assertEquals(queue.fetchHead(), dataList.get(1));
+ }
+
+ /* Test isFull method */
+ @Test
+ public void testIsFull() throws IOException, ExecutorManagerException,
+ InterruptedException {
+ QueuedExecutions queue = new QueuedExecutions(2);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ Assert.assertTrue(queue.isFull());
+ }
+
+ /* Test hasExecution method */
+ @Test
+ public void testHasExecution() throws IOException, ExecutorManagerException,
+ InterruptedException {
+ QueuedExecutions queue = new QueuedExecutions(2);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+ Assert.assertTrue(queue.hasExecution(pair.getFirst().getExecId()));
+ }
+ Assert.assertFalse(queue.hasExecution(5));
+ Assert.assertFalse(queue.hasExecution(7));
+ Assert.assertFalse(queue.hasExecution(15));
+ }
+
+ /* Test getFlow method */
+ @Test
+ public void testGetFlow() throws IOException, ExecutorManagerException,
+ InterruptedException {
+ QueuedExecutions queue = new QueuedExecutions(2);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+ Assert.assertEquals(pair.getSecond(),
+ queue.getFlow(pair.getFirst().getExecId()));
+ }
+ }
+
+ /* Test getReferences method */
+ @Test
+ public void testGetReferences() throws IOException, ExecutorManagerException,
+ InterruptedException {
+ QueuedExecutions queue = new QueuedExecutions(2);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+ Assert.assertEquals(pair.getFirst(),
+ queue.getReference(pair.getFirst().getExecId()));
+ }
+ }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
new file mode 100644
index 0000000..e51b575
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.user.User;
+
+/**
+ * Commonly used utils method for unit/integration tests
+ */
+public class TestUtils {
+ /* Directory with serialized description of test flows */
+ private static final String UNIT_BASE_DIR =
+ "../azkaban-test/src/test/resources/executions";
+
+ public static File getFlowDir(String projectName, String flow) {
+ return new File(String.format("%s/%s/%s.flow", UNIT_BASE_DIR, projectName,
+ flow));
+ }
+
+ public static User getTestUser() {
+ return new User("testUser");
+ }
+
+ /* Helper method to create an ExecutableFlow from serialized description */
+ public static ExecutableFlow createExecutableFlow(String projectName, String flowName)
+ throws IOException {
+ File jsonFlowFile = getFlowDir(projectName, flowName);
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> flowObj =
+ (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+
+ Flow flow = Flow.flowFromObject(flowObj);
+ Project project = new Project(1, "flow");
+ HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
+ flowMap.put(flow.getId(), flow);
+ project.setFlows(flowMap);
+ ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+
+ return execFlow;
+ }
+}