JdbcSLALoader.java

284 lines | 8.544 kB Blame History Raw Download
package azkaban.sla;

import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;

import azkaban.sla.SLA.SlaRule;
import azkaban.database.AbstractJdbcLoader;
import azkaban.utils.GZIPUtils;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;

public class JdbcSLALoader extends AbstractJdbcLoader implements SLALoader {
	private static final Logger logger = Logger.getLogger(JdbcSLALoader.class);
	private EncodingType defaultEncodingType = EncodingType.GZIP;
	
	private static String slaTblName = "active_sla";
	
	final private static String LOAD_ALL_SLA =
			"SELECT exec_id, job_name, check_time, rule, enc_type, options FROM " + slaTblName;
	
	final private static String INSERT_SLA = 
			"INSERT INTO " + slaTblName + " ( exec_id, job_name, check_time, rule, enc_type, options) values (?,?,?,?,?,?)";

//	final private static String UPDATE_SLA = 
//			"UPDATE " + slaTblName + " SET exec_id, job_name, check_time, rule, enc_type, options) values (?,?,?,?,?,?)";
//	
	// use "" for flow
	final private static String REMOVE_SLA = 
			"DELETE FROM " + slaTblName + " WHERE exec_id=? AND job_name=? AND check_time=? AND rule=?";
	
	public JdbcSLALoader(Props props) {
		super(props);
	}

	private Connection getConnection() throws SLAManagerException {
		Connection connection = null;
		try {
			connection = super.getDBConnection(false);
		} catch (Exception e) {
			DbUtils.closeQuietly(connection);
			throw new SLAManagerException("Error getting DB connection.", e);
		}
		
		return connection;
	}
	
	public EncodingType getDefaultEncodingType() {
		return defaultEncodingType;
	}

	public void setDefaultEncodingType(EncodingType defaultEncodingType) {
		this.defaultEncodingType = defaultEncodingType;
	}

	@Override
	public List<SLA> loadSLAs() throws SLAManagerException {
		logger.info("Loading all SLAs from db.");
		
		Connection connection = getConnection();
		List<SLA> SLAs;
		try{
			SLAs= loadSLAs(connection, defaultEncodingType);
		}
		catch(SLAManagerException e) {
			throw new SLAManagerException("Failed to load SLAs" + e.getCause());
		}
		finally{
			DbUtils.closeQuietly(connection);
		}
				
		logger.info("Loaded " + SLAs.size() + " SLAs.");
		
		return SLAs;

	}

	public List<SLA> loadSLAs(Connection connection, EncodingType encType) throws SLAManagerException {
		logger.info("Loading all SLAs from db.");
		
		QueryRunner runner = new QueryRunner();
		ResultSetHandler<List<SLA>> handler = new SLAResultHandler();
		List<SLA> SLAs;
		
		try {
			SLAs = runner.query(connection, LOAD_ALL_SLA, handler);
		} catch (SQLException e) {
			logger.error(LOAD_ALL_SLA + " failed.");
			throw new SLAManagerException("Load SLAs from db failed. "+ e.getCause());
		}
		
		return SLAs;

	}
	
	@Override
	public void removeSLA(SLA s) throws SLAManagerException {
		Connection connection = getConnection();
		try{
			removeSLA(connection, s);
		}
		catch(SLAManagerException e) {
			logger.error(LOAD_ALL_SLA + " failed.");
			throw new SLAManagerException("Load SLAs from db failed. "+ e.getCause());
		}
		finally {
			DbUtils.closeQuietly(connection);
		}
		
	}
	
	private void removeSLA(Connection connection, SLA s) throws SLAManagerException {

		logger.info("Removing SLA " + s.toString() + " from db.");

		QueryRunner runner = createQueryRunner();
	
		try {
			int removes =  runner.update(REMOVE_SLA, s.getExecId(), s.getJobName(), s.getCheckTime().getMillis(), s.getRule().getNumVal());
			if (removes == 0) {
				throw new SLAManagerException("No schedule has been removed.");
			}
		} catch (SQLException e) {
			logger.error("Remove SLA failed. " + s.toString());
			throw new SLAManagerException("Remove SLA " + s.toString() + " from db failed. "+ e.getCause());
		}
		
	}

	@Override
	public	void insertSLA(SLA s) throws SLAManagerException {
		Connection connection = getConnection();
		try{
			insertSLA(connection, s, defaultEncodingType);
		}
		catch(SLAManagerException e){
			logger.error("Insert SLA failed. " + s.toString());
			throw new SLAManagerException("Insert SLA " + s.toString() + " into db failed. "+ e.getCause() + e.getMessage() + e.getStackTrace());
		}
		finally {
			DbUtils.closeQuietly(connection);
		}
	}
	
