FlowRunnerPropertyResolutionTest.java

236 lines | 8.743 kB Blame History Raw Download
package azkaban.test.execapp;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import azkaban.execapp.FlowRunner;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutorLoader;
import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManagerException;
import azkaban.test.executor.InteractiveTestJob;
import azkaban.test.executor.JavaJob;
import azkaban.utils.DirectoryFlowLoader;
import azkaban.utils.Props;

/**
 * Test the property resolution of jobs in a flow.
 * 
 * The tests are contained in execpropstest, and should be resolved in the following fashion,
 * where the later props take precedence over the previous ones.
 * 
 * 1. Global props (set in the FlowRunner)
 * 2. Shared job props (depends on job directory)
 * 3. Flow Override properties
 * 4. Previous job outputs to the embedded flow (Only if contained in embedded flow)
 * 5. Embedded flow properties (Only if contained in embedded flow)
 * 6. Previous job outputs (if exists)
 * 7. Job Props
 * 
 * The test contains the following structure:
 * job2 -> innerFlow (job1 -> job4 ) -> job3
 * 
 * job2 and 4 are in nested directories so should have different shared properties than other jobs.
 */
public class FlowRunnerPropertyResolutionTest {
	private File workingDir;
	private JobTypeManager jobtypeManager;
	private ProjectLoader fakeProjectLoader;
	private ExecutorLoader fakeExecutorLoader;
	private Logger logger = Logger.getLogger(FlowRunnerTest2.class);
	private Project project;
	private Map<String, Flow> flowMap;
	private static int id=101;
	
	@Before
	public void setUp() throws Exception {
		System.out.println("Create temp dir");
		workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
		if (workingDir.exists()) {
			FileUtils.deleteDirectory(workingDir);
		}
		workingDir.mkdirs();
		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
		jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
		jobtypeManager.getJobTypePluginSet().addPluginClass("test", InteractiveTestJob.class);
		fakeProjectLoader = new MockProjectLoader(workingDir);
		fakeExecutorLoader = new MockExecutorLoader();
		project = new Project(1, "testProject");
		
		File dir = new File("unit/executions/execpropstest");
		prepareProject(dir);
		
		InteractiveTestJob.clearTestJobs();
	}
	
	@After
	public void tearDown() throws IOException {
		System.out.println("Teardown temp dir");
		if (workingDir != null) {
			FileUtils.deleteDirectory(workingDir);
			workingDir = null;
		}
	}
	
