diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 7f3727d..f944421 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -16,18 +16,15 @@
package azkaban.executor;
-import azkaban.AzkabanCommonModule;
-import azkaban.ServiceProvider;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
import java.io.IOException;
+import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Test;
@@ -210,21 +207,20 @@ public class ExecutorManagerTest {
manager.submitExecutableFlow(flow1, testUser.getUserId());
manager.submitExecutableFlow(flow2, testUser.getUserId());
- List<ExecutableFlow> testFlows = new LinkedList<ExecutableFlow>();
- testFlows.add(flow1);
- testFlows.add(flow2);
+ List<Integer> testFlows = Arrays.asList(flow1.getExecutionId(), flow2.getExecutionId());
List<Pair<ExecutionReference, ExecutableFlow>> queuedFlowsDB =
loader.fetchQueuedFlows();
Assert.assertEquals(queuedFlowsDB.size(), testFlows.size());
// Verify things are correctly setup in db
for (Pair<ExecutionReference, ExecutableFlow> pair : queuedFlowsDB) {
- Assert.assertTrue(testFlows.contains(pair.getSecond()));
+ Assert.assertTrue(testFlows.contains(pair.getSecond().getExecutionId()));
}
// Verify running flows using old definition of "running" flows i.e. a
// non-dispatched flow is also considered running
- List<ExecutableFlow> managerActiveFlows = manager.getRunningFlows();
+ List<Integer> managerActiveFlows = manager.getRunningFlows()
+ .stream().map(ExecutableFlow::getExecutionId).collect(Collectors.toList());
Assert.assertTrue(managerActiveFlows.containsAll(testFlows)
&& testFlows.containsAll(managerActiveFlows));
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 148a0a1..1d320c5 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -21,14 +21,23 @@ import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+/**
+ * Used in unit tests to mock the "DB layer" (the real implementation is JdbcExecutorLoader).
+ * Captures status updates of jobs and flows (in memory) so that they can be checked in tests.
+ */
public class MockExecutorLoader implements ExecutorLoader {
+ private static final Logger logger = Logger.getLogger(MockExecutorLoader.class);
+
Map<Integer, Integer> executionExecutorMapping = new ConcurrentHashMap<>();
Map<Integer, ExecutableFlow> flows = new ConcurrentHashMap<>();
Map<String, ExecutableNode> nodes = new ConcurrentHashMap<>();
@@ -43,7 +52,12 @@ public class MockExecutorLoader implements ExecutorLoader {
@Override
public void uploadExecutableFlow(final ExecutableFlow flow)
throws ExecutorManagerException {
- this.flows.put(flow.getExecutionId(), flow);
+ // Clone the flow node to mimick how it would be saved in DB.
+ // If we would keep a handle to the original flow node, we would also see any changes made after
+ // this method was called. We must only store a snapshot of the current state.
+ // Also to avoid modifying statuses of the original job nodes in this.updateExecutableFlow()
+ final ExecutableFlow exFlow = ExecutableFlow.createExecutableFlowFromObject(flow.toObject());
+ this.flows.put(flow.getExecutionId(), exFlow);
this.flowUpdateCount++;
}
@@ -92,7 +106,14 @@ public class MockExecutorLoader implements ExecutorLoader {
public void uploadLogFile(final int execId, final String name, final int attempt,
final File... files)
throws ExecutorManagerException {
-
+ for (final File file : files) {
+ try {
+ final String logs = FileUtils.readFileToString(file, "UTF-8");
+ logger.info("Uploaded log for [" + name + "]:[" + execId + "]:\n" + logs);
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
@Override
@@ -100,13 +121,17 @@ public class MockExecutorLoader implements ExecutorLoader {
throws ExecutorManagerException {
final ExecutableFlow toUpdate = this.flows.get(flow.getExecutionId());
- toUpdate.applyUpdateObject((Map<String, Object>) flow.toUpdateObject(0));
+ toUpdate.applyUpdateObject(flow.toUpdateObject(0));
this.flowUpdateCount++;
}
@Override
public void uploadExecutableNode(final ExecutableNode node, final Props inputParams)
throws ExecutorManagerException {
+ // Clone the job node to mimick how it would be saved in DB.
+ // If we would keep a handle to the original job node, we would also see any changes made after
+ // this method was called. We must only store a snapshot of the current state.
+ // Also to avoid modifying statuses of the original job nodes in this.updateExecutableNode()
final ExecutableNode exNode = new ExecutableNode();
exNode.fillExecutableFromMapObject(node.toObject());