package azkaban.executor;
import azkaban.flow.Flow;
import azkaban.project.Project;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.TestUtils;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
/**
 * Unit tests for {@link QueuedExecutions}.
 *
 * <p>Each test builds its own queue and populates it with executable flows
 * created from the "exec1" and "exec2" fixtures of the "exectest1" test
 * project.
 */
public class QueuedExecutionsTest {

  /**
   * Looks up the fixture file of the given flow inside the "exectest1" test
   * project.
   *
   * @param flow name of the flow fixture
   * @return the file handed back by {@code TestUtils.getFlowDir}
   */
  private File getFlowDir(final String flow) {
    return TestUtils.getFlowDir("exectest1", flow);
  }

  /**
   * Builds an (execution reference, executable flow) pair that share the same
   * execution id.
   *
   * @param flowName name of the flow fixture to load
   * @param execId execution id assigned to both the flow and the reference
   * @return the pair, reference first and executable flow second
   * @throws IOException declared because the fixture is parsed from a file
   */
  private Pair<ExecutionReference, ExecutableFlow> createExecutablePair(
      final String flowName, final int execId) throws IOException {
    final File jsonFlowFile = getFlowDir(flowName);
    // NOTE(review): the cast is unchecked; it presumes the fixture parses to a
    // string-keyed map -- confirm against JSONUtils.parseJSONFromFile.
    @SuppressWarnings("unchecked")
    final HashMap<String, Object> flowObj =
        (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
    final Flow flow = Flow.flowFromObject(flowObj);

    // The project holds only the single flow under test.
    final Project project = new Project(1, "flow");
    final HashMap<String, Flow> flowMap = new HashMap<>();
    flowMap.put(flow.getId(), flow);
    project.setFlows(flowMap);

    final ExecutableFlow execFlow = new ExecutableFlow(project, flow);
    execFlow.setExecutionId(execId);
    final ExecutionReference ref = new ExecutionReference(execId);
    return new Pair<>(ref, execFlow);
  }

  /**
   * Creates the shared test data: "exec1" with execution id 1 followed by
   * "exec2" with execution id 2.
   *
   * @return a new mutable list holding the two pairs in that order
   * @throws IOException propagated from {@link #createExecutablePair}
   */
  public List<Pair<ExecutionReference, ExecutableFlow>> getDummyData()
      throws IOException {
    final List<Pair<ExecutionReference, ExecutableFlow>> dataList =
        new ArrayList<>();
    dataList.add(createExecutablePair("exec1", 1));
    dataList.add(createExecutablePair("exec2", 2));
    return dataList;
  }

  /** Entries enqueued one at a time are exactly the entries the queue reports. */
  @Test
  public void testEnqueueHappyCase() throws IOException,
      ExecutorManagerException {
    final QueuedExecutions queue = new QueuedExecutions(5);
    final List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
    for (final Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
      queue.enqueue(pair.getSecond(), pair.getFirst());
    }
    // Containment in both directions: same entries, regardless of order.
    Assert.assertTrue(queue.getAllEntries().containsAll(dataList));
    Assert.assertTrue(dataList.containsAll(queue.getAllEntries()));
  }

  /** Enqueuing the same execution twice is expected to be rejected. */
  @Test(expected = ExecutorManagerException.class)
  public void testEnqueueDuplicateExecution() throws IOException,
      ExecutorManagerException {
    final Pair<ExecutionReference, ExecutableFlow> pair1 =
        createExecutablePair("exec1", 1);
    final QueuedExecutions queue = new QueuedExecutions(5);
    queue.enqueue(pair1.getSecond(), pair1.getFirst());
    queue.enqueue(pair1.getSecond(), pair1.getFirst());
  }

  /**
   * Enqueuing into a queue constructed with capacity 1 that already holds one
   * entry is expected to be rejected.
   *
   * <p>NOTE(review): the second enqueue reuses the same pair as the first, so
   * this test cannot tell an overflow rejection apart from the duplicate
   * rejection covered by {@link #testEnqueueDuplicateExecution()}. Consider
   * enqueuing a distinct execution here once it is confirmed that
   * {@code enqueue} itself enforces the capacity.
   */
  @Test(expected = ExecutorManagerException.class)
  public void testEnqueueOverflow() throws IOException,
      ExecutorManagerException {
    final Pair<ExecutionReference, ExecutableFlow> pair1 =
        createExecutablePair("exec1", 1);
    final QueuedExecutions queue = new QueuedExecutions(1);
    queue.enqueue(pair1.getSecond(), pair1.getFirst());
    queue.enqueue(pair1.getSecond(), pair1.getFirst());
  }

  /** Entries enqueued in bulk are exactly the entries the queue reports. */
  @Test
  public void testEnqueueAll() throws IOException, ExecutorManagerException {
    final QueuedExecutions queue = new QueuedExecutions(5);
    final List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
    queue.enqueueAll(dataList);
    Assert.assertTrue(queue.getAllEntries().containsAll(dataList));
    Assert.assertTrue(dataList.containsAll(queue.getAllEntries()));
  }

  /** The size equals the number of enqueued entries. */
  @Test
  public void testSize() throws IOException, ExecutorManagerException {
    final QueuedExecutions queue = new QueuedExecutions(5);
    final List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
    queue.enqueueAll(dataList);
    Assert.assertEquals(2, queue.size());
  }

  /** Dequeuing by execution id removes that entry and keeps the other one. */
  @Test
  public void testDequeue() throws IOException, ExecutorManagerException {
    final QueuedExecutions queue = new QueuedExecutions(5);
    final List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
    queue.enqueueAll(dataList);
    queue.dequeue(dataList.get(0).getFirst().getExecId());
    Assert.assertEquals(1, queue.size());
    Assert.assertTrue(queue.getAllEntries().contains(dataList.get(1)));
  }

  /** Clearing a populated queue brings its size down to zero. */
  @Test
  public void testClear() throws IOException, ExecutorManagerException {
    final QueuedExecutions queue = new QueuedExecutions(5);
    final List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
    queue.enqueueAll(dataList);
    Assert.assertEquals(2, queue.size());
    queue.clear();
    Assert.assertEquals(0, queue.size());
  }

  /** The queue reports empty both when new and after being cleared. */
  @Test
  public void testIsEmpty() throws IOException, ExecutorManagerException {
    final QueuedExecutions queue = new QueuedExecutions(5);
    final List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
    Assert.assertTrue(queue.isEmpty());
    queue.enqueueAll(dataList);
    Assert.assertEquals(2, queue.size());
    queue.clear();
    Assert.assertTrue(queue.isEmpty());
  }

  /** Successive fetchHead calls hand back the entries in the order of the test data. */
  @Test
  public void testFetchHead() throws IOException, ExecutorManagerException,
      InterruptedException {
    final QueuedExecutions queue = new QueuedExecutions(5);
    final List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
    Assert.assertTrue(queue.isEmpty());
    queue.enqueueAll(dataList);
    Assert.assertEquals(dataList.get(0), queue.fetchHead());
    Assert.assertEquals(dataList.get(1), queue.fetchHead());
  }

  /** A queue constructed with capacity 2 reports full once it holds two entries. */
  @Test
  public void testIsFull() throws IOException, ExecutorManagerException,
      InterruptedException {
    final QueuedExecutions queue = new QueuedExecutions(2);
    final List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
    queue.enqueueAll(dataList);
    Assert.assertTrue(queue.isFull());
  }

  /** hasExecution is true for enqueued execution ids and false for ids never enqueued. */
  @Test
  public void testHasExecution() throws IOException, ExecutorManagerException,
      InterruptedException {
    final QueuedExecutions queue = new QueuedExecutions(2);
    final List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
    queue.enqueueAll(dataList);
    for (final Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
      Assert.assertTrue(queue.hasExecution(pair.getFirst().getExecId()));
    }
    // Only ids 1 and 2 were enqueued.
    Assert.assertFalse(queue.hasExecution(5));
    Assert.assertFalse(queue.hasExecution(7));
    Assert.assertFalse(queue.hasExecution(15));
  }

  /** getFlow returns the executable flow enqueued under each execution id. */
  @Test
  public void testGetFlow() throws IOException, ExecutorManagerException,
      InterruptedException {
    final QueuedExecutions queue = new QueuedExecutions(2);
    final List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
    queue.enqueueAll(dataList);
    for (final Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
      Assert.assertEquals(pair.getSecond(),
          queue.getFlow(pair.getFirst().getExecId()));
    }
  }

  /** getReference returns the execution reference enqueued under each execution id. */
  @Test
  public void testGetReferences() throws IOException, ExecutorManagerException,
      InterruptedException {
    final QueuedExecutions queue = new QueuedExecutions(2);
    final List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
    queue.enqueueAll(dataList);
    for (final Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
      Assert.assertEquals(pair.getFirst(),
          queue.getReference(pair.getFirst().getExecId()));
    }
  }
}