azkaban-aplcache
Details
azkaban-common/build.gradle 1(+1 -0)
diff --git a/azkaban-common/build.gradle b/azkaban-common/build.gradle
index 6e7ac72..658344f 100644
--- a/azkaban-common/build.gradle
+++ b/azkaban-common/build.gradle
@@ -49,6 +49,7 @@ dependencies {
testCompile deps.hadoopHdfs
testCompile project(':test')
testCompile project(path: ':azkaban-db', configuration: 'testOutput')
+ testCompile deps.commonsCompress
}
tasks.withType(JavaCompile) {
diff --git a/azkaban-common/src/main/java/azkaban/flow/Flow.java b/azkaban-common/src/main/java/azkaban/flow/Flow.java
index a346659..09963d0 100644
--- a/azkaban-common/src/main/java/azkaban/flow/Flow.java
+++ b/azkaban-common/src/main/java/azkaban/flow/Flow.java
@@ -171,29 +171,32 @@ public class Flow {
}
}
- for (final Node node : this.startNodes) {
- node.setLevel(0);
- this.numLevels = 0;
- recursiveSetLevels(node);
- }
+ setLevelsAndEdgeNodes(new HashSet<>(startNodes), 0);
}
}
- private void recursiveSetLevels(final Node node) {
- final Set<Edge> edges = this.outEdges.get(node.getId());
- if (edges != null) {
- for (final Edge edge : edges) {
- final Node nextNode = this.nodes.get(edge.getTargetId());
- edge.setSource(node);
- edge.setTarget(nextNode);
+ private void setLevelsAndEdgeNodes(final Set<Node> levelNodes, int level) {
+ final Set<Node> nextLevelNodes = new HashSet<>();
+
+ for (Node node : levelNodes) {
+ node.setLevel(level);
+
+ final Set<Edge> edges = outEdges.get(node.getId());
+ if (edges != null) {
+ edges.forEach(edge -> {
+ edge.setSource(node);
+ edge.setTarget(nodes.get(edge.getTargetId()));
- // We pick whichever is higher to get the max distance from root.
- final int level = Math.max(node.getLevel() + 1, nextNode.getLevel());
- nextNode.setLevel(level);
- this.numLevels = Math.max(level, this.numLevels);
- recursiveSetLevels(nextNode);
+ nextLevelNodes.add(edge.getTarget());
+ });
}
}
+
+ numLevels = level;
+
+ if (!nextLevelNodes.isEmpty()) {
+ setLevelsAndEdgeNodes(nextLevelNodes, level + 1);
+ }
}
public Node getNode(final String nodeId) {
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
index c53e5c1..db2c2bb 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
@@ -317,7 +317,6 @@ public class DirectoryFlowLoader implements FlowLoader {
}
// Now create flows. Bad flows are marked invalid
- final Set<String> visitedNodes = new HashSet<>();
for (final Node base : this.nodeMap.values()) {
// Root nodes can be discovered when parsing jobs
if (this.rootNodes.contains(base.getId())
@@ -329,15 +328,20 @@ public class DirectoryFlowLoader implements FlowLoader {
FlowLoaderUtils.addEmailPropsToFlow(flow, jobProp);
flow.addAllFlowProperties(this.flowPropsList);
- constructFlow(flow, base, visitedNodes);
+ final Set<String> visitedNodesOnPath = new HashSet<>();
+ final Set<String> visitedNodesEver = new HashSet<>();
+ constructFlow(flow, base, visitedNodesOnPath, visitedNodesEver);
+
flow.initialize();
this.flowMap.put(base.getId(), flow);
}
}
}
- private void constructFlow(final Flow flow, final Node node, final Set<String> visited) {
- visited.add(node.getId());
+ private void constructFlow(final Flow flow, final Node node, final Set<String> visitedOnPath,
+ final Set<String> visitedEver) {
+ visitedOnPath.add(node.getId());
+ visitedEver.add(node.getId());
flow.addNode(node);
if (SpecialJobTypes.EMBEDDED_FLOW_TYPE.equals(node.getType())) {
@@ -359,22 +363,25 @@ public class DirectoryFlowLoader implements FlowLoader {
for (Edge edge : dependencies.values()) {
if (edge.hasError()) {
flow.addEdge(edge);
- } else if (visited.contains(edge.getSourceId())) {
+ } else if (visitedOnPath.contains(edge.getSourceId())) {
// We have a cycle. We set it as an error edge
edge = new Edge(edge.getSourceId(), node.getId());
edge.setError("Cyclical dependencies found.");
this.errors.add("Cyclical dependency found at " + edge.getId());
flow.addEdge(edge);
+ } else if (visitedEver.contains(edge.getSourceId())) {
+ // this node was already checked, don't need to check further
+ flow.addEdge(edge);
} else {
// This should not be null
flow.addEdge(edge);
final Node sourceNode = this.nodeMap.get(edge.getSourceId());
- constructFlow(flow, sourceNode, visited);
+ constructFlow(flow, sourceNode, visitedOnPath, visitedEver);
}
}
}
- visited.remove(node.getId());
+ visitedOnPath.remove(node.getId());
}
private String getNameWithoutExtension(final File file) {
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java
index cd6ebec..4bb013d 100644
--- a/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java
@@ -18,6 +18,18 @@ package azkaban.project;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
+import com.google.common.io.Files;
+import com.google.common.io.MoreFiles;
+import com.google.common.io.RecursiveDeleteOption;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.compress.utils.IOUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -65,4 +77,49 @@ public class DirectoryFlowLoaderTest {
// Should be 3 errors: jobe->innerFlow, innerFlow->jobe, innerFlow
Assert.assertEquals(3, loader.getErrors().size());
}
+
+ private static File decompressTarBZ2(InputStream is) throws IOException {
+ File outputDir = Files.createTempDir();
+
+ try (TarArchiveInputStream tais = new TarArchiveInputStream(
+ new BZip2CompressorInputStream(is))) {
+ TarArchiveEntry entry;
+ while ((entry = tais.getNextTarEntry()) != null) {
+ if (entry.isDirectory()) {
+ continue;
+ }
+
+ File outputFile = new File(outputDir, entry.getName());
+ File parent = outputFile.getParentFile();
+ if (!parent.exists()) {
+ parent.mkdirs();
+ }
+
+ try (FileOutputStream fos = new FileOutputStream(outputFile)) { IOUtils.copy(tais, fos); }
+ }
+
+ return outputDir;
+ }
+ }
+
+ @Test
+ public void testMassiveFlow() throws Exception {
+ final DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props());
+
+ File projectDir = null;
+ try (InputStream is = new FileInputStream(
+ ExecutionsTestUtil.getDataRootDir() + "/massive-flows/massive-flows.tar.bz2")) {
+ projectDir = decompressTarBZ2(is);
+
+ loader.loadProjectFlow(this.project, projectDir);
+ Assert.assertEquals(0, loader.getErrors().size());
+ Assert.assertEquals(185, loader.getFlowMap().size());
+ Assert.assertEquals(0, loader.getPropsList().size());
+ Assert.assertEquals(7121, loader.getJobPropsMap().size());
+ } finally {
+ if (projectDir != null) {
+ MoreFiles.deleteRecursively(projectDir.toPath(), RecursiveDeleteOption.ALLOW_INSECURE);
+ }
+ }
+ }
}
build.gradle 1(+1 -0)
diff --git a/build.gradle b/build.gradle
index d179314..60eea52 100644
--- a/build.gradle
+++ b/build.gradle
@@ -64,6 +64,7 @@ ext.deps = [
awaitility : 'org.awaitility:awaitility:3.0.0',
collections : 'commons-collections:commons-collections:3.2.2',
commonsLang : 'commons-lang:commons-lang:2.6',
+ commonsCompress : 'org.apache.commons:commons-compress:1.16.1',
dbcp2 : 'org.apache.commons:commons-dbcp2:2.1.1',
dbutils : 'commons-dbutils:commons-dbutils:1.5',
fileupload : 'commons-fileupload:commons-fileupload:1.2.1',
diff --git a/test/execution-test-data/massive-flows/massive-flows.tar.bz2 b/test/execution-test-data/massive-flows/massive-flows.tar.bz2
new file mode 100644
index 0000000..1f03467
Binary files /dev/null and b/test/execution-test-data/massive-flows/massive-flows.tar.bz2 differ