// JdbcScheduleLoaderTest.java
// (289 lines | 8.563 kB -- repository-viewer header captured together with the file)
package azkaban.scheduler;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.sql.DataSource;

import junit.framework.Assert;

import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.joda.time.DateTimeZone;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import azkaban.utils.DataSourceUtils;
import azkaban.utils.Props;

/**
 * Integration tests for {@link JdbcScheduleLoader} that run against a MySQL
 * database described by the connection constants below.
 *
 * <p>{@link #setupDB()} probes the database once before any test runs. When the
 * probe fails (no connection, or no {@code schedules} table) every test returns
 * early instead of failing, so the suite stays green on machines without a
 * test database.
 *
 * <p><b>Warning:</b> {@link #clearDB()} deletes every row of the
 * {@code schedules} table; never point these constants at a database whose
 * contents matter.
 */
public class JdbcScheduleLoaderTest {
	/** True only while the probe in {@link #setupDB()} succeeded and no later cleanup failed. */
	private static boolean testDBExists;
	/**
	 * Data source created once in {@link #setupDB()} and reused by
	 * {@link #clearDB()}, so that a new one is not built for every cleanup.
	 */
	private static DataSource dataSource;

	private static final String host = "localhost";
	private static final int port = 3306;
	private static final String database = "azkaban2";
	private static final String user = "azkaban";
	private static final String password = "azkaban";
	private static final int numConnections = 10;

	/**
	 * Probes the test database by opening a connection and querying the
	 * {@code schedules} table. Sets {@code testDBExists} to {@code true} only
	 * when both steps succeed; any {@link SQLException} is printed and leaves
	 * the flag {@code false}.
	 */
	@BeforeClass
	public static void setupDB() {
		dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
		testDBExists = false;

		Connection connection = null;
		try {
			connection = dataSource.getConnection();
			// Only the absence of an exception matters here; the handler result is ignored.
			new QueryRunner().query(connection, "SELECT COUNT(1) FROM schedules", new CountHandler());
			testDBExists = true;
		} catch (SQLException e) {
			e.printStackTrace();
		} finally {
			// Single close point; closeQuietly tolerates a null connection.
			DbUtils.closeQuietly(connection);
		}
	}

	/**
	 * Deletes every row from the {@code schedules} table. Does nothing when
	 * the database probe failed. If the cleanup itself fails, the exception is
	 * printed and {@code testDBExists} is cleared so later tests are skipped.
	 *
	 * <p>Runs after the whole class and is also called at the start of each
	 * test to give it an empty table.
	 */
	@AfterClass
	public static void clearDB() {
		if (!testDBExists) {
			return;
		}

		Connection connection = null;
		try {
			// dataSource is non-null here: testDBExists is only set after setupDB assigned it.
			connection = dataSource.getConnection();
			new QueryRunner().update(connection, "DELETE FROM schedules");
		} catch (SQLException e) {
			e.printStackTrace();
			testDBExists = false;
		} finally {
			DbUtils.closeQuietly(connection);
		}
	}

