azkaban-developers

Fix bug in MockExecutorLoader (#1116) Fix concurrency bug

6/3/2017 7:40:55 PM

Details

diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 7f3727d..f944421 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -16,18 +16,15 @@
 
 package azkaban.executor;
 
-import azkaban.AzkabanCommonModule;
-import azkaban.ServiceProvider;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -210,21 +207,20 @@ public class ExecutorManagerTest {
     manager.submitExecutableFlow(flow1, testUser.getUserId());
     manager.submitExecutableFlow(flow2, testUser.getUserId());
 
-    List<ExecutableFlow> testFlows = new LinkedList<ExecutableFlow>();
-    testFlows.add(flow1);
-    testFlows.add(flow2);
+    List<Integer> testFlows = Arrays.asList(flow1.getExecutionId(), flow2.getExecutionId());
 
     List<Pair<ExecutionReference, ExecutableFlow>> queuedFlowsDB =
       loader.fetchQueuedFlows();
     Assert.assertEquals(queuedFlowsDB.size(), testFlows.size());
     // Verify things are correctly setup in db
     for (Pair<ExecutionReference, ExecutableFlow> pair : queuedFlowsDB) {
-      Assert.assertTrue(testFlows.contains(pair.getSecond()));
+      Assert.assertTrue(testFlows.contains(pair.getSecond().getExecutionId()));
     }
 
     // Verify running flows using old definition of "running" flows i.e. a
     // non-dispatched flow is also considered running
-    List<ExecutableFlow> managerActiveFlows = manager.getRunningFlows();
+    List<Integer> managerActiveFlows = manager.getRunningFlows()
+        .stream().map(ExecutableFlow::getExecutionId).collect(Collectors.toList());
     Assert.assertTrue(managerActiveFlows.containsAll(testFlows)
       && testFlows.containsAll(managerActiveFlows));
 
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 148a0a1..1d320c5 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -21,14 +21,23 @@ import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
 
+/**
+ * Used in unit tests to mock the "DB layer" (the real implementation is JdbcExecutorLoader).
+ * Captures status updates of jobs and flows (in memory) so that they can be checked in tests.
+ */
 public class MockExecutorLoader implements ExecutorLoader {
 
+  private static final Logger logger = Logger.getLogger(MockExecutorLoader.class);
+
   Map<Integer, Integer> executionExecutorMapping = new ConcurrentHashMap<>();
   Map<Integer, ExecutableFlow> flows = new ConcurrentHashMap<>();
   Map<String, ExecutableNode> nodes = new ConcurrentHashMap<>();
@@ -43,7 +52,12 @@ public class MockExecutorLoader implements ExecutorLoader {
   @Override
   public void uploadExecutableFlow(final ExecutableFlow flow)
       throws ExecutorManagerException {
-    this.flows.put(flow.getExecutionId(), flow);
+    // Clone the flow node to mimic how it would be saved in the DB.
+    // If we kept a handle to the original flow node, we would also see any changes made after
+    // this method was called. We must only store a snapshot of the current state.
+    // This also avoids modifying statuses of the original job nodes in this.updateExecutableFlow()
+    final ExecutableFlow exFlow = ExecutableFlow.createExecutableFlowFromObject(flow.toObject());
+    this.flows.put(flow.getExecutionId(), exFlow);
     this.flowUpdateCount++;
   }
 
@@ -92,7 +106,14 @@ public class MockExecutorLoader implements ExecutorLoader {
   public void uploadLogFile(final int execId, final String name, final int attempt,
       final File... files)
       throws ExecutorManagerException {
-
+    for (final File file : files) {
+      try {
+        final String logs = FileUtils.readFileToString(file, "UTF-8");
+        logger.info("Uploaded log for [" + name + "]:[" + execId + "]:\n" + logs);
+      } catch (final IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
   }
 
   @Override
@@ -100,13 +121,17 @@ public class MockExecutorLoader implements ExecutorLoader {
       throws ExecutorManagerException {
     final ExecutableFlow toUpdate = this.flows.get(flow.getExecutionId());
 
-    toUpdate.applyUpdateObject((Map<String, Object>) flow.toUpdateObject(0));
+    toUpdate.applyUpdateObject(flow.toUpdateObject(0));
     this.flowUpdateCount++;
   }
 
   @Override
   public void uploadExecutableNode(final ExecutableNode node, final Props inputParams)
       throws ExecutorManagerException {
+    // Clone the job node to mimic how it would be saved in the DB.
+    // If we kept a handle to the original job node, we would also see any changes made after
+    // this method was called. We must only store a snapshot of the current state.
+    // This also avoids modifying statuses of the original job nodes in this.updateExecutableNode()
     final ExecutableNode exNode = new ExecutableNode();
     exNode.fillExecutableFromMapObject(node.toObject());