// JdbcScheduleLoader.java
//
// 364 lines | 11.676 kB
/*
 * Copyright 2012 LinkedIn, Inc
 * 
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.migration.scheduler;


import azkaban.database.DataSourceUtils;
import azkaban.utils.GZIPUtils;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import javax.sql.DataSource;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.log4j.Logger;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadablePeriod;

/**
 * JDBC-backed {@link ScheduleLoader} that reads and writes rows of the
 * {@code schedules} table.
 *
 * <p>Schedule options are serialized as UTF-8 JSON and stored in the
 * {@code schedule_options} column, optionally GZIP-compressed. The encoding
 * used is recorded in the {@code enc_type} column (see {@link EncodingType}).
 *
 * <p>Lives in the migration package and is marked {@code @Deprecated}.
 */
@Deprecated
public class JdbcScheduleLoader implements ScheduleLoader {

	private static final Logger logger = Logger.getLogger(JdbcScheduleLoader.class);

	/**
	 * Encoding of the {@code schedule_options} blob. The numeric value is what
	 * gets persisted in the {@code enc_type} column.
	 */
	public static enum EncodingType {
		PLAIN(1), GZIP(2);

		private final int numVal;

		EncodingType(int numVal) {
			this.numVal = numVal;
		}

		/** @return the integer stored in the {@code enc_type} column for this encoding */
		public int getNumVal() {
			return numVal;
		}

		/**
		 * Maps a persisted {@code enc_type} value back to an encoding.
		 *
		 * @param x value read from the {@code enc_type} column
		 * @return the matching encoding; unknown values fall back to {@link #PLAIN}
		 */
		public static EncodingType fromInteger(int x) {
			switch (x) {
			case 1:
				return PLAIN;
			case 2:
				return GZIP;
			default:
				return PLAIN;
			}
		}
	}

	private DataSource dataSource;
	private EncodingType defaultEncodingType = EncodingType.GZIP;

	private static final String scheduleTableName = "schedules";

	private static final String SELECT_ALL_SCHEDULES =
			"SELECT project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user, enc_type, schedule_options FROM " + scheduleTableName;

	private static final String INSERT_SCHEDULE =
			"INSERT INTO " + scheduleTableName + " ( project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user, enc_type, schedule_options) values (?,?,?,?,?,?,?,?,?,?,?,?,?)";

	private static final String REMOVE_SCHEDULE_BY_KEY =
			"DELETE FROM " + scheduleTableName + " WHERE project_id=? AND flow_name=?";

	private static final String UPDATE_SCHEDULE_BY_KEY =
			"UPDATE " + scheduleTableName + " SET status=?, first_sched_time=?, timezone=?, period=?, last_modify_time=?, next_exec_time=?, submit_time=?, submit_user=?, enc_type=?, schedule_options=? WHERE project_id=? AND flow_name=?";

	private static final String UPDATE_NEXT_EXEC_TIME =
			"UPDATE " + scheduleTableName + " SET next_exec_time=? WHERE project_id=? AND flow_name=?";

	/**
	 * Obtains a connection from the configured data source.
	 *
	 * @throws ScheduleManagerException if no connection could be obtained. This
	 *         includes the case where no data source was configured, which
	 *         surfaces as a NullPointerException and is wrapped here.
	 */
	private Connection getConnection() throws ScheduleManagerException {
		try {
			return dataSource.getConnection();
		} catch (Exception e) {
			throw new ScheduleManagerException("Error getting DB connection.", e);
		}
	}

	public EncodingType getDefaultEncodingType() {
		return defaultEncodingType;
	}

	public void setDefaultEncodingType(EncodingType defaultEncodingType) {
		this.defaultEncodingType = defaultEncodingType;
	}

	/**
	 * Builds the loader from configuration.
	 *
	 * <p>Only {@code database.type=mysql} is supported. It reads the
	 * {@code mysql.port}, {@code mysql.host}, {@code mysql.database},
	 * {@code mysql.user}, {@code mysql.password} and
	 * {@code mysql.numconnections} properties. For any other type no data
	 * source is created, and every DB operation will subsequently fail.
	 *
	 * @param props configuration properties
	 */
	public JdbcScheduleLoader(Props props) {
		String databaseType = props.getString("database.type");

		if ("mysql".equals(databaseType)) {
			int port = props.getInt("mysql.port");
			String host = props.getString("mysql.host");
			String database = props.getString("mysql.database");
			String user = props.getString("mysql.user");
			String password = props.getString("mysql.password");
			int numConnections = props.getInt("mysql.numconnections");

			dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
		} else {
			// Previously silent: surface the misconfiguration instead of failing later on a null data source.
			logger.error("Unsupported database.type '" + databaseType + "'; no data source configured.");
		}
	}

