JdbcExecutorLoader.java

1301 lines | 36.131 kB Blame History Raw Download
/*
 * Copyright 2012 LinkedIn Corp.
 * 
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.executor;

import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;

import azkaban.database.AbstractJdbcLoader;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.GZIPUtils;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;

public class JdbcExecutorLoader extends AbstractJdbcLoader 
		implements ExecutorLoader {
	private static final Logger logger = 
			Logger.getLogger(JdbcExecutorLoader.class);

	private EncodingType defaultEncodingType = EncodingType.GZIP;
	
	public JdbcExecutorLoader(Props props) {
		super(props);
	}

	public EncodingType getDefaultEncodingType() {
		return defaultEncodingType;
	}

	public void setDefaultEncodingType(EncodingType defaultEncodingType) {
		this.defaultEncodingType = defaultEncodingType;
	}
	
	@Override
	public synchronized void uploadExecutableFlow(ExecutableFlow flow) 
			throws ExecutorManagerException {
		Connection connection = getConnection();
		try {
			uploadExecutableFlow(connection, flow, defaultEncodingType);
		}
		catch (IOException e) {
			throw new ExecutorManagerException("Error uploading flow", e);
		}
		finally {
			DbUtils.closeQuietly(connection);
		}
	}
	
	private synchronized void uploadExecutableFlow(Connection connection, 
			ExecutableFlow flow, EncodingType encType) 
			throws ExecutorManagerException, IOException {
		final String INSERT_EXECUTABLE_FLOW = 
				"INSERT INTO execution_flows " + 
						"(project_id, flow_id, version, status, submit_time, submit_user, update_time) " + 
						"values (?,?,?,?,?,?,?)";
		QueryRunner runner = new QueryRunner();
		long submitTime = System.currentTimeMillis();

		long id;
		try {
			flow.setStatus(Status.PREPARING);
			runner.update(
					connection, 
					INSERT_EXECUTABLE_FLOW, 
					flow.getProjectId(), 
					flow.getFlowId(), 
					flow.getVersion(), 
					Status.PREPARING.getNumVal(), 
					submitTime, 
					flow.getSubmitUser(), 
					submitTime);
			connection.commit();
			id = runner.query(
					connection, LastInsertID.LAST_INSERT_ID, new LastInsertID());

			if (id == -1l) {
				throw new ExecutorManagerException("Execution id is not properly created.");
			}
			logger.info("Flow given " + flow.getFlowId() + " given id " + id);
			flow.setExecutionId((int)id);
			
			updateExecutableFlow(connection, flow, encType);
		}
		catch (SQLException e) {
			throw new ExecutorManagerException("Error creating execution.", e);
		}
	}
	
	@Override
	public void updateExecutableFlow(ExecutableFlow flow) 
			throws ExecutorManagerException {
		Connection connection = this.getConnection();
		
		try {
			updateExecutableFlow(connection, flow, defaultEncodingType);
		}
		finally {
			DbUtils.closeQuietly(connection);
		}
	} 
	
	private void updateExecutableFlow(
			Connection connection, ExecutableFlow flow, EncodingType encType) 
			throws ExecutorManagerException {
		final String UPDATE_EXECUTABLE_FLOW_DATA = 
				"UPDATE execution_flows " + 
						"SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=? " + 
						"WHERE exec_id=?";
		QueryRunner runner = new QueryRunner();
		
		String json = JSONUtils.toJSON(flow.toObject());
		byte[] data = null;
		try {
			byte[] stringData = json.getBytes("UTF-8");
			data = stringData;
	
			if (encType == EncodingType.GZIP) {
				data = GZIPUtils.gzipBytes(stringData);
			}
		}
		catch (IOException e) {
			throw new ExecutorManagerException("Error encoding the execution flow.");
		}
		
		try {
			runner.update(
					connection, 
					UPDATE_EXECUTABLE_FLOW_DATA, 
					flow.getStatus().getNumVal(), 
					flow.getUpdateTime(), 
					flow.getStartTime(), 
					flow.getEndTime(), 
					encType.getNumVal(), 
					data, 
					flow.getExecutionId());
			connection.commit();
		}
		catch (SQLException e) {
			throw new ExecutorManagerException("Error updating flow.", e);
		}
	}
	
	@Override
	public ExecutableFlow fetchExecutableFlow(int id) 
			throws ExecutorManagerException {
		QueryRunner runner = createQueryRunner();
		FetchExecutableFlows flowHandler = new FetchExecutableFlows();

		try {
			List<ExecutableFlow> properties = runner.query(
					FetchExecutableFlows.FETCH_EXECUTABLE_FLOW, flowHandler, id);
			return properties.get(0);
		}
		catch (SQLException e) {
			throw new ExecutorManagerException("Error fetching flow id " + id, e);
		}
	}
	
	@Override
	public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows() 
			throws ExecutorManagerException {
		QueryRunner runner = createQueryRunner();
		FetchActiveExecutableFlows flowHandler = new FetchActiveExecutableFlows();

		try {
			Map<Integer, Pair<ExecutionReference, ExecutableFlow>> properties = 
					runner.query(
							FetchActiveExecutableFlows.FETCH_ACTIVE_EXECUTABLE_FLOW, 
							flowHandler);
			return properties;
		}
		catch (SQLException e) {
			throw new ExecutorManagerException("Error fetching active flows", e);
		}
	}
	
	@Override
	public int fetchNumExecutableFlows() throws ExecutorManagerException {
		QueryRunner runner = createQueryRunner();
		
		IntHandler intHandler = new IntHandler();
		try {
			int count = runner.query(IntHandler.NUM_EXECUTIONS, intHandler);
			return count;
		}
		catch (SQLException e) {
			throw new ExecutorManagerException("Error fetching num executions", e);
		}
	}
	
	@Override
	public int fetchNumExecutableFlows(int projectId, String flowId) 
			throws ExecutorManagerException {
		QueryRunner runner = createQueryRunner();
		
		IntHandler intHandler = new IntHandler();
		try {
			int count = runner.query(
					IntHandler.NUM_FLOW_EXECUTIONS, intHandler, projectId, flowId);
			return count;
		}
		catch (SQLException e) {
			throw new ExecutorManagerException("Error fetching num executions", e);
		}
	}
	
	@Override
	public int fetchNumExecutableNodes(int projectId, String jobId) 
			throws ExecutorManagerException {
		QueryRunner runner = createQueryRunner();
		
		IntHandler intHandler = new IntHandler();
		try {
			int count = runner.query(
					IntHandler.NUM_JOB_EXECUTIONS, intHandler, projectId, jobId);
			return count;
		}
		catch (SQLException e) {
			throw new ExecutorManagerException("Error fetching num executions", e);
		}
	}
	
	@Override
	public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, 
			int skip, int num) throws ExecutorManagerException {
		QueryRunner runner = createQueryRunner();
		FetchExecutableFlows flowHandler = new FetchExecutableFlows();

		try {
			List<ExecutableFlow> properties = runner.query(
					FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_HISTORY, 
					flowHandler, 
					projectId, 
					flowId, 
					skip, 
					num);
			return properties;
		}
		catch (SQLException e) {
			throw new ExecutorManagerException("Error fetching active flows", e);
		}
	}
	
	@Override
	public List<ExecutableFlow> fetchFlowHistory(
			int projectId, String flowId, int skip, int num, Status status) 
			throws ExecutorManagerException {
		QueryRunner runner = createQueryRunner();
		FetchExecutableFlows flowHandler = new FetchExecutableFlows();

		try {
			List<ExecutableFlow> properties = runner.query(
					FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_STATUS, 
					flowHandler, 
					projectId, 
					flowId, 
					status.getNumVal(), 
					skip, 
					num);
			return properties;
		}
		catch (SQLException e) {
			throw new ExecutorManagerException("Error fetching active flows", e);
		}
	}
	
	@Override
	public List<ExecutableFlow> fetchFlowHistory(int skip, int num) 
			throws ExecutorManagerException {
		QueryRunner runner = createQueryRunner();

		FetchExecutableFlows flowHandler = new FetchExecutableFlows();
		
		try {
			List<ExecutableFlow> properties = runner.query(
					FetchExecutableFlows.FETCH_ALL_EXECUTABLE_FLOW_HISTORY, 
					flowHandler, 
					skip, 
					num);
			return properties;
		}
		catch (SQLException e) {
			throw new ExecutorManagerException("Error fetching active flows", e);
		}
	}
	

	@Override
	public List<ExecutableFlow> fetchFlowHistory(
			String projContain, 
			String flowContains, 
			String userNameContains, 
			int status, 
			long startTime, 
			long endTime, 
			int skip, 
			int num) throws ExecutorManagerException {
		String query = FetchExecutableFlows.FETCH_BASE_EXECUTABLE_FLOW_QUERY;
		ArrayList<Object> params = new ArrayList<Object>();
		
		boolean first = true;
		if (projContain != null && !projContain.isEmpty()) {
			query += " ef JOIN projects p ON ef.project_id = p.id WHERE name LIKE ?";
			params.add('%'+projContain+'%');
			first = false;
		}
		
		if (flowContains != null && !flowContains.isEmpty()) {
			if (first) {
				query += " WHERE ";
				first = false;
			}
			else {
				query += " AND ";
			}

			query += " flow_id LIKE ?";
			params.add('%'+flowContains+'%');
		}
		
		if (userNameContains != null && !userNameContains.isEmpty()) {
			if (first) {
				query += " WHERE ";
				first = false;
			}
			else {
				query += " AND ";
			}
			query += " submit_user LIKE ?";
			params.add('%'+userNameContains+'%');
		}
		
		if (status != 0) {
			if (first) {
				query += " WHERE ";
				first = false;
			}
			else {
				query += " AND ";
			}
			query += " status = ?";
			params.add(status);
		}
		
		if (startTime > 0) {
			if (first) {
				query += " WHERE ";
				first = false;
			}
			else {
				query += " AND ";
			}
			query += " start_time > ?";
			params.add(startTime);
		}
		
		if (endTime > 0) {
			if (first) {
				query += " WHERE ";
				first = false;
			}
			else {
				query += " AND "; 
			}
			query += " end_time < ?";
			params.add(endTime);
		}
		
		if (skip > -1 && num > 0) {
			query += "  ORDER BY exec_id DESC LIMIT ?, ?";
			params.add(skip);
			params.add(num);
		}
		
		QueryRunner runner = createQueryRunner();
		FetchExecutableFlows flowHandler = new FetchExecutableFlows();

		try {
			List<ExecutableFlow> properties = runner.query(
					query, flowHandler, params.toArray());
			return properties;
		} catch (SQLException e) {
			throw new ExecutorManagerException("Error fetching active flows", e);
		}
	}
	
	@Override
	public void addActiveExecutableReference(ExecutionReference reference)
			throws ExecutorManagerException {
		final String INSERT = 
				"INSERT INTO active_executing_flows " + 
					"(exec_id, host, port, update_time) values (?,?,?,?)";
		QueryRunner runner = createQueryRunner();
		
		try {
			runner.update(
					INSERT, 
					reference.getExecId(), 
					reference.getHost(), 
					reference.getPort(), 
					reference.getUpdateTime());
		}
		catch (SQLException e) {
			throw new ExecutorManagerException(
					"Error updating active flow reference " + reference.getExecId(), e);
		}
	}
	
	@Override
	public void removeActiveExecutableReference(int execid) 
			throws ExecutorManagerException {
		final String DELETE = "DELETE FROM active_executing_flows WHERE exec_id=?";
		
		QueryRunner runner = createQueryRunner();
		try {
			runner.update(DELETE, execid);
		}
		catch (SQLException e) {
			throw new ExecutorManagerException(
					"Error deleting active flow reference " + execid, e);
		}
	}
	
	@Override
	public boolean updateExecutableReference(int execId, long updateTime) 
			throws ExecutorManagerException {
		final String DELETE = 
				"UPDATE active_executing_flows set update_time=? WHERE exec_id=?";
		
		QueryRunner runner = createQueryRunner();
		int updateNum = 0;
		try {
			updateNum = runner.update(DELETE, updateTime, execId);
		}
		catch (SQLException e) {
			throw new ExecutorManagerException(
					"Error deleting active flow reference " + execId, e);
		}
		
		// Should be 1.
		return updateNum > 0;
	}

	@Override
	public void uploadExecutableNode(ExecutableNode node, Props inputProps) 
			throws ExecutorManagerException {
		final String INSERT_EXECUTION_NODE = 
			"INSERT INTO execution_jobs " + 
					"(exec_id, project_id, version, flow_id, job_id, start_time, " + 
					"end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";
		
		byte[] inputParam = null;
		if (inputProps != null) {
			try {
				String jsonString =
						JSONUtils.toJSON(PropsUtils.toHierarchicalMap(inputProps));
				inputParam = GZIPUtils.gzipString(jsonString, "UTF-8");
			}
			catch (IOException e) {
				throw new ExecutorManagerException("Error encoding input params");
			}
		}
		
		ExecutableFlow flow = node.getExecutableFlow();
		String flowId = node.getParentFlow().getFlowPath();
		System.out.println("Uploading flowId " + flowId);
		QueryRunner runner = createQueryRunner();
		try {
			runner.update(
					INSERT_EXECUTION_NODE, 
					flow.getExecutionId(), 
					flow.getProjectId(), 
					flow.getVersion(), 
					flowId, 
					node.getId(),
					node.getStartTime(),
					node.getEndTime(), 
					node.getStatus().getNumVal(),
					inputParam,
					node.getAttempt());
		} catch (SQLException e) {
			throw new ExecutorManagerException(
					"Error writing job " + node.getId(), e);
		}
	}
	
	@Override
	public void updateExecutableNode(ExecutableNode node)
			throws ExecutorManagerException {
		final String UPSERT_EXECUTION_NODE = 
				"UPDATE execution_jobs " +
						"SET start_time=?, end_time=?, status=?, output_params=? " + 
						"WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
		
		byte[] outputParam = null;
		Props outputProps = node.getOutputProps();
		if (outputProps != null) {
			try {
				String jsonString =
						JSONUtils.toJSON(PropsUtils.toHierarchicalMap(outputProps));
				outputParam = GZIPUtils.gzipString(jsonString, "UTF-8");
			}
			catch (IOException e) {
				throw new ExecutorManagerException("Error encoding input params");
			}
		}
		
		QueryRunner runner = createQueryRunner();
		try {
			runner.update(
					UPSERT_EXECUTION_NODE, 
					node.getStartTime(), 
					node.getEndTime(), 
					node.getStatus().getNumVal(), 
					outputParam,
					node.getExecutableFlow().getExecutionId(),
					node.getParentFlow().getFlowPath(),
					node.getId(),
					node.getAttempt());
		} catch (SQLException e) {
			throw new ExecutorManagerException(
					"Error updating job " + node.getId(), e);
		}
	}
	
	@Override
	public List<ExecutableJobInfo> fetchJobInfoAttempts(int execId, String jobId)
			throws ExecutorManagerException {
		QueryRunner runner = createQueryRunner();
		
		try {
			List<ExecutableJobInfo> info = runner.query(
					FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE_ATTEMPTS, 
					new FetchExecutableJobHandler(), 
					execId,
					jobId);
			if (info == null || info.isEmpty()) {
				return null;
			}
			return info;
		}
		catch (SQLException e) {
			throw new ExecutorManagerException("Error querying job info " + jobId, e);
		}
	}
	
	@Override
	public ExecutableJobInfo fetchJobInfo(int execId, String jobId, int attempts)
			throws ExecutorManagerException {
		QueryRunner runner = createQueryRunner();
		
		try {
			List<ExecutableJobInfo> info = runner.query(
					FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE, 
					new FetchExecutableJobHandler(), 
					execId, 
					jobId,
					attempts);
			if (info == null || info.isEmpty()) {
				return null;
			}
			return info.get(0);
		}
		catch (SQLException e) {
			throw new ExecutorManagerException("Error querying job info " + jobId, e);
		}
	}
	
	@Override
	public Props fetchExecutionJobInputProps(int execId, String jobId)
			throws ExecutorManagerException {
		QueryRunner runner = createQueryRunner();
		try {
			Pair<Props, Props> props = runner.query(
					FetchExecutableJobPropsHandler.FETCH_INPUT_PARAM_EXECUTABLE_NODE, 
					new FetchExecutableJobPropsHandler(), 
					execId, 
					jobId);
			return props.getFirst();
		}
		catch (SQLException e) {
			throw new ExecutorManagerException(
					"Error querying job params " + execId + " " + jobId, e);
		}
	}
	
	@Override
	public Props fetchExecutionJobOutputProps(int execId, String jobId) 
			throws ExecutorManagerException {
		QueryRunner runner = createQueryRunner();
		try {
			Pair<Props, Props> props = runner.query(
					FetchExecutableJobPropsHandler.FETCH_OUTPUT_PARAM_EXECUTABLE_NODE,
					new FetchExecutableJobPropsHandler(),
					execId,
					jobId);
			return props.getFirst();
		}
		catch (SQLException e) {
			throw new ExecutorManagerException(
					"Error querying job params " + execId + " " + jobId, e);
		}
	}
	
	@Override
	public Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId)
			throws ExecutorManagerException {
		QueryRunner runner = createQueryRunner();
		try {
			Pair<Props, Props> props = runner.query(
					FetchExecutableJobPropsHandler.FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE, 
					new FetchExecutableJobPropsHandler(), 
					execId, 
					jobId);
			return props;
		}
		catch (SQLException e) {
			throw new ExecutorManagerException(
					"Error querying job params " + execId + " " + jobId, e);
		}
	}
	
	@Override
	public List<ExecutableJobInfo> fetchJobHistory(
			int projectId, String jobId, int skip, int size) 
			throws ExecutorManagerException {
		QueryRunner runner = createQueryRunner();
		
		try {
			List<ExecutableJobInfo> info = runner.query(
					FetchExecutableJobHandler.FETCH_PROJECT_EXECUTABLE_NODE,
					new FetchExecutableJobHandler(), 
					projectId, 
					jobId, 
					skip, 
					size);
			if (info == null || info.isEmpty()) {
				return null;
			}
			return info;
		}
		catch (SQLException e) {
			throw new ExecutorManagerException("Error querying job info " + jobId, e);
		}
	}
	
	@Override
	public LogData fetchLogs(
			int execId, String name, int attempt, int startByte, int length)
			throws ExecutorManagerException {
		QueryRunner runner = createQueryRunner();

		FetchLogsHandler handler = new FetchLogsHandler(startByte, length + startByte);
		try {
			LogData result = runner.query(
					FetchLogsHandler.FETCH_LOGS, 
					handler, 
					execId, 
					name, 
					attempt, 
					startByte, 
					startByte + length);
			return result;
		}
		catch (SQLException e) {
			throw new ExecutorManagerException(
					"Error fetching logs " + execId + " : " + name, e);
		}
	}

	@Override
	public List<Object> fetchAttachments(int execId, String jobId, int attempt)
			throws ExecutorManagerException {
		QueryRunner runner = createQueryRunner();

		try {
			String attachments = runner.query(
					FetchExecutableJobAttachmentsHandler.FETCH_ATTACHMENTS_EXECUTABLE_NODE,
					new FetchExecutableJobAttachmentsHandler(),
					execId,
					jobId);
			if (attachments == null) {
				return null;
			}
			
			@SuppressWarnings("unchecked")
      List<Object> attachmentList = (List<Object>) JSONUtils.parseJSONFromString(attachments);
			
			return attachmentList;
		}
		catch (IOException e) {
			throw new ExecutorManagerException(
					"Error converting job attachments to JSON " + jobId, e);
		}
		catch (SQLException e) {
			throw new ExecutorManagerException(
					"Error query job attachments " + jobId, e);
		}
	}
	
	@Override
	public void uploadLogFile(
			int execId, String name, int attempt, File ... files)
			throws ExecutorManagerException {
		Connection connection = getConnection();
		try {
			uploadLogFile(
					connection, execId, name, attempt, files, defaultEncodingType);
			connection.commit();
		}
		catch (SQLException e) {
			throw new ExecutorManagerException("Error committing log", e);
		}
		catch (IOException e) {
			throw new ExecutorManagerException("Error committing log", e);
		}
		finally {
			DbUtils.closeQuietly(connection);
		}
	}
	
	private void uploadLogFile(
			Connection connection, 
			int execId, 
			String name, 
			int attempt, 
			File[] files, 
			EncodingType encType) throws ExecutorManagerException, IOException {
		// 50K buffer... if logs are greater than this, we chunk.
		// However, we better prevent large log files from being uploaded somehow
		byte[] buffer = new byte[50*1024];
		int pos = 0;
		int length = buffer.length;
		int startByte = 0;
		BufferedInputStream bufferedStream = null;
		try {
			for (int i = 0; i < files.length; ++i) {
				File file = files[i];
				
				bufferedStream = new BufferedInputStream(new FileInputStream(file));
				int size = bufferedStream.read(buffer, pos, length);
				while (size >= 0) {
					if (pos + size == buffer.length) {
						// Flush here.
						uploadLogPart(
								connection, 
								execId, 
								name, 
								attempt, 
								startByte, 
								startByte + buffer.length, 
								encType, 
								buffer, 
								buffer.length);
						
						pos = 0;
						length = buffer.length;
						startByte += buffer.length;
					}
					else {
						// Usually end of file.
						pos += size;
						length = buffer.length - pos;
					}
					size = bufferedStream.read(buffer, pos, length);
				}
			}
			
			// Final commit of buffer.
			if (pos > 0) {
				uploadLogPart(
						connection, 
						execId, 
						name, 
						attempt, 
						startByte, 
						startByte + pos, 
						encType, 
						buffer, 
						pos);
			}
		}
		catch (SQLException e) {
			throw new ExecutorManagerException("Error writing log part.", e);
		}
		catch (IOException e) {
			throw new ExecutorManagerException("Error chunking", e);
		}
		finally {
			IOUtils.closeQuietly(bufferedStream);
		}
	}
	
	private void uploadLogPart(
			Connection connection, 
			int execId, 
			String name, 
			int attempt, 
			int startByte, 
			int endByte, 
			EncodingType encType, 
			byte[] buffer, 
			int length) throws SQLException, IOException {
		final String INSERT_EXECUTION_LOGS = 
				"INSERT INTO execution_logs " + 
						"(exec_id, name, attempt, enc_type, start_byte, end_byte, " + 
						"log, upload_time) VALUES (?,?,?,?,?,?,?,?)";
		
		QueryRunner runner = new QueryRunner();
		byte[] buf = buffer;
		if (encType == EncodingType.GZIP) {
			buf = GZIPUtils.gzipBytes(buf, 0, length);
		}
		else if (length < buf.length) {
			buf = Arrays.copyOf(buffer, length);
		}
		
		runner.update(
				connection, 
				INSERT_EXECUTION_LOGS, 
				execId, 
				name, 
				attempt, 
				encType.getNumVal(), 
				startByte, 
				startByte + length, 
				buf, 
				DateTime.now().getMillis());
	}
	
	@Override
	public void uploadAttachmentFile(ExecutableNode node, File file)
			throws ExecutorManagerException {
		Connection connection = getConnection();
		try {
			uploadAttachmentFile(connection, node, file, defaultEncodingType);
			connection.commit();
		}
		catch (SQLException e) {
			throw new ExecutorManagerException("Error committing attachments ", e);
		}
		catch (IOException e) {
			throw new ExecutorManagerException("Error uploading attachments ", e);
		}
		finally {
			DbUtils.closeQuietly(connection);
		}
	}

	private void uploadAttachmentFile(
			Connection connection,
			ExecutableNode node,
			File file,
			EncodingType encType) throws SQLException, IOException {

		String jsonString = FileUtils.readFileToString(file);
		byte[] attachments = GZIPUtils.gzipString(jsonString, "UTF-8");

		final String UPDATE_EXECUTION_NODE_ATTACHMENTS = 
				"UPDATE execution_jobs " +
						"SET attachments=? " + 
						"WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";

		QueryRunner runner = new QueryRunner();
		runner.update(
				connection,
				UPDATE_EXECUTION_NODE_ATTACHMENTS,
				attachments,
				node.getExecutableFlow().getExecutionId(),
				node.getParentFlow().getNestedId(),
				node.getId(),
				node.getAttempt());
	}
	
	private Connection getConnection() throws ExecutorManagerException {
		Connection connection = null;
		try {
			connection = super.getDBConnection(false);
		}
		catch (Exception e) {
			DbUtils.closeQuietly(connection);
			throw new ExecutorManagerException("Error getting DB connection.", e);
		}
		return connection;
	}
	
	private static class LastInsertID implements ResultSetHandler<Long> {
		private static String LAST_INSERT_ID = "SELECT LAST_INSERT_ID()";
		@Override
		public Long handle(ResultSet rs) throws SQLException {
			if (!rs.next()) {
				return -1l;
			}
			long id = rs.getLong(1);
			return id;
		}
	}
	
	private static class FetchLogsHandler implements ResultSetHandler<LogData> {
		private static String FETCH_LOGS =
			"SELECT exec_id, name, attempt, enc_type, start_byte, end_byte, log " + 
					"FROM execution_logs " + 
					"WHERE exec_id=? AND name=? AND attempt=? AND end_byte > ? " + 
					"AND start_byte <= ? ORDER BY start_byte";

		private int startByte;
		private int endByte;
		
		public FetchLogsHandler(int startByte, int endByte) {
			this.startByte = startByte;
			this.endByte = endByte;
		}
		
		@Override
		public LogData handle(ResultSet rs) throws SQLException {
			if (!rs.next()) {
				return null;
			}
			
			ByteArrayOutputStream byteStream = new ByteArrayOutputStream();

			do {
				//int execId = rs.getInt(1);
				//String name = rs.getString(2);
				@SuppressWarnings("unused")
				int attempt = rs.getInt(3);
				EncodingType encType = EncodingType.fromInteger(rs.getInt(4));
				int startByte = rs.getInt(5);
				int endByte = rs.getInt(6);

				byte[] data = rs.getBytes(7);

				int offset = this.startByte > startByte 
						? this.startByte - startByte 
						: 0;
				int length = this.endByte < endByte 
						? this.endByte - startByte - offset
						: endByte - startByte - offset;
				try {
					byte[] buffer = data;
					if (encType == EncodingType.GZIP) {
						buffer = GZIPUtils.unGzipBytes(data);
					}

					byteStream.write(buffer, offset, length);
				}
				catch (IOException e) {
					throw new SQLException(e);
				}
			} while (rs.next());

			byte[] buffer = byteStream.toByteArray();
			Pair<Integer,Integer> result = FileIOUtils.getUtf8Range(
					buffer, 0, buffer.length);
		
			return new LogData(
					startByte + result.getFirst(), 
					result.getSecond(), 
					new String(buffer, result.getFirst(), result.getSecond()));
		}
	}
	
	private static class FetchExecutableJobHandler 
			implements ResultSetHandler<List<ExecutableJobInfo>> {
		private static String FETCH_EXECUTABLE_NODE = 
				"SELECT exec_id, project_id, version, flow_id, job_id, " + 
						"start_time, end_time, status, attempt " + 
						"FROM execution_jobs WHERE exec_id=? " + 
						"AND job_id=? AND attempt_id=?";
		private static String FETCH_EXECUTABLE_NODE_ATTEMPTS = 
				"SELECT exec_id, project_id, version, flow_id, job_id, " + 
						"start_time, end_time, status, attempt FROM execution_jobs " + 
						"WHERE exec_id=? AND job_id=?";
		private static String FETCH_PROJECT_EXECUTABLE_NODE =
				"SELECT exec_id, project_id, version, flow_id, job_id, " + 
						"start_time, end_time, status, attempt FROM execution_jobs " +
						"WHERE project_id=? AND job_id=? " + 
						"ORDER BY exec_id DESC LIMIT ?, ? ";

		@Override
		public List<ExecutableJobInfo> handle(ResultSet rs) throws SQLException {
			if (!rs.next()) {
				return Collections.<ExecutableJobInfo>emptyList();
			}
			
			List<ExecutableJobInfo> execNodes = new ArrayList<ExecutableJobInfo>();
			do {
				int execId = rs.getInt(1);
				int projectId = rs.getInt(2);
				int version = rs.getInt(3);
				String flowId = rs.getString(4);
				String jobId = rs.getString(5);
				long startTime = rs.getLong(6);
				long endTime = rs.getLong(7);
				Status status = Status.fromInteger(rs.getInt(8));
				int attempt = rs.getInt(9);
				
				ExecutableJobInfo info = new ExecutableJobInfo(
						execId, 
						projectId, 
						version, 
						flowId, 
						jobId, 
						startTime, 
						endTime, 
						status, 
						attempt);
				execNodes.add(info);
			} while (rs.next());

			return execNodes;
		}
	}

	private static class FetchExecutableJobAttachmentsHandler
			implements ResultSetHandler<String> {
		private static String FETCH_ATTACHMENTS_EXECUTABLE_NODE = 
				"SELECT attachments FROM execution_jobs WHERE exec_id=? AND job_id=?";

		@Override
		public String handle(ResultSet rs) throws SQLException {
			String attachmentsJson = null;
			if (rs.next()) {
				try {
					byte[] attachments = rs.getBytes(1);
					if (attachments != null) {
						attachmentsJson = GZIPUtils.unGzipString(attachments, "UTF-8");
					}
				}
				catch (IOException e) {
					throw new SQLException("Error decoding job attachments", e);
				}
			}
			return attachmentsJson;
		}
	}
	
	private static class FetchExecutableJobPropsHandler 
			implements ResultSetHandler<Pair<Props, Props>> {
		private static String FETCH_OUTPUT_PARAM_EXECUTABLE_NODE = 
				"SELECT output_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
		private static String FETCH_INPUT_PARAM_EXECUTABLE_NODE = 
				"SELECT input_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
		private static String FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE = 
				"SELECT input_params, output_params " +
						"FROM execution_jobs WHERE exec_id=? AND job_id=?";
		
		@SuppressWarnings("unchecked")
		@Override
		public Pair<Props, Props> handle(ResultSet rs) throws SQLException {
			if (!rs.next()) {
				return new Pair<Props, Props>(null, null);
			}
			
			if (rs.getMetaData().getColumnCount() > 1) {
				byte[] input = rs.getBytes(1);
				byte[] output = rs.getBytes(2);
				
				Props inputProps = null;
				Props outputProps = null;
				try {
					if (input != null) {
						String jsonInputString = GZIPUtils.unGzipString(input, "UTF-8");
						inputProps = PropsUtils.fromHierarchicalMap(
								(Map<String, Object>)JSONUtils.parseJSONFromString(jsonInputString));
						
					}
					if (output != null) {
						String jsonOutputString = GZIPUtils.unGzipString(output, "UTF-8");
						outputProps = PropsUtils.fromHierarchicalMap(
								(Map<String, Object>) JSONUtils.parseJSONFromString(jsonOutputString));
					}
				}
				catch (IOException e) {
					throw new SQLException("Error decoding param data", e);
				}
				
				return new Pair<Props, Props>(inputProps, outputProps);
			}
			else {
				byte[] params = rs.getBytes(1);
				Props props = null;
				try {
					if (params != null) {
						String jsonProps = GZIPUtils.unGzipString(params, "UTF-8");

						props = PropsUtils.fromHierarchicalMap(
								(Map<String, Object>)JSONUtils.parseJSONFromString(jsonProps));
					}
				}
				catch (IOException e) {
					throw new SQLException("Error decoding param data", e);
				}
				
				return new Pair<Props,Props>(props, null);
			}
		}
	}

	private static class FetchActiveExecutableFlows 
			implements ResultSetHandler<Map<Integer, Pair<ExecutionReference,ExecutableFlow>>> {
		private static String FETCH_ACTIVE_EXECUTABLE_FLOW = 
				"SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data " + 
						"flow_data, ax.host host, ax.port port, ax.update_time " + 
						"axUpdateTime " + 
						"FROM execution_flows ex " + 
						"INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";
		
		@Override
		public Map<Integer, Pair<ExecutionReference,ExecutableFlow>> handle(ResultSet rs) 
				throws SQLException {
			if (!rs.next()) {
				return Collections.<Integer, Pair<ExecutionReference,ExecutableFlow>>emptyMap();
			}

			Map<Integer, Pair<ExecutionReference,ExecutableFlow>> execFlows = 
					new HashMap<Integer, Pair<ExecutionReference,ExecutableFlow>>();
			do {
				int id = rs.getInt(1);
				int encodingType = rs.getInt(2);
				byte[] data = rs.getBytes(3);
				String host = rs.getString(4);
				int port = rs.getInt(5);
				long updateTime = rs.getLong(6);
				
				if (data == null) {
					execFlows.put(id, null);
				}
				else {
					EncodingType encType = EncodingType.fromInteger(encodingType);
					Object flowObj;
					try {
						// Convoluted way to inflate strings. Should find common package or
						// helper function.
						if (encType == EncodingType.GZIP) {
							// Decompress the sucker.
							String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
							flowObj = JSONUtils.parseJSONFromString(jsonString);
						}
						else {
							String jsonString = new String(data, "UTF-8");
							flowObj = JSONUtils.parseJSONFromString(jsonString);
						}
						
						ExecutableFlow exFlow = 
								ExecutableFlow.createExecutableFlowFromObject(flowObj);
						ExecutionReference ref = new ExecutionReference(id, host, port);
						ref.setUpdateTime(updateTime);
						
						execFlows.put(
								id, new Pair<ExecutionReference, ExecutableFlow>(ref, exFlow));
					}
					catch (IOException e) {
						throw new SQLException("Error retrieving flow data " + id, e);
					}
				}
			} while (rs.next());

			return execFlows;
		}
	}
	
	private static class FetchExecutableFlows 
			implements ResultSetHandler<List<ExecutableFlow>> {
		private static String FETCH_BASE_EXECUTABLE_FLOW_QUERY = 
				"SELECT exec_id, enc_type, flow_data FROM execution_flows ";
		private static String FETCH_EXECUTABLE_FLOW = 
				"SELECT exec_id, enc_type, flow_data FROM execution_flows " +
						"WHERE exec_id=?";
		//private static String FETCH_ACTIVE_EXECUTABLE_FLOW = 
		//	"SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data " +
		//			"FROM execution_flows ex " +
		//			"INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";
		private static String FETCH_ALL_EXECUTABLE_FLOW_HISTORY = 
				"SELECT exec_id, enc_type, flow_data FROM execution_flows " +
						"ORDER BY exec_id DESC LIMIT ?, ?";
		private static String FETCH_EXECUTABLE_FLOW_HISTORY = 
				"SELECT exec_id, enc_type, flow_data FROM execution_flows " +
						"WHERE project_id=? AND flow_id=? " +
						"ORDER BY exec_id DESC LIMIT ?, ?";
		private static String FETCH_EXECUTABLE_FLOW_BY_STATUS = 
				"SELECT exec_id, enc_type, flow_data FROM execution_flows " +
						"WHERE project_id=? AND flow_id=? AND status=? " +
						"ORDER BY exec_id DESC LIMIT ?, ?";
		
		@Override
		public List<ExecutableFlow> handle(ResultSet rs) throws SQLException {
			if (!rs.next()) {
				return Collections.<ExecutableFlow>emptyList();
			}
			
			List<ExecutableFlow> execFlows = new ArrayList<ExecutableFlow>();
			do {
				int id = rs.getInt(1);
				int encodingType = rs.getInt(2);
				byte[] data = rs.getBytes(3);
				
				if (data != null) {
					EncodingType encType = EncodingType.fromInteger(encodingType);
					Object flowObj;
					try {
						// Convoluted way to inflate strings. Should find common package 
						// or helper function.
						if (encType == EncodingType.GZIP) {
							// Decompress the sucker.
							String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
							flowObj = JSONUtils.parseJSONFromString(jsonString);
						}
						else {
							String jsonString = new String(data, "UTF-8");
							flowObj = JSONUtils.parseJSONFromString(jsonString);
						}
						
						ExecutableFlow exFlow = 
								ExecutableFlow.createExecutableFlowFromObject(flowObj);
						execFlows.add(exFlow);
					}
					catch (IOException e) {
						throw new SQLException("Error retrieving flow data " + id, e);
					}
				}
			} while (rs.next());

			return execFlows;
		}
	}
	
	private static class IntHandler implements ResultSetHandler<Integer> {
		private static String NUM_EXECUTIONS = 
				"SELECT COUNT(1) FROM execution_flows";
		private static String NUM_FLOW_EXECUTIONS = 
				"SELECT COUNT(1) FROM execution_flows WHERE project_id=? AND flow_id=?";
		private static String NUM_JOB_EXECUTIONS = 
				"SELECT COUNT(1) FROM execution_jobs WHERE project_id=? AND job_id=?";
		
		@Override
		public Integer handle(ResultSet rs) throws SQLException {
			if (!rs.next()) {
				return 0;
			}
			return rs.getInt(1);
		}
	}

	@Override
	public int removeExecutionLogsByTime(long millis) 
			throws ExecutorManagerException {
		final String DELETE_BY_TIME = 
				"DELETE FROM execution_logs WHERE upload_time < ?";
		
		QueryRunner runner = createQueryRunner();
		int updateNum = 0;
		try {
			updateNum = runner.update(DELETE_BY_TIME, millis);
		}
		catch (SQLException e) {
			e.printStackTrace();
			throw new ExecutorManagerException(
					"Error deleting old execution_logs before " + millis, e);			
		}
		
		return updateNum;
	}
}