/*
 * Decompiled with CFR 0.152.
 */
package azkaban.executor;

import azkaban.database.AbstractJdbcLoader;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableJobInfo;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionReference;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.JdbcExecutorLoader;
import azkaban.executor.Status;
import azkaban.utils.FileIOUtils;
import azkaban.utils.GZIPUtils;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;

/*
 * Exception performing whole class analysis ignored.
 */
public class JdbcExecutorLoader
extends AbstractJdbcLoader
implements ExecutorLoader {
    private static final Logger logger = Logger.getLogger(JdbcExecutorLoader.class);
    private AbstractJdbcLoader.EncodingType defaultEncodingType = AbstractJdbcLoader.EncodingType.GZIP;

    public JdbcExecutorLoader(Props props) {
        super(props);
    }

    public AbstractJdbcLoader.EncodingType getDefaultEncodingType() {
        return this.defaultEncodingType;
    }

    public void setDefaultEncodingType(AbstractJdbcLoader.EncodingType defaultEncodingType) {
        this.defaultEncodingType = defaultEncodingType;
    }

    public synchronized void uploadExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException {
        Connection connection = this.getConnection();
        try {
            try {
                this.uploadExecutableFlow(connection, flow, this.defaultEncodingType);
            }
            catch (IOException e) {
                throw new ExecutorManagerException("Error uploading flow", (Throwable)e);
            }
        }
        finally {
            DbUtils.closeQuietly((Connection)connection);
        }
    }

    private synchronized void uploadExecutableFlow(Connection connection, ExecutableFlow flow, AbstractJdbcLoader.EncodingType encType) throws ExecutorManagerException, IOException {
        String INSERT_EXECUTABLE_FLOW = "INSERT INTO execution_flows (project_id, flow_id, version, status, submit_time, submit_user, update_time) values (?,?,?,?,?,?,?)";
        QueryRunner runner = new QueryRunner();
        long submitTime = System.currentTimeMillis();
        try {
            flow.setStatus(Status.PREPARING);
            runner.update(connection, "INSERT INTO execution_flows (project_id, flow_id, version, status, submit_time, submit_user, update_time) values (?,?,?,?,?,?,?)", new Object[]{flow.getProjectId(), flow.getFlowId(), flow.getVersion(), Status.PREPARING.getNumVal(), submitTime, flow.getSubmitUser(), submitTime});
            connection.commit();
            long id = (Long)runner.query(connection, LastInsertID.access$1(), (ResultSetHandler)new LastInsertID(null));
            if (id == -1L) {
                throw new ExecutorManagerException("Execution id is not properly created.");
            }
            logger.info((Object)("Flow given " + flow.getFlowId() + " given id " + id));
            flow.setExecutionId((int)id);
            this.updateExecutableFlow(connection, flow, encType);
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error creating execution.", (Throwable)e);
        }
    }

    public void updateExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException {
        Connection connection = this.getConnection();
        try {
            this.updateExecutableFlow(connection, flow, this.defaultEncodingType);
        }
        finally {
            DbUtils.closeQuietly((Connection)connection);
        }
    }

    private void updateExecutableFlow(Connection connection, ExecutableFlow flow, AbstractJdbcLoader.EncodingType encType) throws ExecutorManagerException {
        String UPDATE_EXECUTABLE_FLOW_DATA = "UPDATE execution_flows SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=? WHERE exec_id=?";
        QueryRunner runner = new QueryRunner();
        String json = JSONUtils.toJSON((Object)flow.toObject());
        byte[] data = null;
        try {
            byte[] stringData;
            data = stringData = json.getBytes("UTF-8");
            if (encType == AbstractJdbcLoader.EncodingType.GZIP) {
                data = GZIPUtils.gzipBytes((byte[])stringData);
            }
        }
        catch (IOException e) {
            throw new ExecutorManagerException("Error encoding the execution flow.");
        }
        try {
            runner.update(connection, "UPDATE execution_flows SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=? WHERE exec_id=?", new Object[]{flow.getStatus().getNumVal(), flow.getUpdateTime(), flow.getStartTime(), flow.getEndTime(), encType.getNumVal(), data, flow.getExecutionId()});
            connection.commit();
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error updating flow.", (Throwable)e);
        }
    }

    public ExecutableFlow fetchExecutableFlow(int id) throws ExecutorManagerException {
        QueryRunner runner = this.createQueryRunner();
        FetchExecutableFlows flowHandler = new FetchExecutableFlows(null);
        try {
            List properties = (List)runner.query(FetchExecutableFlows.access$2(), (ResultSetHandler)flowHandler, new Object[]{id});
            return (ExecutableFlow)properties.get(0);
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching flow id " + id, (Throwable)e);
        }
    }

    public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows() throws ExecutorManagerException {
        QueryRunner runner = this.createQueryRunner();
        FetchActiveExecutableFlows flowHandler = new FetchActiveExecutableFlows(null);
        try {
            Map properties = (Map)runner.query(FetchActiveExecutableFlows.access$2(), (ResultSetHandler)flowHandler);
            return properties;
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching active flows", (Throwable)e);
        }
    }

    public int fetchNumExecutableFlows() throws ExecutorManagerException {
        QueryRunner runner = this.createQueryRunner();
        IntHandler intHandler = new IntHandler(null);
        try {
            int count = (Integer)runner.query(IntHandler.access$2(), (ResultSetHandler)intHandler);
            return count;
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching num executions", (Throwable)e);
        }
    }

    public int fetchNumExecutableFlows(int projectId, String flowId) throws ExecutorManagerException {
        QueryRunner runner = this.createQueryRunner();
        IntHandler intHandler = new IntHandler(null);
        try {
            int count = (Integer)runner.query(IntHandler.access$3(), (ResultSetHandler)intHandler, new Object[]{projectId, flowId});
            return count;
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching num executions", (Throwable)e);
        }
    }

    public int fetchNumExecutableNodes(int projectId, String jobId) throws ExecutorManagerException {
        QueryRunner runner = this.createQueryRunner();
        IntHandler intHandler = new IntHandler(null);
        try {
            int count = (Integer)runner.query(IntHandler.access$4(), (ResultSetHandler)intHandler, new Object[]{projectId, jobId});
            return count;
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching num executions", (Throwable)e);
        }
    }

    public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num) throws ExecutorManagerException {
        QueryRunner runner = this.createQueryRunner();
        FetchExecutableFlows flowHandler = new FetchExecutableFlows(null);
        try {
            List properties = (List)runner.query(FetchExecutableFlows.access$3(), (ResultSetHandler)flowHandler, new Object[]{projectId, flowId, skip, num});
            return properties;
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching active flows", (Throwable)e);
        }
    }

    public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num, Status status) throws ExecutorManagerException {
        QueryRunner runner = this.createQueryRunner();
        FetchExecutableFlows flowHandler = new FetchExecutableFlows(null);
        try {
            List properties = (List)runner.query(FetchExecutableFlows.access$4(), (ResultSetHandler)flowHandler, new Object[]{projectId, flowId, status.getNumVal(), skip, num});
            return properties;
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching active flows", (Throwable)e);
        }
    }

    public List<ExecutableFlow> fetchFlowHistory(int skip, int num) throws ExecutorManagerException {
        QueryRunner runner = this.createQueryRunner();
        FetchExecutableFlows flowHandler = new FetchExecutableFlows(null);
        try {
            List properties = (List)runner.query(FetchExecutableFlows.access$5(), (ResultSetHandler)flowHandler, new Object[]{skip, num});
            return properties;
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching active flows", (Throwable)e);
        }
    }

    public List<ExecutableFlow> fetchFlowHistory(String projContain, String flowContains, String userNameContains, int status, long startTime, long endTime, int skip, int num) throws ExecutorManagerException {
        String query = FetchExecutableFlows.access$6();
        ArrayList<Object> params = new ArrayList<Object>();
        boolean first = true;
        if (projContain != null && !projContain.isEmpty()) {
            query = String.valueOf(query) + " ef JOIN projects p ON ef.project_id = p.id WHERE name LIKE ?";
            params.add(String.valueOf('%') + projContain + '%');
            first = false;
        }
        if (flowContains != null && !flowContains.isEmpty()) {
            if (first) {
                query = String.valueOf(query) + " WHERE ";
                first = false;
            } else {
                query = String.valueOf(query) + " AND ";
            }
            query = String.valueOf(query) + " flow_id LIKE ?";
            params.add(String.valueOf('%') + flowContains + '%');
        }
        if (userNameContains != null && !userNameContains.isEmpty()) {
            if (first) {
                query = String.valueOf(query) + " WHERE ";
                first = false;
            } else {
                query = String.valueOf(query) + " AND ";
            }
            query = String.valueOf(query) + " submit_user LIKE ?";
            params.add(String.valueOf('%') + userNameContains + '%');
        }
        if (status != 0) {
            if (first) {
                query = String.valueOf(query) + " WHERE ";
                first = false;
            } else {
                query = String.valueOf(query) + " AND ";
            }
            query = String.valueOf(query) + " status = ?";
            params.add(status);
        }
        if (startTime > 0L) {
            if (first) {
                query = String.valueOf(query) + " WHERE ";
                first = false;
            } else {
                query = String.valueOf(query) + " AND ";
            }
            query = String.valueOf(query) + " start_time > ?";
            params.add(startTime);
        }
        if (endTime > 0L) {
            if (first) {
                query = String.valueOf(query) + " WHERE ";
                first = false;
            } else {
                query = String.valueOf(query) + " AND ";
            }
            query = String.valueOf(query) + " end_time < ?";
            params.add(endTime);
        }
        if (skip > -1 && num > 0) {
            query = String.valueOf(query) + "  ORDER BY exec_id DESC LIMIT ?, ?";
            params.add(skip);
            params.add(num);
        }
        QueryRunner runner = this.createQueryRunner();
        FetchExecutableFlows flowHandler = new FetchExecutableFlows(null);
        try {
            List properties = (List)runner.query(query, (ResultSetHandler)flowHandler, params.toArray());
            return properties;
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching active flows", (Throwable)e);
        }
    }

    public void addActiveExecutableReference(ExecutionReference reference) throws ExecutorManagerException {
        String INSERT = "INSERT INTO active_executing_flows (exec_id, host, port, update_time) values (?,?,?,?)";
        QueryRunner runner = this.createQueryRunner();
        try {
            runner.update("INSERT INTO active_executing_flows (exec_id, host, port, update_time) values (?,?,?,?)", new Object[]{reference.getExecId(), reference.getHost(), reference.getPort(), reference.getUpdateTime()});
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error updating active flow reference " + reference.getExecId(), (Throwable)e);
        }
    }

    public void removeActiveExecutableReference(int execid) throws ExecutorManagerException {
        String DELETE = "DELETE FROM active_executing_flows WHERE exec_id=?";
        QueryRunner runner = this.createQueryRunner();
        try {
            runner.update("DELETE FROM active_executing_flows WHERE exec_id=?", (Object)execid);
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error deleting active flow reference " + execid, (Throwable)e);
        }
    }

    public boolean updateExecutableReference(int execId, long updateTime) throws ExecutorManagerException {
        String DELETE = "UPDATE active_executing_flows set update_time=? WHERE exec_id=?";
        QueryRunner runner = this.createQueryRunner();
        int updateNum = 0;
        try {
            updateNum = runner.update("UPDATE active_executing_flows set update_time=? WHERE exec_id=?", new Object[]{updateTime, execId});
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error deleting active flow reference " + execId, (Throwable)e);
        }
        return updateNum > 0;
    }

    public void uploadExecutableNode(ExecutableNode node, Props inputProps) throws ExecutorManagerException {
        String INSERT_EXECUTION_NODE = "INSERT INTO execution_jobs (exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";
        byte[] inputParam = null;
        if (inputProps != null) {
            try {
                String jsonString = JSONUtils.toJSON((Object)PropsUtils.toHierarchicalMap((Props)inputProps));
                inputParam = GZIPUtils.gzipString((String)jsonString, (String)"UTF-8");
            }
            catch (IOException e) {
                throw new ExecutorManagerException("Error encoding input params");
            }
        }
        ExecutableFlow flow = node.getExecutableFlow();
        String flowId = node.getParentFlow().getFlowPath();
        System.out.println("Uploading flowId " + flowId);
        QueryRunner runner = this.createQueryRunner();
        try {
            runner.update("INSERT INTO execution_jobs (exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)", new Object[]{flow.getExecutionId(), flow.getProjectId(), flow.getVersion(), flowId, node.getId(), node.getStartTime(), node.getEndTime(), node.getStatus().getNumVal(), inputParam, node.getAttempt()});
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error writing job " + node.getId(), (Throwable)e);
        }
    }

    public void updateExecutableNode(ExecutableNode node) throws ExecutorManagerException {
        String UPSERT_EXECUTION_NODE = "UPDATE execution_jobs SET start_time=?, end_time=?, status=?, output_params=? WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
        byte[] outputParam = null;
        Props outputProps = node.getOutputProps();
        if (outputProps != null) {
            try {
                String jsonString = JSONUtils.toJSON((Object)PropsUtils.toHierarchicalMap((Props)outputProps));
                outputParam = GZIPUtils.gzipString((String)jsonString, (String)"UTF-8");
            }
            catch (IOException e) {
                throw new ExecutorManagerException("Error encoding input params");
            }
        }
        QueryRunner runner = this.createQueryRunner();
        try {
            runner.update("UPDATE execution_jobs SET start_time=?, end_time=?, status=?, output_params=? WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?", new Object[]{node.getStartTime(), node.getEndTime(), node.getStatus().getNumVal(), outputParam, node.getExecutableFlow().getExecutionId(), node.getParentFlow().getFlowPath(), node.getId(), node.getAttempt()});
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error updating job " + node.getId(), (Throwable)e);
        }
    }

    public List<ExecutableJobInfo> fetchJobInfoAttempts(int execId, String jobId) throws ExecutorManagerException {
        List info;
        block3: {
            QueryRunner runner = this.createQueryRunner();
            try {
                info = (List)runner.query(FetchExecutableJobHandler.access$1(), (ResultSetHandler)new FetchExecutableJobHandler(null), new Object[]{execId, jobId});
                if (info != null && !info.isEmpty()) break block3;
                return null;
            }
            catch (SQLException e) {
                throw new ExecutorManagerException("Error querying job info " + jobId, (Throwable)e);
            }
        }
        return info;
    }

    public ExecutableJobInfo fetchJobInfo(int execId, String jobId, int attempts) throws ExecutorManagerException {
        List info;
        block3: {
            QueryRunner runner = this.createQueryRunner();
            try {
                info = (List)runner.query(FetchExecutableJobHandler.access$3(), (ResultSetHandler)new FetchExecutableJobHandler(null), new Object[]{execId, jobId, attempts});
                if (info != null && !info.isEmpty()) break block3;
                return null;
            }
            catch (SQLException e) {
                throw new ExecutorManagerException("Error querying job info " + jobId, (Throwable)e);
            }
        }
        return (ExecutableJobInfo)info.get(0);
    }

    public Props fetchExecutionJobInputProps(int execId, String jobId) throws ExecutorManagerException {
        QueryRunner runner = this.createQueryRunner();
        try {
            Pair props = (Pair)runner.query(FetchExecutableJobPropsHandler.access$1(), (ResultSetHandler)new FetchExecutableJobPropsHandler(null), new Object[]{execId, jobId});
            return (Props)props.getFirst();
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error querying job params " + execId + " " + jobId, (Throwable)e);
        }
    }

    public Props fetchExecutionJobOutputProps(int execId, String jobId) throws ExecutorManagerException {
        QueryRunner runner = this.createQueryRunner();
        try {
            Pair props = (Pair)runner.query(FetchExecutableJobPropsHandler.access$3(), (ResultSetHandler)new FetchExecutableJobPropsHandler(null), new Object[]{execId, jobId});
            return (Props)props.getFirst();
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error querying job params " + execId + " " + jobId, (Throwable)e);
        }
    }

    public Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId) throws ExecutorManagerException {
        QueryRunner runner = this.createQueryRunner();
        try {
            Pair props = (Pair)runner.query(FetchExecutableJobPropsHandler.access$4(), (ResultSetHandler)new FetchExecutableJobPropsHandler(null), new Object[]{execId, jobId});
            return props;
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error querying job params " + execId + " " + jobId, (Throwable)e);
        }
    }

    public List<ExecutableJobInfo> fetchJobHistory(int projectId, String jobId, int skip, int size) throws ExecutorManagerException {
        List info;
        block3: {
            QueryRunner runner = this.createQueryRunner();
            try {
                info = (List)runner.query(FetchExecutableJobHandler.access$4(), (ResultSetHandler)new FetchExecutableJobHandler(null), new Object[]{projectId, jobId, skip, size});
                if (info != null && !info.isEmpty()) break block3;
                return null;
            }
            catch (SQLException e) {
                throw new ExecutorManagerException("Error querying job info " + jobId, (Throwable)e);
            }
        }
        return info;
    }

    public FileIOUtils.LogData fetchLogs(int execId, String name, int attempt, int startByte, int length) throws ExecutorManagerException {
        QueryRunner runner = this.createQueryRunner();
        FetchLogsHandler handler = new FetchLogsHandler(startByte, length + startByte);
        try {
            FileIOUtils.LogData result = (FileIOUtils.LogData)runner.query(FetchLogsHandler.access$1(), (ResultSetHandler)handler, new Object[]{execId, name, attempt, startByte, startByte + length});
            return result;
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching logs " + execId + " : " + name, (Throwable)e);
        }
    }

    public List<Object> fetchAttachments(int execId, String jobId, int attempt) throws ExecutorManagerException {
        String attachments;
        block4: {
            QueryRunner runner = this.createQueryRunner();
            attachments = (String)runner.query(FetchExecutableJobAttachmentsHandler.access$1(), (ResultSetHandler)new FetchExecutableJobAttachmentsHandler(null), new Object[]{execId, jobId});
            if (attachments != null) break block4;
            return null;
        }
        try {
            List attachmentList = (List)JSONUtils.parseJSONFromString((String)attachments);
            return attachmentList;
        }
        catch (IOException e) {
            throw new ExecutorManagerException("Error converting job attachments to JSON " + jobId, (Throwable)e);
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error query job attachments " + jobId, (Throwable)e);
        }
    }

    public void uploadLogFile(int execId, String name, int attempt, File ... files) throws ExecutorManagerException {
        Connection connection = this.getConnection();
        try {
            try {
                this.uploadLogFile(connection, execId, name, attempt, files, this.defaultEncodingType);
                connection.commit();
            }
            catch (SQLException e) {
                throw new ExecutorManagerException("Error committing log", (Throwable)e);
            }
            catch (IOException e) {
                throw new ExecutorManagerException("Error committing log", (Throwable)e);
            }
        }
        finally {
            DbUtils.closeQuietly((Connection)connection);
        }
    }

    private void uploadLogFile(Connection connection, int execId, String name, int attempt, File[] files, AbstractJdbcLoader.EncodingType encType) throws ExecutorManagerException, IOException {
        byte[] buffer = new byte[51200];
        int pos = 0;
        int length = buffer.length;
        int startByte = 0;
        try {
            int i = 0;
            while (i < files.length) {
                File file = files[i];
                BufferedInputStream bufferedStream = new BufferedInputStream(new FileInputStream(file));
                try {
                    int size = bufferedStream.read(buffer, pos, length);
                    while (size >= 0) {
                        if (pos + size == buffer.length) {
                            this.uploadLogPart(connection, execId, name, attempt, startByte, startByte + buffer.length, encType, buffer, buffer.length);
                            pos = 0;
                            length = buffer.length;
                            startByte += buffer.length;
                        } else {
                            length = buffer.length - (pos += size);
                        }
                        size = bufferedStream.read(buffer, pos, length);
                    }
                }
                finally {
                    IOUtils.closeQuietly((InputStream)bufferedStream);
                }
                ++i;
            }
            if (pos > 0) {
                this.uploadLogPart(connection, execId, name, attempt, startByte, startByte + pos, encType, buffer, pos);
            }
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error writing log part.", (Throwable)e);
        }
        catch (IOException e) {
            throw new ExecutorManagerException("Error chunking", (Throwable)e);
        }
    }

    private void uploadLogPart(Connection connection, int execId, String name, int attempt, int startByte, int endByte, AbstractJdbcLoader.EncodingType encType, byte[] buffer, int length) throws SQLException, IOException {
        String INSERT_EXECUTION_LOGS = "INSERT INTO execution_logs (exec_id, name, attempt, enc_type, start_byte, end_byte, log, upload_time) VALUES (?,?,?,?,?,?,?,?)";
        QueryRunner runner = new QueryRunner();
        byte[] buf = buffer;
        if (encType == AbstractJdbcLoader.EncodingType.GZIP) {
            buf = GZIPUtils.gzipBytes((byte[])buf, (int)0, (int)length);
        } else if (length < buf.length) {
            buf = Arrays.copyOf(buffer, length);
        }
        runner.update(connection, "INSERT INTO execution_logs (exec_id, name, attempt, enc_type, start_byte, end_byte, log, upload_time) VALUES (?,?,?,?,?,?,?,?)", new Object[]{execId, name, attempt, encType.getNumVal(), startByte, startByte + length, buf, DateTime.now().getMillis()});
    }

    public void uploadAttachmentFile(ExecutableNode node, File file) throws ExecutorManagerException {
        Connection connection = this.getConnection();
        try {
            try {
                this.uploadAttachmentFile(connection, node, file, this.defaultEncodingType);
                connection.commit();
            }
            catch (SQLException e) {
                throw new ExecutorManagerException("Error committing attachments ", (Throwable)e);
            }
            catch (IOException e) {
                throw new ExecutorManagerException("Error uploading attachments ", (Throwable)e);
            }
        }
        finally {
            DbUtils.closeQuietly((Connection)connection);
        }
    }

    private void uploadAttachmentFile(Connection connection, ExecutableNode node, File file, AbstractJdbcLoader.EncodingType encType) throws SQLException, IOException {
        String jsonString = FileUtils.readFileToString((File)file);
        byte[] attachments = GZIPUtils.gzipString((String)jsonString, (String)"UTF-8");
        String UPDATE_EXECUTION_NODE_ATTACHMENTS = "UPDATE execution_jobs SET attachments=? WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
        QueryRunner runner = new QueryRunner();
        runner.update(connection, "UPDATE execution_jobs SET attachments=? WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?", new Object[]{attachments, node.getExecutableFlow().getExecutionId(), node.getParentFlow().getNestedId(), node.getId(), node.getAttempt()});
    }

    private Connection getConnection() throws ExecutorManagerException {
        Connection connection = null;
        try {
            connection = super.getDBConnection(false);
        }
        catch (Exception e) {
            DbUtils.closeQuietly((Connection)connection);
            throw new ExecutorManagerException("Error getting DB connection.", (Throwable)e);
        }
        return connection;
    }

    public int removeExecutionLogsByTime(long millis) throws ExecutorManagerException {
        String DELETE_BY_TIME = "DELETE FROM execution_logs WHERE upload_time < ?";
        QueryRunner runner = this.createQueryRunner();
        int updateNum = 0;
        try {
            updateNum = runner.update("DELETE FROM execution_logs WHERE upload_time < ?", (Object)millis);
        }
        catch (SQLException e) {
            e.printStackTrace();
            throw new ExecutorManagerException("Error deleting old execution_logs before " + millis, (Throwable)e);
        }
        return updateNum;
    }
}