	/**
	 * Loads every row of the schedules table and reconciles it with the clock.
	 *
	 * <p>Schedules for which {@code Schedule.updateTime()} returns false are
	 * removed from both the returned list and the table. For all others, the
	 * (updated) next execution time is written back to the table.
	 *
	 * @return the surviving schedules
	 * @throws ScheduleManagerException if the query, a removal, or a
	 *         next-exec-time update fails
	 */
	@Override
	public List<Schedule> loadSchedules() throws ScheduleManagerException {
		logger.info("Loading all schedules from db.");
		Connection connection = getConnection();

		QueryRunner runner = new QueryRunner();
		ResultSetHandler<List<Schedule>> handler = new ScheduleResultHandler();

		List<Schedule> schedules;
		try {
			schedules = runner.query(connection, SELECT_ALL_SCHEDULES, handler);
		} catch (SQLException e) {
			logger.error(SELECT_ALL_SCHEDULES + " failed.", e);
			throw new ScheduleManagerException("Loading schedules from db failed. ", e);
		} finally {
			// Single close point for both success and failure paths.
			DbUtils.closeQuietly(connection);
		}

		logger.info("Now trying to update the schedules");

		// Prune in place via the iterator; the handler always returns a mutable list.
		Iterator<Schedule> scheduleIterator = schedules.iterator();
		while (scheduleIterator.hasNext()) {
			Schedule sched = scheduleIterator.next();
			if (!sched.updateTime()) {
				logger.info("Schedule " + sched.getScheduleName() + " was scheduled before azkaban start, skipping it.");
				scheduleIterator.remove();
				removeSchedule(sched);
			} else {
				logger.info("Recurring schedule, need to update next exec time");
				try {
					updateNextExecTime(sched);
				} catch (Exception e) {
					logger.error("Updating next exec time failed for schedule " + sched.getScheduleName(), e);
					throw new ScheduleManagerException("Update next execution time failed.", e);
				}
				logger.info("Schedule " + sched.getScheduleName() + " loaded and updated.");
			}
		}

		logger.info("Loaded " + schedules.size() + " schedules.");

		return schedules;
	}

	/**
	 * Deletes the row keyed by the schedule's project id and flow name.
	 *
	 * @throws ScheduleManagerException if the delete fails or affects no rows
	 */
	@Override
	public void removeSchedule(Schedule s) throws ScheduleManagerException {
		logger.info("Removing schedule " + s.getScheduleName() + " from db.");

		QueryRunner runner = new QueryRunner(dataSource);

		try {
			int removes = runner.update(REMOVE_SCHEDULE_BY_KEY, s.getProjectId(), s.getFlowName());
			if (removes == 0) {
				throw new ScheduleManagerException("No schedule has been removed.");
			}
		} catch (SQLException e) {
			logger.error(REMOVE_SCHEDULE_BY_KEY + " failed.", e);
			throw new ScheduleManagerException("Remove schedule " + s.getScheduleName() + " from db failed. ", e);
		}
	}

	/** Inserts the schedule using {@link #getDefaultEncodingType()} for its options blob. */
	public void insertSchedule(Schedule s) throws ScheduleManagerException {
		logger.info("Inserting schedule " + s.getScheduleName() + " into db.");
		insertSchedule(s, defaultEncodingType);
	}

	/**
	 * Inserts a new row for the schedule.
	 *
	 * @param s schedule to persist
	 * @param encType encoding to use for the {@code schedule_options} blob
	 * @throws ScheduleManagerException if encoding fails, the insert fails, or
	 *         no row is inserted
	 */
	public void insertSchedule(Schedule s, EncodingType encType) throws ScheduleManagerException {
		byte[] data;
		try {
			data = encodeScheduleOptions(s, encType);
		} catch (IOException e) {
			throw new ScheduleManagerException("Error encoding the schedule options. " + s.getScheduleName(), e);
		}

		QueryRunner runner = new QueryRunner(dataSource);
		try {
			int inserts = runner.update(
					INSERT_SCHEDULE,
					s.getProjectId(),
					s.getProjectName(),
					s.getFlowName(),
					s.getStatus(),
					s.getFirstSchedTime(),
					s.getTimezone().getID(),
					Schedule.createPeriodString(s.getPeriod()),
					s.getLastModifyTime(),
					s.getNextExecTime(),
					s.getSubmitTime(),
					s.getSubmitUser(),
					encType.getNumVal(),
					data);
			if (inserts == 0) {
				throw new ScheduleManagerException("No schedule has been inserted.");
			}
		} catch (SQLException e) {
			logger.error(INSERT_SCHEDULE + " failed.", e);
			throw new ScheduleManagerException("Insert schedule " + s.getScheduleName() + " into db failed. ", e);
		}
	}

	/**
	 * Writes only the {@code next_exec_time} column for the schedule's row.
	 *
	 * @throws ScheduleManagerException if a connection cannot be obtained or
	 *         the update fails
	 */
	@Override
	public void updateNextExecTime(Schedule s) throws ScheduleManagerException
	{
		logger.info("Update schedule " + s.getScheduleName() + " into db. ");
		Connection connection = getConnection();
		QueryRunner runner = new QueryRunner();
		try {
			runner.update(connection, UPDATE_NEXT_EXEC_TIME, s.getNextExecTime(), s.getProjectId(), s.getFlowName());
		} catch (SQLException e) {
			logger.error(UPDATE_NEXT_EXEC_TIME + " failed.", e);
			throw new ScheduleManagerException("Update schedule " + s.getScheduleName() + " into db failed. ", e);
		} finally {
			DbUtils.closeQuietly(connection);
		}
	}