	/**
	 * Tests the basic flow resolution. Flow is defined in execpropstest
	 * @throws Exception
	 */
	@Test
	public void testPropertyResolution() throws Exception {
		HashMap<String, String> flowProps = new HashMap<String,String>();
		flowProps.put("props7", "flow7");
		flowProps.put("props6", "flow6");
		flowProps.put("props5", "flow5");
		FlowRunner runner = createFlowRunner("job3", flowProps);
		Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
		createNodeMap(runner.getExecutableFlow(), nodeMap);
		
		// 1. Start flow. Job 2 should start
		Thread thread = runFlowRunnerInThread(runner);
		pause(250);
		
		// Job 2 is a normal job.
		// Only the flow overrides and the shared properties matter
		ExecutableNode node = nodeMap.get("job2");
		Props job2Props = node.getInputProps();
		Assert.assertEquals("shared1", job2Props.get("props1"));
		Assert.assertEquals("job2", job2Props.get("props2"));
		Assert.assertEquals("moo3", job2Props.get("props3"));
		Assert.assertEquals("job7", job2Props.get("props7"));
		Assert.assertEquals("flow5", job2Props.get("props5"));
		Assert.assertEquals("flow6", job2Props.get("props6"));
		Assert.assertEquals("shared4", job2Props.get("props4"));
		Assert.assertEquals("shared8", job2Props.get("props8"));
		
		// Job 1 is inside another flow, and is nested in a different directory
		// The priority order should be: job1->innerflow->job2.output->flow.overrides->job1 shared props
		Props job2Generated = new Props();
		job2Generated.put("props6","gjob6");
		job2Generated.put("props9","gjob9");
		job2Generated.put("props10","gjob10");
		InteractiveTestJob.getTestJob("job2").succeedJob(job2Generated);
		pause(250);
		node = nodeMap.get("innerflow:job1");
		Props job1Props = node.getInputProps();
		Assert.assertEquals("job1", job1Props.get("props1"));
		Assert.assertEquals("job2", job1Props.get("props2"));
		Assert.assertEquals("job8", job1Props.get("props8"));
		Assert.assertEquals("gjob9", job1Props.get("props9"));
		Assert.assertEquals("gjob10", job1Props.get("props10"));
		Assert.assertEquals("innerflow6", job1Props.get("props6"));
		Assert.assertEquals("innerflow5", job1Props.get("props5"));
		Assert.assertEquals("flow7", job1Props.get("props7"));
		Assert.assertEquals("moo3", job1Props.get("props3"));
		Assert.assertEquals("moo4", job1Props.get("props4"));
		
		// Job 4 is inside another flow and takes output from job 1
		// The priority order should be: job4->job1.output->innerflow->job2.output->flow.overrides->job4 shared props
		Props job1GeneratedProps = new Props();
		job1GeneratedProps.put("props9", "g2job9");
		job1GeneratedProps.put("props7", "g2job7");
		InteractiveTestJob.getTestJob("innerflow:job1").succeedJob(job1GeneratedProps);
		pause(250);
		node = nodeMap.get("innerflow:job4");
		Props job4Props = node.getInputProps();
		Assert.assertEquals("job8", job4Props.get("props8"));
		Assert.assertEquals("job9", job4Props.get("props9"));
		Assert.assertEquals("g2job7", job4Props.get("props7"));
		Assert.assertEquals("innerflow5", job4Props.get("props5"));
		Assert.assertEquals("innerflow6", job4Props.get("props6"));
		Assert.assertEquals("gjob10", job4Props.get("props10"));
		Assert.assertEquals("shared4", job4Props.get("props4"));
		Assert.assertEquals("shared1", job4Props.get("props1"));
		Assert.assertEquals("shared2", job4Props.get("props2"));
		Assert.assertEquals("moo3", job4Props.get("props3"));
		
		// Job 3 is a normal job taking props from an embedded flow
		// The priority order should be: job3->innerflow.output->flow.overrides->job3.sharedprops
		Props job4GeneratedProps = new Props();
		job4GeneratedProps.put("props9", "g4job9");
		job4GeneratedProps.put("props6", "g4job6");
		InteractiveTestJob.getTestJob("innerflow:job4").succeedJob(job4GeneratedProps);
		pause(250);
		node = nodeMap.get("job3");
		Props job3Props = node.getInputProps();
		Assert.assertEquals("job3", job3Props.get("props3"));
		Assert.assertEquals("g4job6", job3Props.get("props6"));
		Assert.assertEquals("g4job9", job3Props.get("props9"));
		Assert.assertEquals("flow7", job3Props.get("props7"));
		Assert.assertEquals("flow5", job3Props.get("props5"));
		Assert.assertEquals("shared1", job3Props.get("props1"));
		Assert.assertEquals("shared2", job3Props.get("props2"));
		Assert.assertEquals("moo4", job3Props.get("props4"));
	}
	
	private void prepareProject(File directory) throws ProjectManagerException, IOException {
		DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
		loader.loadProjectFlow(directory);
		if (!loader.getErrors().isEmpty()) {
			for (String error: loader.getErrors()) {
				System.out.println(error);
			}
			
			throw new RuntimeException("Errors found in setup");
		}
		
		flowMap = loader.getFlowMap();
		project.setFlows(flowMap);
		FileUtils.copyDirectory(directory, workingDir);
	}
	
	private FlowRunner createFlowRunner(String flowName, HashMap<String, String> flowParams) throws Exception {
		Flow flow = flowMap.get(flowName);

		int exId = id++;
		ExecutableFlow exFlow = new ExecutableFlow(project, flow);
		exFlow.setExecutionPath(workingDir.getPath());
		exFlow.setExecutionId(exId);

		exFlow.getExecutionOptions().addAllFlowParameters(flowParams);
		fakeExecutorLoader.uploadExecutableFlow(exFlow);
	
		FlowRunner runner = new FlowRunner(fakeExecutorLoader.fetchExecutableFlow(exId), fakeExecutorLoader, fakeProjectLoader, jobtypeManager);
		return runner;
	}

	private void createNodeMap(ExecutableFlowBase flow, Map<String, ExecutableNode> nodeMap) {
		for (ExecutableNode node: flow.getExecutableNodes()) {
			nodeMap.put(node.getNestedId(), node);
			
			if (node instanceof ExecutableFlowBase) {
				createNodeMap((ExecutableFlowBase)node, nodeMap);
			}
		}
	}

	private Thread runFlowRunnerInThread(FlowRunner runner) {
		Thread thread = new Thread(runner);
		thread.start();
		return thread;
	}
	
	private void pause(long millisec) {
		synchronized(this) {
			try {
				wait(millisec);
			}
			catch (InterruptedException e) {
			}
		}
	}
}