JdbcExecutorLoaderTest.java

473 lines | 16.575 kB Blame History Raw Download
package azkaban.test.executor;

import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.HashSet;	
import java.util.Map;

import javax.sql.DataSource;

import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;


import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableJobInfo;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionReference;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.JdbcExecutorLoader;
import azkaban.executor.Status;
import azkaban.flow.Flow;

import azkaban.database.DataSourceUtils;
import azkaban.project.Project;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;

public class JdbcExecutorLoaderTest {
	private static boolean testDBExists;
	// TODO: remove this hard-coded host and point the tests at localhost instead.
	private static final String host = "cyu-ld.linkedin.biz";
	private static final int port = 3306;
	private static final String database = "azkaban2";
	private static final String user = "azkaban";
	private static final String password = "azkaban";
	private static final int numConnections = 10;
	
	private File flowDir = new File("unit/executions/exectest1");
	
	/**
	 * Probes the configured MySQL instance once before the tests in this class run.
	 * <p>
	 * {@code testDBExists} ends up {@code true} only if a connection can be opened and a
	 * {@code SELECT COUNT(1)} succeeds against every table listed below. Any
	 * {@link SQLException} is printed, leaves the flag {@code false} and skips the cleanup.
	 * When the probe succeeds, {@link #clearDB()} is called after the probe connection
	 * has been closed.
	 */
	@BeforeClass
	public static void setupDB() {
		DataSource dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
		testDBExists = true;

		// Tables probed to decide whether the test database is usable.
		String[] requiredTables = {
			"active_executing_flows",
			"execution_flows",
			"execution_jobs",
			"execution_logs"
		};

		Connection connection = null;
		try {
			connection = dataSource.getConnection();

			CountHandler countHandler = new CountHandler();
			QueryRunner runner = new QueryRunner();
			for (String table : requiredTables) {
				// The result is ignored; the query only has to succeed.
				runner.query(connection, "SELECT COUNT(1) FROM " + table, countHandler);
			}
		} catch (SQLException e) {
			e.printStackTrace();
			testDBExists = false;
			return;
		} finally {
			// Runs on every exit path (including unchecked exceptions) so the connection is always released.
			DbUtils.closeQuietly(connection);
		}

		clearDB();
	}
	
	/**
	 * Issues a {@code DELETE FROM} against each of the tables listed below.
	 * <p>
	 * Does nothing when {@code testDBExists} is already {@code false}. If opening the
	 * connection or any delete throws a {@link SQLException}, the exception is printed,
	 * {@code testDBExists} is set to {@code false} and the remaining tables are left untouched.
	 */
	private static void clearDB() {
		if (!testDBExists) {
			return;
		}

		DataSource dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);

		// Deleted in this order, matching the original sequence of statements.
		String[] tablesToClear = {
			"active_executing_flows",
			"execution_flows",
			"execution_jobs",
			"execution_logs"
		};

