azkaban-aplcache

optimize flow traversal (#1677) (#1680) In order to provide

3/21/2018 5:36:53 PM

Details

diff --git a/azkaban-common/build.gradle b/azkaban-common/build.gradle
index 6e7ac72..658344f 100644
--- a/azkaban-common/build.gradle
+++ b/azkaban-common/build.gradle
@@ -49,6 +49,7 @@ dependencies {
     testCompile deps.hadoopHdfs
     testCompile project(':test')
     testCompile project(path: ':azkaban-db', configuration: 'testOutput')
+    testCompile deps.commonsCompress
 }
 
 tasks.withType(JavaCompile) {
diff --git a/azkaban-common/src/main/java/azkaban/flow/Flow.java b/azkaban-common/src/main/java/azkaban/flow/Flow.java
index a346659..09963d0 100644
--- a/azkaban-common/src/main/java/azkaban/flow/Flow.java
+++ b/azkaban-common/src/main/java/azkaban/flow/Flow.java
@@ -171,29 +171,32 @@ public class Flow {
         }
       }
 
-      for (final Node node : this.startNodes) {
-        node.setLevel(0);
-        this.numLevels = 0;
-        recursiveSetLevels(node);
-      }
+      setLevelsAndEdgeNodes(new HashSet<>(startNodes), 0);
     }
   }
 
