azkaban-developers

Details

diff --git a/unit/executions/execpropstest/innerflow.job b/unit/executions/execpropstest/innerflow.job
new file mode 100644
index 0000000..18def67
--- /dev/null
+++ b/unit/executions/execpropstest/innerflow.job
@@ -0,0 +1,6 @@
+type=flow
+flow.name=job4
+dependencies=job2
+props5=innerflow5
+props6=innerflow6
+props8=innerflow8
\ No newline at end of file
diff --git a/unit/executions/execpropstest/job1.job b/unit/executions/execpropstest/job1.job
new file mode 100644
index 0000000..d910f1a
--- /dev/null
+++ b/unit/executions/execpropstest/job1.job
@@ -0,0 +1,4 @@
+type=test
+props1=job1
+props2=job2
+props8=job8
\ No newline at end of file
diff --git a/unit/executions/execpropstest/job3.job b/unit/executions/execpropstest/job3.job
new file mode 100644
index 0000000..d9be481
--- /dev/null
+++ b/unit/executions/execpropstest/job3.job
@@ -0,0 +1,3 @@
+type=test
+dependencies=innerflow
+props3=job3
\ No newline at end of file
diff --git a/unit/executions/execpropstest/moo.properties b/unit/executions/execpropstest/moo.properties
new file mode 100644
index 0000000..7e4f399
--- /dev/null
+++ b/unit/executions/execpropstest/moo.properties
@@ -0,0 +1,3 @@
+props3=moo3
+props4=moo4
+props5=moo5
\ No newline at end of file
diff --git a/unit/executions/execpropstest/shared.properties b/unit/executions/execpropstest/shared.properties
new file mode 100644
index 0000000..29b2b7b
--- /dev/null
+++ b/unit/executions/execpropstest/shared.properties
@@ -0,0 +1,3 @@
+props1=shared1
+props2=shared2
+props6=shared6
\ No newline at end of file
diff --git a/unit/executions/execpropstest/subdir/job2.job b/unit/executions/execpropstest/subdir/job2.job
new file mode 100644
index 0000000..bfd48e3
--- /dev/null
+++ b/unit/executions/execpropstest/subdir/job2.job
@@ -0,0 +1,3 @@
+type=test
+props2=job2
+props7=job7
\ No newline at end of file
diff --git a/unit/executions/execpropstest/subdir/job4.job b/unit/executions/execpropstest/subdir/job4.job
new file mode 100644
index 0000000..e26aaa6
--- /dev/null
+++ b/unit/executions/execpropstest/subdir/job4.job
@@ -0,0 +1,4 @@
+type=test
+dependencies=job1
+props8=job8
+props9=job9
\ No newline at end of file
diff --git a/unit/executions/execpropstest/subdir/shared.properties b/unit/executions/execpropstest/subdir/shared.properties
new file mode 100644
index 0000000..2ba1880
--- /dev/null
+++ b/unit/executions/execpropstest/subdir/shared.properties
@@ -0,0 +1,2 @@
+props4=shared4
+props8=shared8
\ No newline at end of file
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerPropertyResolutionTest.java b/unit/java/azkaban/test/execapp/FlowRunnerPropertyResolutionTest.java
new file mode 100644
index 0000000..afddd0c
--- /dev/null
+++ b/unit/java/azkaban/test/execapp/FlowRunnerPropertyResolutionTest.java
@@ -0,0 +1,236 @@
+package azkaban.test.execapp;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.execapp.FlowRunner;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorLoader;
+import azkaban.flow.Flow;
+import azkaban.jobtype.JobTypeManager;
+import azkaban.project.Project;
+import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectManagerException;
+import azkaban.test.executor.InteractiveTestJob;
+import azkaban.test.executor.JavaJob;
+import azkaban.utils.DirectoryFlowLoader;
+import azkaban.utils.Props;
+
+/**
+ * Test the property resolution of jobs in a flow.
+ * 
+ * The tests are contained in execpropstest, and should be resolved in the following fashion,
+ * where the later props take precedence over the previous ones.
+ * 
+ * 1. Global props (set in the FlowRunner)
+ * 2. Shared job props (depends on job directory)
+ * 3. Flow Override properties
+ * 4. Previous job outputs to the embedded flow (Only if contained in embedded flow)
+ * 5. Embedded flow properties (Only if contained in embedded flow)
+ * 6. Previous job outputs (if exists)
+ * 7. Job Props
+ * 
+ * The test contains the following structure:
+ * job2 -> innerFlow (job1 -> job4 ) -> job3
+ * 
+ * job2 and 4 are in nested directories so should have different shared properties than other jobs.
+ */
+public class FlowRunnerPropertyResolutionTest {
+	private File workingDir;
+	private JobTypeManager jobtypeManager;
+	private ProjectLoader fakeProjectLoader;
+	private ExecutorLoader fakeExecutorLoader;
+	private Logger logger = Logger.getLogger(FlowRunnerTest2.class);
+	private Project project;
+	private Map<String, Flow> flowMap;
+	private static int id=101;
+	
+	@Before
+	public void setUp() throws Exception {
+		System.out.println("Create temp dir");
+		workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
+		if (workingDir.exists()) {
+			FileUtils.deleteDirectory(workingDir);
+		}
+		workingDir.mkdirs();
+		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+		jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
+		jobtypeManager.getJobTypePluginSet().addPluginClass("test", InteractiveTestJob.class);
+		fakeProjectLoader = new MockProjectLoader(workingDir);
+		fakeExecutorLoader = new MockExecutorLoader();
+		project = new Project(1, "testProject");
+		
+		File dir = new File("unit/executions/execpropstest");
+		prepareProject(dir);
+		
+		InteractiveTestJob.clearTestJobs();
+	}
+	
+	@After
+	public void tearDown() throws IOException {
+		System.out.println("Teardown temp dir");
+		if (workingDir != null) {
+			FileUtils.deleteDirectory(workingDir);
+			workingDir = null;
+		}
+	}
+	
+	/**
+	 * Tests the basic flow resolution. Flow is defined in execpropstest
+	 * @throws Exception
+	 */
+	@Test
+	public void testPropertyResolution() throws Exception {
+		HashMap<String, String> flowProps = new HashMap<String,String>();
+		flowProps.put("props7", "flow7");
+		flowProps.put("props6", "flow6");
+		flowProps.put("props5", "flow5");
+		FlowRunner runner = createFlowRunner("job3", flowProps);
+		Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+		createNodeMap(runner.getExecutableFlow(), nodeMap);
+		
+		// 1. Start flow. Job 2 should start
+		Thread thread = runFlowRunnerInThread(runner);
+		pause(250);
+		
+		// Job 2 is a normal job.
+		// Only the flow overrides and the shared properties matter
+		ExecutableNode node = nodeMap.get("job2");
+		Props job2Props = node.getInputProps();
+		Assert.assertEquals("shared1", job2Props.get("props1"));
+		Assert.assertEquals("job2", job2Props.get("props2"));
+		Assert.assertEquals("moo3", job2Props.get("props3"));
+		Assert.assertEquals("job7", job2Props.get("props7"));
+		Assert.assertEquals("flow5", job2Props.get("props5"));
+		Assert.assertEquals("flow6", job2Props.get("props6"));
+		Assert.assertEquals("shared4", job2Props.get("props4"));
+		Assert.assertEquals("shared8", job2Props.get("props8"));
+		
+		// Job 1 is inside another flow, and is nested in a different directory
+		// The priority order should be: job1->innerflow->job2.output->flow.overrides->job1 shared props
+		Props job2Generated = new Props();
+		job2Generated.put("props6","gjob6");
+		job2Generated.put("props9","gjob9");
+		job2Generated.put("props10","gjob10");
+		InteractiveTestJob.getTestJob("job2").succeedJob(job2Generated);
+		pause(250);
+		node = nodeMap.get("innerflow:job1");
+		Props job1Props = node.getInputProps();
+		Assert.assertEquals("job1", job1Props.get("props1"));
+		Assert.assertEquals("job2", job1Props.get("props2"));
+		Assert.assertEquals("job8", job1Props.get("props8"));
+		Assert.assertEquals("gjob9", job1Props.get("props9"));
+		Assert.assertEquals("gjob10", job1Props.get("props10"));
+		Assert.assertEquals("innerflow6", job1Props.get("props6"));
+		Assert.assertEquals("innerflow5", job1Props.get("props5"));
+		Assert.assertEquals("flow7", job1Props.get("props7"));
+		Assert.assertEquals("moo3", job1Props.get("props3"));
+		Assert.assertEquals("moo4", job1Props.get("props4"));
+		
+		// Job 4 is inside another flow and takes output from job 1
+		// The priority order should be: job4->job1.output->innerflow->job2.output->flow.overrides->job4 shared props
+		Props job1GeneratedProps = new Props();
+		job1GeneratedProps.put("props9", "g2job9");
+		job1GeneratedProps.put("props7", "g2job7");
+		InteractiveTestJob.getTestJob("innerflow:job1").succeedJob(job1GeneratedProps);
+		pause(250);
+		node = nodeMap.get("innerflow:job4");
+		Props job4Props = node.getInputProps();
+		Assert.assertEquals("job8", job4Props.get("props8"));
+		Assert.assertEquals("job9", job4Props.get("props9"));
+		Assert.assertEquals("g2job7", job4Props.get("props7"));
+		Assert.assertEquals("innerflow5", job4Props.get("props5"));
+		Assert.assertEquals("innerflow6", job4Props.get("props6"));
+		Assert.assertEquals("gjob10", job4Props.get("props10"));
+		Assert.assertEquals("shared4", job4Props.get("props4"));
+		Assert.assertEquals("shared1", job4Props.get("props1"));
+		Assert.assertEquals("shared2", job4Props.get("props2"));
+		Assert.assertEquals("moo3", job4Props.get("props3"));
+		
+		// Job 3 is a normal job taking props from an embedded flow
+		// The priority order should be: job3->innerflow.output->flow.overrides->job3.sharedprops
+		Props job4GeneratedProps = new Props();
+		job4GeneratedProps.put("props9", "g4job9");
+		job4GeneratedProps.put("props6", "g4job6");
+		InteractiveTestJob.getTestJob("innerflow:job4").succeedJob(job4GeneratedProps);
+		pause(250);
+		node = nodeMap.get("job3");
+		Props job3Props = node.getInputProps();
+		Assert.assertEquals("job3", job3Props.get("props3"));
+		Assert.assertEquals("g4job6", job3Props.get("props6"));
+		Assert.assertEquals("g4job9", job3Props.get("props9"));
+		Assert.assertEquals("flow7", job3Props.get("props7"));
+		Assert.assertEquals("flow5", job3Props.get("props5"));
+		Assert.assertEquals("shared1", job3Props.get("props1"));
+		Assert.assertEquals("shared2", job3Props.get("props2"));
+		Assert.assertEquals("moo4", job3Props.get("props4"));
+	}
+	
+	private void prepareProject(File directory) throws ProjectManagerException, IOException {
+		DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+		loader.loadProjectFlow(directory);
+		if (!loader.getErrors().isEmpty()) {
+			for (String error: loader.getErrors()) {
+				System.out.println(error);
+			}
+			
+			throw new RuntimeException("Errors found in setup");
+		}
+		
+		flowMap = loader.getFlowMap();
+		project.setFlows(flowMap);
+		FileUtils.copyDirectory(directory, workingDir);
+	}
+	
+	private FlowRunner createFlowRunner(String flowName, HashMap<String, String> flowParams) throws Exception {
+		Flow flow = flowMap.get(flowName);
+
+		int exId = id++;
+		ExecutableFlow exFlow = new ExecutableFlow(project, flow);
+		exFlow.setExecutionPath(workingDir.getPath());
+		exFlow.setExecutionId(exId);
+
+		exFlow.getExecutionOptions().addAllFlowParameters(flowParams);
+		fakeExecutorLoader.uploadExecutableFlow(exFlow);
+	
+		FlowRunner runner = new FlowRunner(fakeExecutorLoader.fetchExecutableFlow(exId), fakeExecutorLoader, fakeProjectLoader, jobtypeManager);
+		return runner;
+	}
+
+	private void createNodeMap(ExecutableFlowBase flow, Map<String, ExecutableNode> nodeMap) {
+		for (ExecutableNode node: flow.getExecutableNodes()) {
+			nodeMap.put(node.getNestedId(), node);
+			
+			if (node instanceof ExecutableFlowBase) {
+				createNodeMap((ExecutableFlowBase)node, nodeMap);
+			}
+		}
+	}
+
+	private Thread runFlowRunnerInThread(FlowRunner runner) {
+		Thread thread = new Thread(runner);
+		thread.start();
+		return thread;
+	}
+	
+	private void pause(long millisec) {
+		synchronized(this) {
+			try {
+				wait(millisec);
+			}
+			catch (InterruptedException e) {
+			}
+		}
+	}
+}
\ No newline at end of file