	/**
	 * Inserts six schedules and checks that all six are loaded back and that
	 * the first loaded schedule carries the inserted time zone, submit time,
	 * period and {@code hasSla} option.
	 *
	 * @throws ScheduleManagerException if the loader fails
	 */
	@Test
	public void testInsertAndLoadSchedule() throws ScheduleManagerException {
		if (!isTestSetup()) {
			return;
		}
		clearDB();

		JdbcScheduleLoader loader = createLoader();
		Map<String, Object> scheduleOptions = createScheduleOptions(true);

		Schedule s1 = new Schedule(1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", scheduleOptions);
		Schedule s2 = new Schedule(1, "proj1", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "ccc", scheduleOptions);
		Schedule s3 = new Schedule(2, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", scheduleOptions);
		Schedule s4 = new Schedule(3, "proj2", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", scheduleOptions);
		Schedule s5 = new Schedule(3, "proj2", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", scheduleOptions);
		Schedule s6 = new Schedule(3, "proj2", "flow3", "error", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", scheduleOptions);

		loader.insertSchedule(s1);
		loader.insertSchedule(s2);
		loader.insertSchedule(s3);
		loader.insertSchedule(s4);
		loader.insertSchedule(s5);
		loader.insertSchedule(s6);

		List<Schedule> schedules = loader.loadSchedules();

		Assert.assertEquals(6, schedules.size());
		Assert.assertEquals("America/Los_Angeles", schedules.get(0).getTimezone().getID());
		Assert.assertEquals(44444, schedules.get(0).getSubmitTime());
		Assert.assertEquals("1d", Schedule.createPeriodString(schedules.get(0).getPeriod()));
		System.out.println("the options are " + schedules.get(0).getSchedOptions());
		Assert.assertEquals(true, schedules.get(0).getSchedOptions().get("hasSla"));
	}

	/**
	 * Inserts one schedule, updates it with different times, period and
	 * {@code hasSla} value, and checks that exactly one schedule is loaded
	 * back and that it carries the updated values.
	 *
	 * @throws ScheduleManagerException if the loader fails
	 */
	@Test
	public void testInsertAndUpdateSchedule() throws ScheduleManagerException {
		if (!isTestSetup()) {
			return;
		}
		clearDB();

		JdbcScheduleLoader loader = createLoader();
		Map<String, Object> scheduleOptions = createScheduleOptions(true);

		System.out.println("the options are " + scheduleOptions);
		Schedule s1 = new Schedule(1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", scheduleOptions);

		loader.insertSchedule(s1);

		// Flip the SLA flag (on the same options map) for the updated schedule.
		scheduleOptions.put("hasSla", false);

		Schedule s2 = new Schedule(1, "proj1", "flow1", "ready", 11112, "America/Los_Angeles", "2M", 22223, 33334, 44445, "cyu", scheduleOptions);

		loader.updateSchedule(s2);

		List<Schedule> schedules = loader.loadSchedules();

		Assert.assertEquals(1, schedules.size());
		Assert.assertEquals("America/Los_Angeles", schedules.get(0).getTimezone().getID());
		Assert.assertEquals(44445, schedules.get(0).getSubmitTime());
		Assert.assertEquals("2M", Schedule.createPeriodString(schedules.get(0).getPeriod()));
		System.out.println("the options are " + schedules.get(0).getSchedOptions());
		Assert.assertEquals(false, schedules.get(0).getSchedOptions().get("hasSla"));
	}

	/**
	 * Inserts ten schedules, removes each of them again, and checks that no
	 * schedule is left to load.
	 *
	 * <p>Loader failures propagate and fail the test directly; they are not
	 * swallowed, so a failed insert/remove cannot go unnoticed and a failed
	 * load cannot surface as a misleading {@link NullPointerException}.
	 *
	 * @throws ScheduleManagerException if the loader fails
	 */
	@Test
	public void testInsertAndRemoveSchedule() throws ScheduleManagerException {
		if (!isTestSetup()) {
			return;
		}
		clearDB();

		JdbcScheduleLoader loader = createLoader();

		List<Schedule> schedules = new ArrayList<Schedule>();

		int stress = 10;

		for (int i = 0; i < stress; i++) {
			Map<String, Object> scheduleOptions = createScheduleOptions(true);

			Schedule s = new Schedule(i+1, "proj"+(i+1), "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", scheduleOptions);
			schedules.add(s);
			loader.insertSchedule(s);
		}

		for (Schedule s : schedules) {
			loader.removeSchedule(s);
		}

		// should have cleaned up
		List<Schedule> otherSchedules = loader.loadSchedules();
		Assert.assertEquals(0, otherSchedules.size());
	}

	/**
	 * Builds the schedule-options fixture shared by all tests: three disabled
	 * jobs, three failure e-mail addresses and the given SLA flag.
	 *
	 * @param hasSla value stored under the {@code "hasSla"} key
	 * @return a new, mutable options map
	 */
	private static Map<String, Object> createScheduleOptions(boolean hasSla) {
		List<String> disabled = new ArrayList<String>();
		disabled.add("job1");
		disabled.add("job2");
		disabled.add("job3");

		List<String> failEmails = new ArrayList<String>();
		failEmails.add("email1");
		failEmails.add("email2");
		failEmails.add("email3");

		Map<String, Object> scheduleOptions = new HashMap<String, Object>();
		scheduleOptions.put("disabled", disabled);
		scheduleOptions.put("failEmails", failEmails);
		scheduleOptions.put("hasSla", hasSla);
		return scheduleOptions;
	}

	/**
	 * Creates a {@link JdbcScheduleLoader} configured with the same MySQL
	 * connection settings that the database probe uses.
	 *
	 * @return a loader pointing at the test database
	 */
	private JdbcScheduleLoader createLoader() {
		Props props = new Props();
		props.put("database.type", "mysql");

		props.put("mysql.host", host);
		props.put("mysql.port", port);
		props.put("mysql.user", user);
		props.put("mysql.database", database);
		props.put("mysql.password", password);
		props.put("mysql.numconnections", numConnections);

		return new JdbcScheduleLoader(props);
	}

	/**
	 * Reports whether the test database is usable and logs the decision.
	 *
	 * @return {@code true} when tests should run, {@code false} when they
	 *         should return early
	 */
	private boolean isTestSetup() {
		if (!testDBExists) {
			System.err.println("Skipping DB test because Db not setup.");
			return false;
		}

		System.out.println("Running DB test because Db setup.");
		return true;
	}

	/**
	 * Result-set handler that returns the number of rows in the result set.
	 *
	 * <p>Note that it counts rows; it does not read the value of a
	 * {@code COUNT(...)} column.
	 */
	public static class CountHandler implements ResultSetHandler<Integer> {
		/**
		 * @param rs result set to consume
		 * @return how many rows {@code rs} contained
		 * @throws SQLException if advancing the result set fails
		 */
		@Override
		public Integer handle(ResultSet rs) throws SQLException {
			int val = 0;
			while (rs.next()) {
				val++;
			}

			return val;
		}

	}
}