-  private void recursiveSetLevels(final Node node) {
-    final Set<Edge> edges = this.outEdges.get(node.getId());
-    if (edges != null) {
-      for (final Edge edge : edges) {
-        final Node nextNode = this.nodes.get(edge.getTargetId());
-        edge.setSource(node);
-        edge.setTarget(nextNode);
+  private void setLevelsAndEdgeNodes(final Set<Node> levelNodes, int level) {
+    final Set<Node> nextLevelNodes = new HashSet<>();
+
+    for (Node node : levelNodes) {
+      node.setLevel(level);
+
+      final Set<Edge> edges = outEdges.get(node.getId());
+      if (edges != null) {
+        edges.forEach(edge -> {
+          edge.setSource(node);
+          edge.setTarget(nodes.get(edge.getTargetId()));
 
-        // We pick whichever is higher to get the max distance from root.
-        final int level = Math.max(node.getLevel() + 1, nextNode.getLevel());
-        nextNode.setLevel(level);
-        this.numLevels = Math.max(level, this.numLevels);
-        recursiveSetLevels(nextNode);
+          nextLevelNodes.add(edge.getTarget());
+        });
       }
     }
+
+    numLevels = level;
+
+    if (!nextLevelNodes.isEmpty()) {
+      setLevelsAndEdgeNodes(nextLevelNodes, level + 1);
+    }
   }
 
   public Node getNode(final String nodeId) {
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
index c53e5c1..db2c2bb 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
@@ -317,7 +317,6 @@ public class DirectoryFlowLoader implements FlowLoader {
     }
 
     // Now create flows. Bad flows are marked invalid
-    final Set<String> visitedNodes = new HashSet<>();
     for (final Node base : this.nodeMap.values()) {
       // Root nodes can be discovered when parsing jobs
       if (this.rootNodes.contains(base.getId())
@@ -329,15 +328,20 @@ public class DirectoryFlowLoader implements FlowLoader {
         FlowLoaderUtils.addEmailPropsToFlow(flow, jobProp);
 
         flow.addAllFlowProperties(this.flowPropsList);
-        constructFlow(flow, base, visitedNodes);
+        final Set<String> visitedNodesOnPath = new HashSet<>();
+        final Set<String> visitedNodesEver = new HashSet<>();
+        constructFlow(flow, base, visitedNodesOnPath, visitedNodesEver);
+
         flow.initialize();
         this.flowMap.put(base.getId(), flow);
       }
     }
   }
 
-  private void constructFlow(final Flow flow, final Node node, final Set<String> visited) {
-    visited.add(node.getId());
+  private void constructFlow(final Flow flow, final Node node, final Set<String> visitedOnPath,
+      final Set<String> visitedEver) {
+    visitedOnPath.add(node.getId());
+    visitedEver.add(node.getId());
 
     flow.addNode(node);
     if (SpecialJobTypes.EMBEDDED_FLOW_TYPE.equals(node.getType())) {
@@ -359,22 +363,25 @@ public class DirectoryFlowLoader implements FlowLoader {
       for (Edge edge : dependencies.values()) {
         if (edge.hasError()) {
           flow.addEdge(edge);
-        } else if (visited.contains(edge.getSourceId())) {
+        } else if (visitedOnPath.contains(edge.getSourceId())) {
           // We have a cycle. We set it as an error edge
           edge = new Edge(edge.getSourceId(), node.getId());
           edge.setError("Cyclical dependencies found.");
           this.errors.add("Cyclical dependency found at " + edge.getId());
           flow.addEdge(edge);
+        } else if (visitedEver.contains(edge.getSourceId())) {
+          // this node was already checked, don't need to check further
+          flow.addEdge(edge);
         } else {
           // This should not be null
           flow.addEdge(edge);
           final Node sourceNode = this.nodeMap.get(edge.getSourceId());
-          constructFlow(flow, sourceNode, visited);
+          constructFlow(flow, sourceNode, visitedOnPath, visitedEver);
         }
       }
     }
 
-    visited.remove(node.getId());
+    visitedOnPath.remove(node.getId());
   }
 
   private String getNameWithoutExtension(final File file) {
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java
index cd6ebec..4bb013d 100644
--- a/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java
@@ -18,6 +18,18 @@ package azkaban.project;
 
 import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Props;
+import com.google.common.io.Files;
+import com.google.common.io.MoreFiles;
+import com.google.common.io.RecursiveDeleteOption;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.compress.utils.IOUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -65,4 +77,49 @@ public class DirectoryFlowLoaderTest {
     // Should be 3 errors: jobe->innerFlow, innerFlow->jobe, innerFlow
     Assert.assertEquals(3, loader.getErrors().size());
   }
+
+  private static File decompressTarBZ2(InputStream is) throws IOException {
+    File outputDir = Files.createTempDir();
+
+    try (TarArchiveInputStream tais = new TarArchiveInputStream(
+        new BZip2CompressorInputStream(is))) {
+      TarArchiveEntry entry;
+      while ((entry = tais.getNextTarEntry()) != null) {
+        if (entry.isDirectory()) {
+          continue;
+        }
+
+        File outputFile = new File(outputDir, entry.getName()); // NOTE(review): entry names come from a trusted test fixture; add a zip-slip (path traversal) check if ever reused on untrusted archives
+        File parent = outputFile.getParentFile();
+        if (!parent.exists()) {
+          parent.mkdirs();
+        }
+
+        try (FileOutputStream fos = new FileOutputStream(outputFile)) { IOUtils.copy(tais, fos); }
+      }
+
+      return outputDir;
+    }
+  }
+
+  @Test
+  public void testMassiveFlow() throws Exception {
+    final DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props());
+
+    File projectDir = null;
+    try (InputStream is = new FileInputStream(
+        ExecutionsTestUtil.getDataRootDir() + "/massive-flows/massive-flows.tar.bz2")) {
+      projectDir = decompressTarBZ2(is);
+
+      loader.loadProjectFlow(this.project, projectDir);
+      Assert.assertEquals(0, loader.getErrors().size());
+      Assert.assertEquals(185, loader.getFlowMap().size());
+      Assert.assertEquals(0, loader.getPropsList().size());
+      Assert.assertEquals(7121, loader.getJobPropsMap().size());
+    } finally {
+      if (projectDir != null) {
+        MoreFiles.deleteRecursively(projectDir.toPath(), RecursiveDeleteOption.ALLOW_INSECURE);
+      }
+    }
+  }
 }

build.gradle 1(+1 -0)

diff --git a/build.gradle b/build.gradle
index d179314..60eea52 100644
--- a/build.gradle
+++ b/build.gradle
@@ -64,6 +64,7 @@ ext.deps = [
         awaitility          : 'org.awaitility:awaitility:3.0.0',
         collections         : 'commons-collections:commons-collections:3.2.2',
         commonsLang         : 'commons-lang:commons-lang:2.6',
+        commonsCompress     : 'org.apache.commons:commons-compress:1.16.1',
         dbcp2               : 'org.apache.commons:commons-dbcp2:2.1.1',
         dbutils             : 'commons-dbutils:commons-dbutils:1.5',
         fileupload          : 'commons-fileupload:commons-fileupload:1.2.1',
diff --git a/test/execution-test-data/massive-flows/massive-flows.tar.bz2 b/test/execution-test-data/massive-flows/massive-flows.tar.bz2
new file mode 100644
index 0000000..1f03467
Binary files /dev/null and b/test/execution-test-data/massive-flows/massive-flows.tar.bz2 differ