		Connection connection = null;
		try {
			connection = dataSource.getConnection();

			QueryRunner runner = new QueryRunner();
			for (String table : tablesToClear) {
				runner.update(connection, "DELETE FROM " + table);
			}
		} catch (SQLException e) {
			e.printStackTrace();
			testDBExists = false;
		} finally {
			// Runs on every exit path (including unchecked exceptions) so the connection is always released.
			DbUtils.closeQuietly(connection);
		}
	}
	
	/**
	 * Round-trips a flow through the loader: uploads a flow built from the "exec1" fixture,
	 * fetches it back by execution id, and checks that the fetched copy is a different object
	 * with matching ids, timestamps, version, failure action and end nodes.
	 * Returns early, asserting nothing, when the test database is not set up.
	 */
	@Test
	public void testUploadExecutionFlows() throws Exception {
		if (!isTestSetup()) {
			return;
		}

		ExecutorLoader executorLoader = createLoader();
		ExecutableFlow original = createExecutableFlow("exec1");
		executorLoader.uploadExecutableFlow(original);

		ExecutableFlow fetched = executorLoader.fetchExecutableFlow(original.getExecutionId());

		// The fetched flow must be a separate instance from the uploaded one.
		Assert.assertFalse(original == fetched);

		Assert.assertEquals(original.getExecutionId(), fetched.getExecutionId());
		Assert.assertEquals(original.getEndTime(), fetched.getEndTime());
		Assert.assertEquals(original.getStartTime(), fetched.getStartTime());
		Assert.assertEquals(original.getSubmitTime(), fetched.getSubmitTime());
		Assert.assertEquals(original.getFlowId(), fetched.getFlowId());
		Assert.assertEquals(original.getProjectId(), fetched.getProjectId());
		Assert.assertEquals(original.getVersion(), fetched.getVersion());
		Assert.assertEquals(
				original.getExecutionOptions().getFailureAction(),
				fetched.getExecutionOptions().getFailureAction());

		// End nodes are compared as sets, so their order is irrelevant.
		HashSet<String> expectedEndNodes = new HashSet<String>(original.getEndNodes());
		HashSet<String> fetchedEndNodes = new HashSet<String>(fetched.getEndNodes());
		Assert.assertEquals(expectedEndNodes, fetchedEndNodes);
	}
	
	/**
	 * Uploads a flow, fetches it, sets an end time and {@code SUCCEEDED} status on the fetched
	 * copy, writes it back with {@code updateExecutableFlow} and fetches it again. The second
	 * fetch must carry the updated end time and status, and otherwise match the flow that was
	 * originally uploaded.
	 * Returns early, asserting nothing, when the test database is not set up.
	 */
	@Test
	public void testUpdateExecutionFlows() throws Exception {
		if (!isTestSetup()) {
			return;
		}

		ExecutorLoader executorLoader = createLoader();
		ExecutableFlow original = createExecutableFlow("exec1");
		executorLoader.uploadExecutableFlow(original);

		// Modify a fetched copy and persist the change.
		ExecutableFlow modified = executorLoader.fetchExecutableFlow(original.getExecutionId());
		modified.setEndTime(System.currentTimeMillis());
		modified.setStatus(Status.SUCCEEDED);
		executorLoader.updateExecutableFlow(modified);

		ExecutableFlow refetched = executorLoader.fetchExecutableFlow(original.getExecutionId());

		// The refetched flow must be a separate instance from the uploaded one.
		Assert.assertFalse(original == refetched);
		Assert.assertEquals(original.getExecutionId(), refetched.getExecutionId());

		// Fields changed by the update are compared against the modified copy.
		Assert.assertEquals(modified.getEndTime(), refetched.getEndTime());
		Assert.assertEquals(modified.getStatus(), refetched.getStatus());

		// Everything else is compared against the originally uploaded flow.
		Assert.assertEquals(original.getStartTime(), refetched.getStartTime());
		Assert.assertEquals(original.getSubmitTime(), refetched.getSubmitTime());
		Assert.assertEquals(original.getFlowId(), refetched.getFlowId());
		Assert.assertEquals(original.getProjectId(), refetched.getProjectId());
		Assert.assertEquals(original.getVersion(), refetched.getVersion());
		Assert.assertEquals(
				original.getExecutionOptions().getFailureAction(),
				refetched.getExecutionOptions().getFailureAction());

		HashSet<String> expectedEndNodes = new HashSet<String>(original.getEndNodes());
		HashSet<String> refetchedEndNodes = new HashSet<String>(refetched.getEndNodes());
		Assert.assertEquals(expectedEndNodes, refetchedEndNodes);
	}
	
	
	/**
	 * Uploads node "job10" of execution 10 together with its input props and checks the job
	 * info fetched back for it. Then sets output props and an end time on the node, updates
	 * it, and checks the input and output props returned by the three prop-fetching methods.
	 * Returns early, asserting nothing, when the test database is not set up.
	 */
	@Test
	public void testUploadExecutableNode() throws Exception {
		if (!isTestSetup()) {
			return;
		}

		ExecutorLoader loader = createLoader();
		// createExecutableFlow(int, String) already assigns the execution id.
		ExecutableFlow flow = createExecutableFlow(10, "exec1");

		File jobFile = new File(flowDir, "job10.job");
		Props props = new Props(null, jobFile);
		props.put("test", "test2");
		ExecutableNode oldNode = flow.getExecutableNode("job10");
		oldNode.setStartTime(System.currentTimeMillis());
		loader.uploadExecutableNode(oldNode, props);

		ExecutableJobInfo info = loader.fetchJobInfo(10, "job10", 0);
		Assert.assertEquals(flow.getExecutionId(), info.getExecId());
		Assert.assertEquals(flow.getProjectId(), info.getProjectId());
		Assert.assertEquals(flow.getVersion(), info.getVersion());
		Assert.assertEquals(flow.getFlowId(), info.getFlowId());
		Assert.assertEquals(oldNode.getId(), info.getJobId());
		Assert.assertEquals(oldNode.getStatus(), info.getStatus());
		Assert.assertEquals(oldNode.getStartTime(), info.getStartTime());
		Assert.assertEquals("endTime = " + oldNode.getEndTime() + " info endTime = " + info.getEndTime(), oldNode.getEndTime(), info.getEndTime());

		// Attach output props and an end time, then persist the node update.
		Props outputProps = new Props();
		outputProps.put("hello", "output");
		oldNode.setOutputProps(outputProps);
		oldNode.setEndTime(System.currentTimeMillis());
		loader.updateExecutableNode(oldNode);

		Props fInputProps = loader.fetchExecutionJobInputProps(10, "job10");
		Props fOutputProps = loader.fetchExecutionJobOutputProps(10, "job10");
		Pair<Props, Props> inOutProps = loader.fetchExecutionJobProps(10, "job10");

		// JUnit's argument order is (expected, actual); the expected literal goes first.
		Assert.assertEquals("test2", fInputProps.get("test"));
		Assert.assertEquals("output", fOutputProps.get("hello"));
		Assert.assertEquals("test2", inOutProps.getFirst().get("test"));
		Assert.assertEquals("output", inOutProps.getSecond().get("hello"));
	}
	
	/**
	 * Uploads three flows, registers active references for the first two, and checks that
	 * {@code fetchActiveFlows} returns matching copies of both. Then removes the second
	 * reference and checks that only the first flow remains in the active map; the third flow,
	 * which never had a reference, must not appear.
	 * Returns early, asserting nothing, when the test database is not set up.
	 */
	@Test
	public void testActiveReference() throws Exception {
		if (!isTestSetup()) {
			return;
		}

		ExecutorLoader loader = createLoader();
		ExecutableFlow flow1 = createExecutableFlow("exec1");
		loader.uploadExecutableFlow(flow1);
		ExecutionReference ref1 = new ExecutionReference(flow1.getExecutionId(), "test", 1);
		loader.addActiveExecutableReference(ref1);

		ExecutableFlow flow2 = createExecutableFlow("exec1");
		loader.uploadExecutableFlow(flow2);
		ExecutionReference ref2 = new ExecutionReference(flow2.getExecutionId(), "test", 1);
		loader.addActiveExecutableReference(ref2);

		// flow3 is uploaded but never gets an active reference.
		ExecutableFlow flow3 = createExecutableFlow("exec1");
		loader.uploadExecutableFlow(flow3);

		Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 = loader.fetchActiveFlows();
		assertActiveEntryMatches(flow1, activeFlows1.get(flow1.getExecutionId()));
		assertActiveEntryMatches(flow2, activeFlows1.get(flow2.getExecutionId()));

		loader.removeActiveExecutableReference(flow2.getExecutionId());
		Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows2 = loader.fetchActiveFlows();

		Assert.assertTrue(activeFlows2.containsKey(flow1.getExecutionId()));
		Assert.assertFalse(activeFlows2.containsKey(flow3.getExecutionId()));
		Assert.assertFalse(activeFlows2.containsKey(flow2.getExecutionId()));
	}

	/**
	 * Asserts that an entry from the active-flows map exists, holds a flow that is a different
	 * object from {@code expected}, and agrees with it on ids, timestamps, version and
	 * failure action.
	 *
	 * @param expected    the flow that was uploaded
	 * @param activeEntry the map entry looked up for that flow's execution id; may be
	 *                    {@code null}, which fails the assertion instead of throwing an NPE
	 */
	private static void assertActiveEntryMatches(ExecutableFlow expected, Pair<ExecutionReference, ExecutableFlow> activeEntry) {
		Assert.assertNotNull("No active entry for execution " + expected.getExecutionId(), activeEntry);

		ExecutableFlow actual = activeEntry.getSecond();
		Assert.assertNotNull(actual);
		Assert.assertNotSame(expected, actual);
		Assert.assertEquals(expected.getExecutionId(), actual.getExecutionId());
		Assert.assertEquals(expected.getEndTime(), actual.getEndTime());
		Assert.assertEquals(expected.getStartTime(), actual.getStartTime());
		Assert.assertEquals(expected.getSubmitTime(), actual.getSubmitTime());
		Assert.assertEquals(expected.getFlowId(), actual.getFlowId());
		Assert.assertEquals(expected.getProjectId(), actual.getProjectId());
		Assert.assertEquals(expected.getVersion(), actual.getVersion());
		Assert.assertEquals(expected.getExecutionOptions().getFailureAction(), actual.getExecutionOptions().getFailureAction());
	}
	
	/**
	 * Uploads three small log files under execution 1 / "smallFiles" and checks the length of
	 * the data returned by two {@code fetchLogs} calls: 53 for the (0, 50000) request and 20
	 * for the (10, 20) request.
	 * Returns early, asserting nothing, when the test database is not set up.
	 */
	@Test
	public void testSmallUploadLog() throws ExecutorManagerException {
		// Same guard as the other DB-backed tests; without it this test fails when no DB is present.
		if (!isTestSetup()) {
			return;
		}

		File logDir = new File("unit/executions/logtest");
		File[] smalllog = {new File(logDir, "log1.log"), new File(logDir, "log2.log"), new File(logDir, "log3.log")};

		ExecutorLoader loader = createLoader();
		loader.uploadLogFile(1, "smallFiles", 0, smalllog);

		LogData data = loader.fetchLogs(1, "smallFiles", 0, 0, 50000);
		Assert.assertNotNull(data);
		// JUnit's argument order is (message, expected, actual).
		Assert.assertEquals("Logs length is " + data.getLength(), 53, data.getLength());

		System.out.println(data.toString());

		LogData data2 = loader.fetchLogs(1, "smallFiles", 0, 10, 20);
		// Null-check before dereferencing so a missing result fails the assertion rather than throwing an NPE.
		Assert.assertNotNull(data2);
		System.out.println(data2.toString());
		Assert.assertEquals("Logs length is " + data2.getLength(), 20, data2.getLength());
	}
	
	/**
	 * Uploads three large log files under execution 1 / "largeFiles" and checks the length of
	 * the data returned by {@code fetchLogs} for several (offset, length) requests, including
	 * one request for which {@code null} is expected.
	 * Returns early, asserting nothing, when the test database is not set up.
	 */
	@Test
	public void testLargeUploadLog() throws ExecutorManagerException {
		// Same guard as the other DB-backed tests; without it this test fails when no DB is present.
		if (!isTestSetup()) {
			return;
		}

		File logDir = new File("unit/executions/logtest");

		// Multiple of 255 for Henry the Eighth
		File[] largelog = {new File(logDir, "largeLog1.log"), new File(logDir, "largeLog2.log"), new File(logDir, "largeLog3.log")};

		ExecutorLoader loader = createLoader();
		loader.uploadLogFile(1, "largeFiles", 0, largelog);

		// In every assertEquals below the arguments are (message, expected, actual).
		LogData logsResult = loader.fetchLogs(1, "largeFiles", 0, 0, 64000);
		Assert.assertNotNull(logsResult);
		Assert.assertEquals("Logs length is " + logsResult.getLength(), 64000, logsResult.getLength());

		LogData logsResult2 = loader.fetchLogs(1, "largeFiles", 0, 1000, 64000);
		Assert.assertNotNull(logsResult2);
		Assert.assertEquals("Logs length is " + logsResult2.getLength(), 64000, logsResult2.getLength());

		LogData logsResult3 = loader.fetchLogs(1, "largeFiles", 0, 330000, 400000);
		Assert.assertNotNull(logsResult3);
		Assert.assertEquals("Logs length is " + logsResult3.getLength(), 5493, logsResult3.getLength());

		// This request is expected to yield no data at all.
		LogData logsResult4 = loader.fetchLogs(1, "largeFiles", 0, 340000, 400000);
		Assert.assertNull(logsResult4);

		LogData logsResult5 = loader.fetchLogs(1, "largeFiles", 0, 153600, 204800);
		Assert.assertNotNull(logsResult5);
		Assert.assertEquals("Logs length is " + logsResult5.getLength(), 181893, logsResult5.getLength());

		LogData logsResult6 = loader.fetchLogs(1, "largeFiles", 0, 150000, 250000);
		Assert.assertNotNull(logsResult6);
		Assert.assertEquals("Logs length is " + logsResult6.getLength(), 185493, logsResult6.getLength());
	}
	
	/**
	 * Uploads the same log files twice, five seconds apart, as execution 1 / "oldlog" and
	 * execution 2 / "newlog". It then calls {@code removeExecutionLogsByTime} with a timestamp
	 * 2.5 seconds after the instant captured just before the first upload. The test expects
	 * the "oldlog" logs to be gone afterwards and the "newlog" logs to still be fetchable.
	 * Returns early, asserting nothing, when the test database is not set up.
	 */
	@Test
	public void testRemoveExecutionLogsByTime() throws ExecutorManagerException, IOException, InterruptedException {
		// Same guard as the other DB-backed tests; without it this test fails when no DB is present.
		if (!isTestSetup()) {
			return;
		}

		ExecutorLoader loader = createLoader();

		File logDir = new File("unit/executions/logtest");

		// Multiple of 255 for Henry the Eighth
		File[] largelog = {new File(logDir, "largeLog1.log"), new File(logDir, "largeLog2.log"), new File(logDir, "largeLog3.log")};

		DateTime time1 = DateTime.now();
		loader.uploadLogFile(1, "oldlog", 0, largelog);
		// Sleep for 5 seconds so the two uploads are clearly separated in time.
		Thread.sleep(5000);
		loader.uploadLogFile(2, "newlog", 0, largelog);

		// Cutoff between the two uploads.
		// NOTE(review): assumes the first upload is stamped within 2.5 s of time1 - confirm.
		DateTime time2 = time1.plusMillis(2500);

		int count = loader.removeExecutionLogsByTime(time2.getMillis());
		System.out.print("Removed " + count + " records");
		LogData logs = loader.fetchLogs(1, "oldlog", 0, 0, 22222);
		Assert.assertNull(logs);
		logs = loader.fetchLogs(2, "newlog", 0, 0, 22222);
		Assert.assertNotNull(logs);
	}
	
	/**
	 * Builds an {@link ExecutableFlow} from the {@code <flowName>.flow} fixture and assigns it
	 * the given execution id.
	 *
	 * @param executionId the execution id to set on the returned flow
	 * @param flowName    base name of the {@code .flow} file inside {@code flowDir}
	 * @return the newly built flow with {@code executionId} applied
	 * @throws IOException declared by {@link #createExecutableFlow(String)}, which reads the fixture
	 */
	private ExecutableFlow createExecutableFlow(int executionId, String flowName) throws IOException {
		// Reuse the single-argument overload instead of duplicating the fixture loading.
		ExecutableFlow execFlow = createExecutableFlow(flowName);
		execFlow.setExecutionId(executionId);

		return execFlow;
	}
	
	/**
	 * Builds an {@link ExecutableFlow} from the {@code <flowName>.flow} JSON fixture in
	 * {@code flowDir}. The flow is registered in a fresh {@link Project} (id 1, name "flow")
	 * before being wrapped.
	 *
	 * @param flowName base name of the {@code .flow} file inside {@code flowDir}
	 * @return a new executable flow for the parsed flow; no execution id is set here
	 * @throws IOException declared for the JSON fixture parsing call
	 */
	private ExecutableFlow createExecutableFlow(String flowName) throws IOException {
		@SuppressWarnings("unchecked")
		HashMap<String, Object> parsedFlow =
				(HashMap<String, Object>) JSONUtils.parseJSONFromFile(new File(flowDir, flowName + ".flow"));
		Flow flow = Flow.flowFromObject(parsedFlow);

		// The project holds exactly one flow, keyed by the flow's own id.
		Project project = new Project(1, "flow");
		HashMap<String, Flow> flowsById = new HashMap<String, Flow>();
		flowsById.put(flow.getId(), flow);
		project.setFlows(flowsById);

		return new ExecutableFlow(project, flow);
	}
	
	/**
	 * Creates a {@link JdbcExecutorLoader} configured from this class's MySQL connection
	 * constants.
	 *
	 * @return a new loader built from the assembled properties
	 */
	private ExecutorLoader createLoader() {
		Props dbProps = new Props();

		dbProps.put("database.type", "mysql");
		dbProps.put("mysql.host", host);
		dbProps.put("mysql.port", port);
		dbProps.put("mysql.user", user);
		dbProps.put("mysql.database", database);
		dbProps.put("mysql.password", password);
		dbProps.put("mysql.numconnections", numConnections);

		ExecutorLoader loader = new JdbcExecutorLoader(dbProps);
		return loader;
	}
	
	/**
	 * Reports whether the DB-backed tests should run, logging the decision.
	 *
	 * @return the current value of {@code testDBExists}; a message is written to stdout when
	 *         it is {@code true} and to stderr when it is {@code false}
	 */
	private boolean isTestSetup() {
		if (testDBExists) {
			System.out.println("Running DB test because Db setup.");
			return true;
		}

		System.err.println("Skipping DB test because Db not setup.");
		return false;
	}
	
	/**
	 * Result-set handler that returns how many rows the result set contains.
	 * Note that it counts rows; it does not read any column value from them.
	 */
	public static class CountHandler implements ResultSetHandler<Integer> {
		/**
		 * Advances through the whole result set and counts the rows.
		 *
		 * @param rs the result set to iterate
		 * @return the number of rows visited
		 * @throws SQLException if advancing the result set fails
		 */
		@Override
		public Integer handle(ResultSet rs) throws SQLException {
			int rowCount = 0;
			for (boolean hasRow = rs.next(); hasRow; hasRow = rs.next()) {
				rowCount++;
			}

			return rowCount;
		}
	}
	
}