	private void insertSLA(Connection connection, SLA s, EncodingType encType) throws SLAManagerException {

		logger.debug("Inserting new SLA into DB. " + s.toString());
		
		QueryRunner runner = new QueryRunner();

		String json = JSONUtils.toJSON(s.optionToObject());
		byte[] data = null;
		try {
			byte[] stringData = json.getBytes("UTF-8");
			data = stringData;
	
			if (encType == EncodingType.GZIP) {
				data = GZIPUtils.gzipBytes(stringData);
			}
			logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
		}
		catch (IOException e) {
			throw new SLAManagerException("Error encoding the sla options.");
		}
		
		try {
			int inserts = runner.update(connection, INSERT_SLA, s.getExecId(), s.getJobName(), s.getCheckTime().getMillis(), s.getRule().getNumVal(), encType.getNumVal(), data);

			if (inserts == 0) {
				throw new SLAManagerException("No SLA has been inserted. Insertion failed.");
			}
		} catch (SQLException e) {
			logger.error("Insert SLA failed.");
			throw new SLAManagerException("Insert sla " + s.toString() + " into db failed. " + e.getCause());
		}
		
	}
	
//	@Override
//	public	void updateSLA(SLA s) throws SLAManagerException {
//		Connection connection = getConnection();
//		try{
//			insertSLA(connection, s, defaultEncodingType);
//		}
//		catch(SLAManagerException e){
//			logger.error("Insert SLA failed. " + s.toString());
//			throw new SLAManagerException("Insert SLA " + s.toString() + " into db failed. "+ e.getCause() + e.getMessage() + e.getStackTrace());
//		}
//		finally {
//			DbUtils.closeQuietly(connection);
//		}
//	}
//	
//	private void updateSLA(Connection connection, SLA s, EncodingType encType) throws SLAManagerException {
//
//		logger.debug("Inserting new SLA into DB. " + s.toString());
//		
//		QueryRunner runner = new QueryRunner();
//
//		String json = JSONUtils.toJSON(s.optionToObject());
//		byte[] data = null;
//		try {
//			byte[] stringData = json.getBytes("UTF-8");
//			data = stringData;
//	
//			if (encType == EncodingType.GZIP) {
//				data = GZIPUtils.gzipBytes(stringData);
//			}
//			logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
//		}
//		catch (IOException e) {
//			throw new SLAManagerException("Error encoding the sla options.");
//		}
//		
//		try {
//			int inserts = runner.update(connection, INSERT_SLA, s.getExecId(), s.getJobName(), s.getCheckTime().getMillis(), s.getRule().getNumVal(), encType.getNumVal(), data);
//
//			if (inserts == 0) {
//				throw new SLAManagerException("No SLA has been inserted. Insertion failed.");
//			}
//		} catch (SQLException e) {
//			logger.error("Insert SLA failed.");
//			throw new SLAManagerException("Insert sla " + s.toString() + " into db failed. " + e.getCause());
//		}
//		
//	}
	
	public class SLAResultHandler implements ResultSetHandler<List<SLA>> {
		@Override
		public List<SLA> handle(ResultSet rs) throws SQLException {
			if (!rs.next()) {
				return Collections.<SLA>emptyList();
			}
			
			ArrayList<SLA> SLAs = new ArrayList<SLA>();
			
			do {
				int execId = rs.getInt(1);
				String jobName = rs.getString(2);
				DateTime checkTime = new DateTime(rs.getLong(3));
				SlaRule rule = SlaRule.fromInteger(rs.getInt(4));
				int encodingType = rs.getInt(5);
				byte[] data = rs.getBytes(6);
				
				SLA s = null;

				EncodingType encType = EncodingType.fromInteger(encodingType);
				Object optsObj;
				try {
					// Convoluted way to inflate strings. Should find common package or helper function.
					if (encType == EncodingType.GZIP) {
						// Decompress the sucker.
						String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
						optsObj = JSONUtils.parseJSONFromString(jsonString);
					}
					else {
						String jsonString = new String(data, "UTF-8");
						optsObj = JSONUtils.parseJSONFromString(jsonString);
					}						
					s = SLA.createSlaFromObject(execId, jobName, checkTime, rule, optsObj);
				} catch (IOException e) {
					throw new SQLException("Error reconstructing SLA options. " + execId + " " + jobName + " " + checkTime.toDateTimeISO() + e.getCause());
				}
				SLAs.add(s);

			} while (rs.next());
			
			return SLAs;
		}
		
	}

}