	/** Updates the schedule using {@link #getDefaultEncodingType()} for its options blob. */
	@Override
	public void updateSchedule(Schedule s) throws ScheduleManagerException {
		logger.info("Updating schedule " + s.getScheduleName() + " into db.");
		updateSchedule(s, defaultEncodingType);
	}

	/**
	 * Overwrites all mutable columns of the schedule's row. The row is keyed by
	 * project id and flow name.
	 *
	 * @param s schedule to persist
	 * @param encType encoding to use for the {@code schedule_options} blob
	 * @throws ScheduleManagerException if encoding fails, the update fails, or
	 *         no row is updated
	 */
	public void updateSchedule(Schedule s, EncodingType encType) throws ScheduleManagerException {
		byte[] data;
		try {
			data = encodeScheduleOptions(s, encType);
		} catch (IOException e) {
			throw new ScheduleManagerException("Error encoding the schedule options " + s.getScheduleName(), e);
		}

		QueryRunner runner = new QueryRunner(dataSource);

		try {
			int updates = runner.update(
					UPDATE_SCHEDULE_BY_KEY,
					s.getStatus(),
					s.getFirstSchedTime(),
					s.getTimezone().getID(),
					Schedule.createPeriodString(s.getPeriod()),
					s.getLastModifyTime(),
					s.getNextExecTime(),
					s.getSubmitTime(),
					s.getSubmitUser(),
					encType.getNumVal(),
					data,
					s.getProjectId(),
					s.getFlowName());
			if (updates == 0) {
				throw new ScheduleManagerException("No schedule has been updated.");
			}
		} catch (SQLException e) {
			logger.error(UPDATE_SCHEDULE_BY_KEY + " failed.", e);
			throw new ScheduleManagerException("Update schedule " + s.getScheduleName() + " into db failed. ", e);
		}
	}

	/**
	 * Serializes the schedule's options to UTF-8 JSON bytes. The bytes are
	 * gzipped when {@code encType} is {@link EncodingType#GZIP}; otherwise the
	 * raw UTF-8 bytes are returned.
	 *
	 * @throws IOException if UTF-8 encoding or compression fails
	 */
	private static byte[] encodeScheduleOptions(Schedule s, EncodingType encType) throws IOException {
		String json = JSONUtils.toJSON(s.optionsToObject());
		byte[] stringData = json.getBytes("UTF-8");
		byte[] data = (encType == EncodingType.GZIP) ? GZIPUtils.gzipBytes(stringData) : stringData;
		logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:" + data.length);
		return data;
	}

	/**
	 * Converts rows produced by {@code SELECT_ALL_SCHEDULES} into {@link Schedule}
	 * objects. Columns are read by position, so the column order of that query
	 * must be preserved.
	 *
	 * <p>NOTE(review): this is a non-static inner class. It does not use the
	 * enclosing instance, but it is left non-static to keep its public
	 * instantiation syntax unchanged.
	 */
	public class ScheduleResultHandler implements ResultSetHandler<List<Schedule>> {

		/**
		 * @return a mutable list with one schedule per row (empty when there are no rows)
		 * @throws SQLException on read failure or if an options blob cannot be decoded
		 */
		@Override
		public List<Schedule> handle(ResultSet rs) throws SQLException {
			// Always mutable: loadSchedules removes entries through an iterator.
			List<Schedule> schedules = new ArrayList<Schedule>();
			while (rs.next()) {
				schedules.add(readSchedule(rs));
			}
			return schedules;
		}

		/** Builds a schedule from the current row of {@code rs}. */
		private Schedule readSchedule(ResultSet rs) throws SQLException {
			int projectId = rs.getInt(1);
			String projectName = rs.getString(2);
			String flowName = rs.getString(3);
			String status = rs.getString(4);
			long firstSchedTime = rs.getLong(5);
			DateTimeZone timezone = DateTimeZone.forID(rs.getString(6));
			ReadablePeriod period = Schedule.parsePeriodString(rs.getString(7));
			long lastModifyTime = rs.getLong(8);
			long nextExecTime = rs.getLong(9);
			long submitTime = rs.getLong(10);
			String submitUser = rs.getString(11);
			int encodingType = rs.getInt(12);
			byte[] data = rs.getBytes(13);

			Object optsObj = null;
			if (data != null) {
				EncodingType encType = EncodingType.fromInteger(encodingType);
				try {
					String jsonString = (encType == EncodingType.GZIP)
							? GZIPUtils.unGzipString(data, "UTF-8")
							: new String(data, "UTF-8");
					optsObj = JSONUtils.parseJSONFromString(jsonString);
				} catch (IOException e) {
					// Preserve the decoding failure as the cause.
					throw new SQLException("Error reconstructing schedule options " + projectName + "." + flowName, e);
				}
			}

			Schedule s = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser);
			if (optsObj != null) {
				s.createAndSetScheduleOptions(optsObj);
			}
			return s;
		}
	}
}