/*
 * Decompiled with CFR 0.152.
 */
package azkaban.executor;

import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableJobInfo;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutionReference;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorMailer;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
import azkaban.project.Project;
import azkaban.utils.FileIOUtils;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang.StringUtils;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;

public class ExecutorManager {
    // Shared log4j logger for this class.
    private static Logger logger = Logger.getLogger(ExecutorManager.class);
    // Persistence gateway used for executions, active references and logs.
    private ExecutorLoader executorLoader;
    // Host and port of the primary executor server ("executor.host" / "executor.port" properties).
    private String executorHost;
    private int executorPort;
    // Background thread that periodically removes old execution logs; started by the constructor.
    private CleanerThread cleanerThread;
    // Executions currently tracked as running, keyed by execution id.
    private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows = new ConcurrentHashMap();
    // Executions that were finalized recently, keyed by execution id.
    private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished = new ConcurrentHashMap();
    // Sends notification emails about flow outcomes.
    private ExecutorMailer mailer;
    // Background updater thread for running executions; started by the constructor.
    private ExecutingManagerUpdaterThread executingManager;
    // Default log retention: 7,257,600,000 ms = 84 days.
    private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 7257600000L;
    // Wall-clock time (ms) of the cleaner thread's last loop iteration; -1 until it first runs.
    private long lastCleanerThreadCheckTime = -1L;
    // Wall-clock time (ms) of the updater thread's last loop iteration; -1 until it first runs.
    private long lastThreadCheckTime = -1L;

    /**
     * Builds the manager: reloads the active flows from the loader, reads the
     * executor address from {@code props}, and starts the updater and cleaner threads.
     *
     * @param props  configuration; {@code executor.port} is read without a default
     * @param loader persistence gateway for executions
     * @throws ExecutorManagerException if the active flows cannot be loaded
     */
    public ExecutorManager(Props props, ExecutorLoader loader) throws ExecutorManagerException {
        executorLoader = loader;
        // Active flows must be restored before the updater thread starts looking at them.
        loadRunningFlows();

        executorHost = props.getString("executor.host", "localhost");
        executorPort = props.getInt("executor.port");
        mailer = new ExecutorMailer(props);

        executingManager = new ExecutingManagerUpdaterThread();
        executingManager.start();

        // The default retention is 84 days, expressed in milliseconds.
        cleanerThread = new CleanerThread(props.getLong("azkaban.execution.logs.retention.ms", 7257600000L));
        cleanerThread.start();
    }

    /** @return the host name of the primary executor server */
    public String getExecutorHost() {
        return executorHost;
    }

    /** @return the port of the primary executor server */
    public int getExecutorPort() {
        return executorPort;
    }

    /** @return the current {@link Thread.State} of the updater thread */
    public Thread.State getExecutorThreadState() {
        return executingManager.getState();
    }

    /** @return {@code true} while the updater thread is alive */
    public boolean isThreadActive() {
        return executingManager.isAlive();
    }

    /** @return time (ms) of the updater thread's last loop iteration, or -1 if it has not run */
    public long getLastThreadCheckTime() {
        return lastThreadCheckTime;
    }

    /** @return time (ms) of the cleaner thread's last loop iteration, or -1 if it has not run */
    public long getLastCleanerThreadCheckTime() {
        return lastCleanerThreadCheckTime;
    }

    /**
     * Returns the configured primary executor as a single {@code host:port} entry.
     *
     * @return a new mutable set containing the primary executor address
     */
    public Set<String> getPrimaryServerHosts() {
        Set<String> hosts = new HashSet<String>();
        String primary = executorHost + ":" + executorPort;
        hosts.add(primary);
        return hosts;
    }

    /**
     * Collects the {@code host:port} of the primary executor plus the address of
     * every executor referenced by a currently running flow.
     *
     * @return a new mutable set of distinct executor addresses
     */
    public Set<String> getAllActiveExecutorServerHosts() {
        Set<String> hosts = new HashSet<String>();
        hosts.add(executorHost + ":" + executorPort);
        for (Pair<ExecutionReference, ExecutableFlow> entry : runningFlows.values()) {
            ExecutionReference reference = entry.getFirst();
            String address = reference.getHost() + ":" + reference.getPort();
            hosts.add(address);
        }
        return hosts;
    }

    /**
     * Seeds {@code runningFlows} with the active flows reported by the loader.
     *
     * @throws ExecutorManagerException if the loader fails to fetch the active flows
     */
    private void loadRunningFlows() throws ExecutorManagerException {
        runningFlows.putAll(executorLoader.fetchActiveFlows());
    }

    /**
     * Returns the execution ids of all running executions of the given flow.
     *
     * <p>Both the project id and the flow id must match; flow ids are only
     * unique within a project, so matching on the flow id alone would report
     * executions that belong to other projects.
     *
     * @param projectId id of the project that owns the flow
     * @param flowId    id of the flow
     * @return a new list of matching execution ids, empty if none are running
     */
    public List<Integer> getRunningFlows(int projectId, String flowId) {
        List<Integer> executionIds = new ArrayList<Integer>();
        for (Pair<ExecutionReference, ExecutableFlow> ref : this.runningFlows.values()) {
            ExecutableFlow flow = ref.getSecond();
            if (flow.getProjectId() == projectId && flow.getFlowId().equals(flowId)) {
                executionIds.add(ref.getFirst().getExecId());
            }
        }
        return executionIds;
    }

    /**
     * Tells whether at least one execution of the given flow is currently running.
     *
     * @param projectId id of the project that owns the flow
     * @param flowId    id of the flow
     * @return {@code true} if a running execution matches both ids
     */
    public boolean isFlowRunning(int projectId, String flowId) {
        for (Pair<ExecutionReference, ExecutableFlow> entry : runningFlows.values()) {
            ExecutableFlow candidate = entry.getSecond();
            if (candidate.getProjectId() == projectId && candidate.getFlowId().equals(flowId)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Looks up an execution, preferring the in-memory running copy and falling
     * back to the loader when the execution is not running.
     *
     * @param execId execution id
     * @return the running flow, or whatever the loader returns for {@code execId}
     * @throws ExecutorManagerException if the loader lookup fails
     */
    public ExecutableFlow getExecutableFlow(int execId) throws ExecutorManagerException {
        Pair<ExecutionReference, ExecutableFlow> running = runningFlows.get(execId);
        if (running != null) {
            return running.getSecond();
        }
        return executorLoader.fetchExecutableFlow(execId);
    }

    /**
     * Returns a snapshot of every flow currently tracked as running.
     *
     * @return a new list of running flows
     */
    public List<ExecutableFlow> getRunningFlows() {
        List<ExecutableFlow> snapshot = new ArrayList<ExecutableFlow>();
        for (Pair<ExecutionReference, ExecutableFlow> entry : runningFlows.values()) {
            ExecutableFlow flow = entry.getSecond();
            snapshot.add(flow);
        }
        return snapshot;
    }

    /**
     * Returns a snapshot of the recently finished flows.
     *
     * @return a new list; later changes to the internal map are not reflected
     */
    public List<ExecutableFlow> getRecentlyFinishedFlows() {
        List<ExecutableFlow> finished = new ArrayList<ExecutableFlow>();
        finished.addAll(recentlyFinished.values());
        return finished;
    }

    /**
     * Fetches a page of the execution history of one flow.
     *
     * @param project project that owns the flow
     * @param flowId  id of the flow
     * @param skip    passed to the loader as the page offset
     * @param size    passed to the loader as the page size
     * @return the executions returned by the loader
     * @throws ExecutorManagerException if the loader fails
     */
    public List<ExecutableFlow> getExecutableFlows(Project project, String flowId, int skip, int size) throws ExecutorManagerException {
        int projectId = project.getId();
        return executorLoader.fetchFlowHistory(projectId, flowId, skip, size);
    }

    /**
     * Fetches a page of the execution history across all flows.
     *
     * @param skip passed to the loader as the page offset
     * @param size passed to the loader as the page size
     * @return the executions returned by the loader
     * @throws ExecutorManagerException if the loader fails
     */
    public List<ExecutableFlow> getExecutableFlows(int skip, int size) throws ExecutorManagerException {
        return executorLoader.fetchFlowHistory(skip, size);
    }

    /**
     * Fetches a page of execution history filtered by a flow-id substring. The
     * substring is wrapped in {@code %} before being handed to the loader; the
     * remaining filters are passed as {@code null}, {@code 0} and {@code -1}.
     *
     * @param flowIdContains text the flow id should contain
     * @param skip           passed to the loader as the page offset
     * @param size           passed to the loader as the page size
     * @return the executions returned by the loader
     * @throws ExecutorManagerException if the loader fails
     */
    public List<ExecutableFlow> getExecutableFlows(String flowIdContains, int skip, int size) throws ExecutorManagerException {
        String flowPattern = "%" + flowIdContains + "%";
        return executorLoader.fetchFlowHistory(null, flowPattern, null, 0, -1L, -1L, skip, size);
    }

    /**
     * Fetches a page of execution history using the full set of filters; every
     * argument is forwarded unchanged to the loader.
     *
     * @return the executions returned by the loader
     * @throws ExecutorManagerException if the loader fails
     */
    public List<ExecutableFlow> getExecutableFlows(String projContain, String flowContain, String userContain, int status, long begin, long end, int skip, int size) throws ExecutorManagerException {
        return executorLoader.fetchFlowHistory(
                projContain, flowContain, userContain, status, begin, end, skip, size);
    }

    /**
     * Fetches a page of the execution history of one job.
     *
     * @param project project that owns the job
     * @param jobId   id of the job
     * @param skip    passed to the loader as the page offset
     * @param size    passed to the loader as the page size
     * @return the job executions returned by the loader
     * @throws ExecutorManagerException if the loader fails
     */
    public List<ExecutableJobInfo> getExecutableJobs(Project project, String jobId, int skip, int size) throws ExecutorManagerException {
        int projectId = project.getId();
        return executorLoader.fetchJobHistory(projectId, jobId, skip, size);
    }

    /**
     * Counts the recorded executions of a job.
     *
     * @param project project that owns the job
     * @param jobId   id of the job
     * @return the count reported by the loader
     * @throws ExecutorManagerException if the loader fails
     */
    public int getNumberOfJobExecutions(Project project, String jobId) throws ExecutorManagerException {
        int projectId = project.getId();
        return executorLoader.fetchNumExecutableNodes(projectId, jobId);
    }

    /**
     * Counts the recorded executions of a flow.
     *
     * @param project project that owns the flow
     * @param flowId  id of the flow
     * @return the count reported by the loader
     * @throws ExecutorManagerException if the loader fails
     */
    public int getNumberOfExecutions(Project project, String flowId) throws ExecutorManagerException {
        int projectId = project.getId();
        return executorLoader.fetchNumExecutableFlows(projectId, flowId);
    }

    /**
     * Reads a slice of the flow-level log of an execution. Running executions
     * are served by the executor server ("log" action, type "flow"); all others
     * are read through the loader.
     *
     * @param exFlow execution whose log is requested
     * @param offset start offset of the slice
     * @param length length of the slice
     * @return the requested log data
     * @throws ExecutorManagerException if the executor call or the loader fails
     */
    public FileIOUtils.LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset, int length) throws ExecutorManagerException {
        int execId = exFlow.getExecutionId();
        Pair<ExecutionReference, ExecutableFlow> running = runningFlows.get(execId);
        if (running == null) {
            // Not running any more: the log lives in the data store.
            return executorLoader.fetchLogs(execId, "", 0, offset, length);
        }

        Map<String, Object> result = callExecutorServer(
                running.getFirst(),
                "log",
                new Pair<String, String>("type", "flow"),
                new Pair<String, String>("offset", String.valueOf(offset)),
                new Pair<String, String>("length", String.valueOf(length)));
        return FileIOUtils.LogData.createLogDataFromObject(result);
    }

    /**
     * Reads a slice of the log of one job attempt. Running executions are
     * served by the executor server ("log" action, type "job"); all others are
     * read through the loader.
     *
     * @param exFlow  execution that contains the job
     * @param jobId   id of the job
     * @param offset  start offset of the slice
     * @param length  length of the slice
     * @param attempt attempt number of the job
     * @return the requested log data
     * @throws ExecutorManagerException if the executor call or the loader fails
     */
    public FileIOUtils.LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
        int execId = exFlow.getExecutionId();
        Pair<ExecutionReference, ExecutableFlow> running = runningFlows.get(execId);
        if (running == null) {
            // Not running any more: the log lives in the data store.
            return executorLoader.fetchLogs(execId, jobId, attempt, offset, length);
        }

        Map<String, Object> result = callExecutorServer(
                running.getFirst(),
                "log",
                new Pair<String, String>("type", "job"),
                new Pair<String, String>("jobId", jobId),
                new Pair<String, String>("offset", String.valueOf(offset)),
                new Pair<String, String>("length", String.valueOf(length)),
                new Pair<String, String>("attempt", String.valueOf(attempt)));
        return FileIOUtils.LogData.createLogDataFromObject(result);
    }

    /**
     * Asks the executor server to cancel a running execution. The call is made
     * while holding the monitor of {@code exFlow}.
     *
     * @param exFlow execution to cancel
     * @param userId user requesting the cancel
     * @throws ExecutorManagerException if the execution is not running or the executor call fails
     */
    public void cancelFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
        synchronized (exFlow) {
            Pair<ExecutionReference, ExecutableFlow> running = runningFlows.get(exFlow.getExecutionId());
            if (running == null) {
                throw new ExecutorManagerException("Execution " + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId() + " isn't running.");
            }
            callExecutorServer(running.getFirst(), "cancel", userId);
        }
    }

    /**
     * Asks the executor server to resume a running execution. The call is made
     * while holding the monitor of {@code exFlow}.
     *
     * @param exFlow execution to resume
     * @param userId user requesting the resume
     * @throws ExecutorManagerException if the execution is not running or the executor call fails
     */
    public void resumeFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
        synchronized (exFlow) {
            Pair<ExecutionReference, ExecutableFlow> running = runningFlows.get(exFlow.getExecutionId());
            if (running == null) {
                throw new ExecutorManagerException("Execution " + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId() + " isn't running.");
            }
            callExecutorServer(running.getFirst(), "resume", userId);
        }
    }

    /**
     * Asks the executor server to pause a running execution. The call is made
     * while holding the monitor of {@code exFlow}.
     *
     * @param exFlow execution to pause
     * @param userId user requesting the pause
     * @throws ExecutorManagerException if the execution is not running or the executor call fails
     */
    public void pauseFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
        synchronized (exFlow) {
            Pair<ExecutionReference, ExecutableFlow> running = runningFlows.get(exFlow.getExecutionId());
            if (running == null) {
                throw new ExecutorManagerException("Execution " + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId() + " isn't running.");
            }
            callExecutorServer(running.getFirst(), "pause", userId);
        }
    }

    /**
     * Sends a "pauseJobs" modification for the given jobs of a running execution.
     *
     * @throws ExecutorManagerException if the execution is not running, a job is unknown, or the call fails
     */
    public void pauseExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
        modifyExecutingJobs(exFlow, "pauseJobs", userId, jobIds);
    }

    /**
     * Sends a "resumeJobs" modification for the given jobs of a running execution.
     *
     * @throws ExecutorManagerException if the execution is not running, a job is unknown, or the call fails
     */
    public void resumeExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
        modifyExecutingJobs(exFlow, "resumeJobs", userId, jobIds);
    }

    /**
     * Sends a "retryFailures" modification for a running execution, without any job ids.
     *
     * @throws ExecutorManagerException if the execution is not running or the call fails
     */
    public void retryFailures(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
        // No job ids: the varargs parameter receives an empty array.
        modifyExecutingJobs(exFlow, "retryFailures", userId);
    }

    /**
     * Sends a "retryJobs" modification for the given jobs of a running execution.
     *
     * @throws ExecutorManagerException if the execution is not running, a job is unknown, or the call fails
     */
    public void retryExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
        modifyExecutingJobs(exFlow, "retryJobs", userId, jobIds);
    }

    /**
     * Sends a "skipJobs" modification for the given jobs of a running execution.
     *
     * @throws ExecutorManagerException if the execution is not running, a job is unknown, or the call fails
     */
    public void disableExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
        modifyExecutingJobs(exFlow, "skipJobs", userId, jobIds);
    }

    /**
     * Sends an "enableJobs" modification for the given jobs of a running execution.
     *
     * @throws ExecutorManagerException if the execution is not running, a job is unknown, or the call fails
     */
    public void enableExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
        modifyExecutingJobs(exFlow, "enableJobs", userId, jobIds);
    }

    /**
     * Sends a "cancelJobs" modification for the given jobs of a running execution.
     *
     * @throws ExecutorManagerException if the execution is not running, a job is unknown, or the call fails
     */
    public void cancelExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
        modifyExecutingJobs(exFlow, "cancelJobs", userId, jobIds);
    }

    /**
     * Sends a "modifyExecution" request for a running execution, holding the
     * monitor of {@code exFlow} for the duration of the call.
     *
     * <p>When job ids are supplied, every non-empty id must name a node of the
     * execution; the ids are then sent comma-joined in the "jobIds" parameter.
     * Without job ids only the "modifyType" parameter is sent.
     *
     * @param exFlow  execution to modify
     * @param command value sent as "modifyType"
     * @param userId  user requesting the change
     * @param jobIds  optional job ids the command applies to
     * @return the executor server's parsed response
     * @throws ExecutorManagerException if the execution is not running, a job id is unknown, or the call fails
     */
    private Map<String, Object> modifyExecutingJobs(ExecutableFlow exFlow, String command, String userId, String ... jobIds) throws ExecutorManagerException {
        synchronized (exFlow) {
            Pair<ExecutionReference, ExecutableFlow> running = runningFlows.get(exFlow.getExecutionId());
            if (running == null) {
                throw new ExecutorManagerException("Execution " + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId() + " isn't running.");
            }
            ExecutionReference reference = running.getFirst();
            Pair<String, String> modifyType = new Pair<String, String>("modifyType", command);

            boolean noJobs = jobIds == null || jobIds.length == 0;
            if (noJobs) {
                return callExecutorServer(reference, "modifyExecution", userId, modifyType);
            }

            // Validate all ids up front so nothing is sent when one of them is unknown.
            for (String jobId : jobIds) {
                if (!jobId.isEmpty() && exFlow.getExecutableNode(jobId) == null) {
                    throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + exFlow.getExecutionId() + ".");
                }
            }
            String joinedIds = StringUtils.join((Object[]) jobIds, ',');
            Pair<String, String> jobIdsParam = new Pair<String, String>("jobIds", joinedIds);
            return callExecutorServer(reference, "modifyExecution", userId, modifyType, jobIdsParam);
        }
    }

    /**
     * Submits a flow for execution on the configured executor, holding the
     * monitor of {@code exflow} for the duration of the call.
     *
     * <p>Steps: mark the jobs listed as disabled in the execution options,
     * apply the concurrency option when the same flow is already running
     * ("pipeline" chains behind the highest running execution id, "skip"
     * rejects the submission, anything else runs concurrently), upload the
     * execution, register an active reference and tell the executor to run it.
     * If the executor call fails the active reference is removed again.
     *
     * @param exflow the flow to execute
     * @return a human-readable message describing what happened
     * @throws ExecutorManagerException if a disabled job id does not exist in the flow,
     *         the submission is skipped, or the loader or executor call fails
     */
    public String submitExecutableFlow(ExecutableFlow exflow) throws ExecutorManagerException {
        synchronized (exflow) {
            logger.info((Object)("Submitting execution flow " + exflow.getFlowId()));
            int projectId = exflow.getProjectId();
            String flowId = exflow.getFlowId();
            List<Integer> running = this.getRunningFlows(projectId, flowId);
            ExecutionOptions options = exflow.getExecutionOptions();
            if (options == null) {
                options = new ExecutionOptions();
            }
            if (options.getDisabledJobs() != null) {
                for (String disabledId : options.getDisabledJobs()) {
                    ExecutableNode node = exflow.getExecutableNode(disabledId);
                    if (node == null) {
                        // Fail with a descriptive error instead of a NullPointerException.
                        throw new ExecutorManagerException("Job " + disabledId + " doesn't exist in flow " + flowId + ".");
                    }
                    node.setStatus(Status.DISABLED);
                }
            }
            String message = "";
            if (!running.isEmpty()) {
                if (options.getConcurrentOption().equals("pipeline")) {
                    // Pipeline behind the most recent (highest id) running execution.
                    Collections.sort(running);
                    Integer runningExecId = running.get(running.size() - 1);
                    options.setPipelineExecutionId(runningExecId);
                    message = "Flow " + flowId + " is already running with exec id " + runningExecId + ". Pipelining level " + options.getPipelineLevel() + ". ";
                } else {
                    if (options.getConcurrentOption().equals("skip")) {
                        throw new ExecutorManagerException("Flow " + flowId + " is already running. Skipping execution.", ExecutorManagerException.Reason.SkippedExecution);
                    }
                    message = "Flow " + flowId + " is already running with exec id " + StringUtils.join(running, (String)",") + ". Will execute concurrently. ";
                }
            }
            this.executorLoader.uploadExecutableFlow(exflow);
            ExecutionReference reference = new ExecutionReference(exflow.getExecutionId(), this.executorHost, this.executorPort);
            this.executorLoader.addActiveExecutableReference(reference);
            try {
                this.callExecutorServer(reference, "execute");
                this.runningFlows.put(exflow.getExecutionId(), new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
                message = message + "Execution submitted successfully with exec id " + exflow.getExecutionId();
            }
            catch (ExecutorManagerException e) {
                // The executor never accepted the flow; undo the active reference.
                this.executorLoader.removeActiveExecutableReference(reference.getExecId());
                throw e;
            }
            return message;
        }
    }

    /**
     * Removes execution logs through the loader using {@code millis} as the
     * time cutoff. Failures are logged (with stack trace) and not propagated,
     * so a failed clean-up never interrupts the caller.
     *
     * @param millis cutoff timestamp in milliseconds passed to the loader
     */
    public void cleanOldExecutionLogs(long millis) {
        try {
            int count = this.executorLoader.removeExecutionLogsByTime(millis);
            logger.info((Object)("Cleaned up " + count + " log entries."));
        }
        catch (ExecutorManagerException e) {
            // Route through the logger rather than stderr so the failure reaches the server log.
            logger.error((Object)"Failed to clean up old execution logs.", e);
        }
    }

    /**
     * Calls the executor referenced by {@code ref} with only an action and the
     * execution id; I/O failures are wrapped in {@link ExecutorManagerException}.
     */
    private Map<String, Object> callExecutorServer(ExecutionReference ref, String action) throws ExecutorManagerException {
        try {
            Map<String, Object> response =
                    callExecutorServer(ref.getHost(), ref.getPort(), action, ref.getExecId(), null, (Pair[])null);
            return response;
        }
        catch (IOException ioe) {
            throw new ExecutorManagerException(ioe);
        }
    }

    /**
     * Calls the executor referenced by {@code ref} with an action, the execution
     * id and the acting user; I/O failures are wrapped in {@link ExecutorManagerException}.
     */
    private Map<String, Object> callExecutorServer(ExecutionReference ref, String action, String user) throws ExecutorManagerException {
        try {
            Map<String, Object> response =
                    callExecutorServer(ref.getHost(), ref.getPort(), action, ref.getExecId(), user, (Pair[])null);
            return response;
        }
        catch (IOException ioe) {
            throw new ExecutorManagerException(ioe);
        }
    }

    /**
     * Calls the executor referenced by {@code ref} with an action, the execution
     * id and extra query parameters (no user); I/O failures are wrapped in
     * {@link ExecutorManagerException}.
     */
    private Map<String, Object> callExecutorServer(ExecutionReference ref, String action, Pair<String, String> ... params) throws ExecutorManagerException {
        try {
            Map<String, Object> response =
                    callExecutorServer(ref.getHost(), ref.getPort(), action, ref.getExecId(), null, params);
            return response;
        }
        catch (IOException ioe) {
            throw new ExecutorManagerException(ioe);
        }
    }

    /**
     * Calls the executor referenced by {@code ref} with an action, the execution
     * id, the acting user and extra query parameters; I/O failures are wrapped
     * in {@link ExecutorManagerException}.
     */
    private Map<String, Object> callExecutorServer(ExecutionReference ref, String action, String user, Pair<String, String> ... params) throws ExecutorManagerException {
        try {
            Map<String, Object> response =
                    callExecutorServer(ref.getHost(), ref.getPort(), action, ref.getExecId(), user, params);
            return response;
        }
        catch (IOException ioe) {
            throw new ExecutorManagerException(ioe);
        }
    }

    /**
     * Issues an HTTP GET to {@code http://host:port/executor} and parses the
     * JSON body of the reply.
     *
     * @param host        executor host
     * @param port        executor port
     * @param action      value of the "action" query parameter
     * @param executionId sent as "execid" when non-null
     * @param user        sent as "user" when non-null
     * @param params      additional query parameters; may be {@code null}
     * @return the parsed JSON response
     * @throws IOException if the URI is invalid, the request fails, or the
     *         response carries an "error" entry (its text becomes the message)
     */
    private Map<String, Object> callExecutorServer(String host, int port, String action, Integer executionId, String user, Pair<String, String> ... params) throws IOException {
        URIBuilder builder = new URIBuilder()
                .setScheme("http")
                .setHost(host)
                .setPort(port)
                .setPath("/executor");
        builder.setParameter("action", action);
        if (executionId != null) {
            builder.setParameter("execid", String.valueOf(executionId));
        }
        if (user != null) {
            builder.setParameter("user", user);
        }
        if (params != null) {
            for (Pair<String, String> param : params) {
                builder.setParameter(param.getFirst(), param.getSecond());
            }
        }

        final URI uri;
        try {
            uri = builder.build();
        }
        catch (URISyntaxException e) {
            throw new IOException(e);
        }

        ResponseHandler<String> responseHandler = new BasicResponseHandler();
        DefaultHttpClient httpclient = new DefaultHttpClient();
        final String body;
        try {
            body = httpclient.execute((HttpUriRequest) new HttpGet(uri), responseHandler);
        }
        finally {
            // One client per call: always release its connections.
            httpclient.getConnectionManager().shutdown();
        }

        @SuppressWarnings("unchecked")
        Map<String, Object> json = (Map<String, Object>) JSONUtils.parseJSONFromString(body);
        String error = (String) json.get("error");
        if (error != null) {
            throw new IOException(error);
        }
        return json;
    }

    /**
     * Issues an HTTP GET to the {@code /jmx} endpoint of an executor and parses
     * the JSON body of the reply.
     *
     * @param hostPort executor address in {@code host:port} form
     * @param action   name of the query parameter sent with an empty value
     * @param mBean    sent as "mBean" when non-null
     * @return the parsed JSON response
     * @throws IOException if the URI is invalid, the request fails, or the
     *         response carries an "error" entry (its text becomes the message)
     */
    public Map<String, Object> callExecutorJMX(String hostPort, String action, String mBean) throws IOException {
        String[] parts = hostPort.split(":");
        String host = parts[0];
        int port = Integer.parseInt(parts[1]);

        URIBuilder builder = new URIBuilder()
                .setScheme("http")
                .setHost(host)
                .setPort(port)
                .setPath("/jmx");
        builder.setParameter(action, "");
        if (mBean != null) {
            builder.setParameter("mBean", mBean);
        }

        final URI uri;
        try {
            uri = builder.build();
        }
        catch (URISyntaxException e) {
            throw new IOException(e);
        }

        ResponseHandler<String> responseHandler = new BasicResponseHandler();
        DefaultHttpClient httpclient = new DefaultHttpClient();
        final String body;
        try {
            body = httpclient.execute((HttpUriRequest) new HttpGet(uri), responseHandler);
        }
        finally {
            // One client per call: always release its connections.
            httpclient.getConnectionManager().shutdown();
        }

        @SuppressWarnings("unchecked")
        Map<String, Object> json = (Map<String, Object>) JSONUtils.parseJSONFromString(body);
        String error = (String) json.get("error");
        if (error != null) {
            throw new IOException(error);
        }
        return json;
    }

    /**
     * Stops the background threads started by the constructor: the updater
     * thread and the log cleaner thread. Previously only the updater was
     * signalled, leaving the cleaner thread waiting for up to a day.
     */
    public void shutdown() {
        this.executingManager.shutdown();
        this.cleanerThread.shutdown();
    }

    /**
     * Finalizes an execution that is leaving the running set.
     *
     * <p>If the in-memory flow is not finished, the stored copy is fetched;
     * if that is not finished either, everything in it is failed and saved.
     * The finalized flow gets an end time if it has none, the active reference
     * is removed, and the flow moves from {@code runningFlows} to
     * {@code recentlyFinished}. Finally a failure or success email is sent,
     * based on the status of the finalized flow, when the execution options
     * list recipients.
     *
     * <p>A loader failure is logged and the email step still runs, using the
     * in-memory flow.
     *
     * @param flow the in-memory flow being finalized
     */
    private void finalizeFlows(ExecutableFlow flow) {
        int execId = flow.getExecutionId();
        // Flow whose state reflects the final outcome; stays the in-memory flow
        // if the data store cannot be consulted.
        ExecutableFlow finalFlow = flow;
        try {
            ExecutableFlow dsFlow;
            if (this.isFinished(flow)) {
                dsFlow = flow;
            } else {
                dsFlow = this.executorLoader.fetchExecutableFlow(execId);
                if (!this.isFinished(dsFlow)) {
                    this.failEverything(dsFlow);
                    this.executorLoader.updateExecutableFlow(dsFlow);
                }
            }
            finalFlow = dsFlow;
            long now = System.currentTimeMillis();
            // Stamp and persist the same object; the original stamped `flow`
            // but saved `dsFlow`, so the end time could be lost.
            if (dsFlow.getEndTime() == -1L) {
                dsFlow.setEndTime(now);
                this.executorLoader.updateExecutableFlow(dsFlow);
            }
            // Keep the in-memory copy consistent for anyone still holding it.
            if (flow.getEndTime() == -1L) {
                flow.setEndTime(now);
            }
            this.executorLoader.removeActiveExecutableReference(execId);
            this.runningFlows.remove(execId);
            this.recentlyFinished.put(execId, dsFlow);
        }
        catch (ExecutorManagerException e) {
            logger.error((Object)("Failed to finalize execution " + execId), e);
        }
        ExecutionOptions options = finalFlow.getExecutionOptions();
        if (options == null) {
            // No options means no recipients; nothing to send.
            return;
        }
        // Decide on the finalized status: the in-memory flow may still look
        // "running" when the executor vanished and the stored copy was failed.
        Status finalStatus = finalFlow.getStatus();
        if (finalStatus == Status.FAILED || finalStatus == Status.KILLED) {
            if (options.getFailureEmails() != null && !options.getFailureEmails().isEmpty()) {
                try {
                    this.mailer.sendErrorEmail(finalFlow, "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
                }
                catch (Exception e) {
                    logger.error((Object)("Failed to send error email for execution " + execId), e);
                }
            }
        } else if (options.getSuccessEmails() != null && !options.getSuccessEmails().isEmpty()) {
            try {
                this.mailer.sendSuccessEmail(finalFlow);
            }
            catch (Exception e) {
                logger.error((Object)("Failed to send success email for execution " + execId), e);
            }
        }
    }

    /**
     * Marks a flow and all of its unfinished nodes as failed.
     *
     * <p>Nodes already SUCCEEDED, FAILED, KILLED, SKIPPED or DISABLED are left
     * alone. READY nodes become KILLED, every other node becomes FAILED; in
     * both cases missing start and end times are set to the current time. The
     * flow gets an end time if it has none and its status becomes FAILED.
     *
     * @param exFlow the flow to fail
     */
    private void failEverything(ExecutableFlow exFlow) {
        final long now = System.currentTimeMillis();
        for (ExecutableNode node : exFlow.getExecutableNodes()) {
            switch (node.getStatus()) {
                case SUCCEEDED:
                case FAILED:
                case KILLED:
                case SKIPPED:
                case DISABLED:
                    // Already in a terminal state: nothing to change.
                    continue;
                case READY:
                    node.setStatus(Status.KILLED);
                    break;
                default:
                    node.setStatus(Status.FAILED);
                    break;
            }
            if (node.getStartTime() == -1L) {
                node.setStartTime(now);
            }
            if (node.getEndTime() == -1L) {
                node.setEndTime(now);
            }
        }
        if (exFlow.getEndTime() == -1L) {
            exFlow.setEndTime(now);
        }
        exFlow.setStatus(Status.FAILED);
    }

    /**
     * Drops entries from {@code recentlyFinished} whose end time is older than
     * {@code ageMs} milliseconds before now.
     *
     * @param ageMs maximum age, in milliseconds, an entry may have and be kept
     */
    private void evictOldRecentlyFinished(long ageMs) {
        // Typed snapshot of the keys; the raw list in the decompiled code cannot be iterated as Integer.
        List<Integer> recentlyFinishedKeys = new ArrayList<Integer>(this.recentlyFinished.keySet());
        long oldAgeThreshold = System.currentTimeMillis() - ageMs;
        for (Integer key : recentlyFinishedKeys) {
            ExecutableFlow flow = this.recentlyFinished.get(key);
            // The entry may have been removed since the snapshot was taken.
            if (flow == null) {
                continue;
            }
            if (flow.getEndTime() < oldAgeThreshold) {
                this.recentlyFinished.remove(key);
            }
        }
    }

    /**
     * Applies an update object received for a running execution to its
     * in-memory flow.
     *
     * <p>On success the reference's next-check time and error count are reset.
     * If the status changes to FAILED_FINISHING and the options request it, a
     * first-failure message is sent through the mailer.
     *
     * @param updateData update object; must contain an Integer "execId"
     * @return the updated in-memory flow
     * @throws ExecutorManagerException if "execId" is missing, the execution is
     *         not running, or the update object carries an "error" entry
     */
    private ExecutableFlow updateExecution(Map<String, Object> updateData) throws ExecutorManagerException {
        Integer execId = (Integer) updateData.get("execId");
        if (execId == null) {
            throw new ExecutorManagerException("Response is malformed. Need exec id to update.");
        }

        Pair<ExecutionReference, ExecutableFlow> running = runningFlows.get(execId);
        if (running == null) {
            throw new ExecutorManagerException("No running flow found with the execution id.");
        }

        ExecutableFlow flow = running.getSecond();
        if (updateData.containsKey("error")) {
            throw new ExecutorManagerException((String) updateData.get("error"), flow);
        }

        // A clean update clears any back-off accumulated for this reference.
        ExecutionReference reference = running.getFirst();
        reference.setNextCheckTime(0L);
        reference.setNumErrors(0);

        Status before = flow.getStatus();
        flow.applyUpdateObject(updateData);
        Status after = flow.getStatus();

        ExecutionOptions options = flow.getExecutionOptions();
        boolean statusChanged = before != after;
        if (statusChanged && after.equals(Status.FAILED_FINISHING) && options.getNotifyOnFirstFailure()) {
            mailer.sendFirstErrorMessage(flow);
        }
        return flow;
    }

    /**
     * Tells whether a flow has reached a terminal status.
     *
     * @param flow the flow to inspect
     * @return {@code true} for SUCCEEDED, FAILED or KILLED, {@code false} otherwise
     */
    public boolean isFinished(ExecutableFlow flow) {
        switch (flow.getStatus()) {
            case SUCCEEDED:
            case FAILED:
            case KILLED:
                return true;
            default:
                return false;
        }
    }

    /**
     * Appends the execution id and update time of each flow to the two output
     * lists, keeping both lists index-aligned with {@code flows}.
     *
     * @param flows        flows to read from
     * @param executionIds receives one execution id per flow
     * @param updateTimes  receives one update time per flow
     */
    private void fillUpdateTimeAndExecId(List<ExecutableFlow> flows, List<Integer> executionIds, List<Long> updateTimes) {
        for (ExecutableFlow current : flows) {
            executionIds.add(current.getExecutionId());
            updateTimes.add(current.getUpdateTime());
        }
    }

    /**
     * Groups the running flows that are due for a check by the executor that
     * runs them. A flow is due when its reference's next-check time lies before
     * the current time.
     *
     * @return map from executor connection to the due flows on that executor
     */
    private Map<ConnectionInfo, List<ExecutableFlow>> getFlowToExecutorMap() {
        Map<ConnectionInfo, List<ExecutableFlow>> flowsByExecutor = new HashMap<ConnectionInfo, List<ExecutableFlow>>();
        // Reused as the map key while consecutive flows share the same executor.
        ConnectionInfo current = new ConnectionInfo(executorHost, executorPort);
        for (Pair<ExecutionReference, ExecutableFlow> entry : runningFlows.values()) {
            ExecutionReference reference = entry.getFirst();
            ExecutableFlow flow = entry.getSecond();
            if (reference.getNextCheckTime() >= System.currentTimeMillis()) {
                continue;
            }
            if (!current.isEqual(reference.getHost(), reference.getPort())) {
                current = new ConnectionInfo(reference.getHost(), reference.getPort());
            }
            List<ExecutableFlow> bucket = flowsByExecutor.get(current);
            if (bucket == null) {
                bucket = new ArrayList<ExecutableFlow>();
                flowsByExecutor.put(current, bucket);
            }
            bucket.add(flow);
        }
        return flowsByExecutor;
    }

    /**
     * Appends a page of a flow's execution history to {@code outputList} and
     * returns the flow's total execution count.
     *
     * @param projectId  id of the project that owns the flow
     * @param flowId     id of the flow
     * @param from       passed to the loader as the page offset
     * @param length     passed to the loader as the page size
     * @param outputList list that receives the fetched executions
     * @return the count reported by the loader for this flow
     * @throws ExecutorManagerException if the loader fails
     */
    public int getExecutableFlows(int projectId, String flowId, int from, int length, List<ExecutableFlow> outputList) throws ExecutorManagerException {
        outputList.addAll(executorLoader.fetchFlowHistory(projectId, flowId, from, length));
        return executorLoader.fetchNumExecutableFlows(projectId, flowId);
    }

    /**
     * Background thread that removes old execution logs once per
     * {@link #CLEANER_THREAD_WAIT_INTERVAL_MS} until {@link #shutdown()} is called.
     */
    private class CleanerThread extends Thread {
        // One day in milliseconds: both the pause between loop iterations and
        // the minimum gap between two clean-ups.
        private static final long CLEANER_THREAD_WAIT_INTERVAL_MS = 86400000L;
        // Logs older than this many milliseconds are removed.
        private final long executionLogsRetentionMs;
        // Written by the thread calling shutdown(), read by this thread: volatile for visibility.
        private volatile boolean shutdown = false;
        private long lastLogCleanTime = -1L;

        /**
         * @param executionLogsRetentionMs age in milliseconds beyond which logs are removed
         */
        public CleanerThread(long executionLogsRetentionMs) {
            this.executionLogsRetentionMs = executionLogsRetentionMs;
            this.setName("AzkabanWebServer-Cleaner-Thread");
        }

        /** Requests termination and wakes the thread if it is waiting. */
        public void shutdown() {
            this.shutdown = true;
            this.interrupt();
        }

        /**
         * Loops until shutdown: records the check time, cleans the logs when at
         * least one interval has passed since the last clean-up, then waits one
         * interval. An interrupt only ends the wait; the loop condition decides
         * whether to stop.
         */
        @Override
        public void run() {
            while (!this.shutdown) {
                synchronized (this) {
                    try {
                        long currentTime = System.currentTimeMillis();
                        ExecutorManager.this.lastCleanerThreadCheckTime = currentTime;
                        if (currentTime - CLEANER_THREAD_WAIT_INTERVAL_MS > this.lastLogCleanTime) {
                            this.cleanExecutionLogs();
                            this.lastLogCleanTime = currentTime;
                        }
                        this.wait(CLEANER_THREAD_WAIT_INTERVAL_MS);
                    }
                    catch (InterruptedException e) {
                        logger.info((Object)"Interrupted. Probably to shut down.");
                    }
                }
            }
        }

        /** Removes logs older than the retention period, logging the cutoff actually used. */
        private void cleanExecutionLogs() {
            logger.info((Object)"Cleaning old logs from execution_logs");
            // Compute the cutoff once so the logged value is the one passed on.
            long cutoff = DateTime.now().getMillis() - this.executionLogsRetentionMs;
            logger.info((Object)("Cleaning old log files before " + new DateTime(cutoff).toString()));
            ExecutorManager.this.cleanOldExecutionLogs(cutoff);
        }
    }

    /**
     * Immutable host/port pair identifying an executor server; usable as a
     * hash-map key. A {@code null} host is tolerated by every method.
     */
    private static class ConnectionInfo {
        // Final: instances are used as map keys, so their identity must not change.
        private final String host;
        private final int port;

        public ConnectionInfo(String host, int port) {
            this.host = host;
            this.port = port;
        }

        // Kept from the original; not referenced in the visible part of the file.
        private ConnectionInfo getOuterType() {
            return this;
        }

        /**
         * Compares this connection with a host/port pair without allocating.
         *
         * @return {@code true} if both the port and the host (null-safe) match
         */
        public boolean isEqual(String host, int port) {
            if (this.port != port) {
                return false;
            }
            // Null-safe, consistent with equals(): a null host only equals a null host.
            return this.host == null ? host == null : this.host.equals(host);
        }

        public String getHost() {
            return this.host;
        }

        public int getPort() {
            return this.port;
        }

        @Override
        public int hashCode() {
            int result = 31 + (this.host == null ? 0 : this.host.hashCode());
            return 31 * result + this.port;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || this.getClass() != obj.getClass()) {
                return false;
            }
            ConnectionInfo other = (ConnectionInfo)obj;
            return this.isEqual(other.host, other.port);
        }
    }

    private class ExecutingManagerUpdaterThread
    extends Thread {
        private boolean shutdown = false;
        private long recentlyFinishedLifetimeMs = 600000L;
        private int waitTimeIdleMs = 2000;
        private int waitTimeMs = 500;
        private int numErrors = 6;
        private long errorThreshold = 10000L;

        public ExecutingManagerUpdaterThread() {
            this.setName("ExecutorManagerUpdaterThread");
        }

        private void shutdown() {
            this.shutdown = true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            while (!this.shutdown) {
                try {
                    ExecutorManager.this.lastThreadCheckTime = System.currentTimeMillis();
                    Map exFlowMap = ExecutorManager.this.getFlowToExecutorMap();
                    ArrayList<ExecutableFlow> finishedFlows = new ArrayList<ExecutableFlow>();
                    ArrayList<Object> finalizeFlows = new ArrayList<Object>();
                    if (exFlowMap.size() > 0) {
                        for (Map.Entry entry : exFlowMap.entrySet()) {
                            ArrayList updateTimesList = new ArrayList();
                            ArrayList executionIdsList = new ArrayList();
                            ExecutorManager.this.fillUpdateTimeAndExecId((List)entry.getValue(), executionIdsList, updateTimesList);
                            Pair<String, String> updateTimes = new Pair<String, String>("updatetime", JSONUtils.toJSON(updateTimesList));
                            Pair<String, String> executionIds = new Pair<String, String>("execid", JSONUtils.toJSON(executionIdsList));
                            ConnectionInfo connection = (ConnectionInfo)entry.getKey();
                            Map results = null;
                            try {
                                results = ExecutorManager.this.callExecutorServer(connection.getHost(), connection.getPort(), "update", null, null, new Pair[]{executionIds, updateTimes});
                            }
                            catch (IOException e) {
                                logger.error((Object)e);
                                for (ExecutableFlow flow : (List)entry.getValue()) {
                                    Pair pair = (Pair)ExecutorManager.this.runningFlows.get(flow.getExecutionId());
                                    if (pair == null) continue;
                                    ExecutionReference ref = (ExecutionReference)pair.getFirst();
                                    int numErrors = ref.getNumErrors();
                                    if (ref.getNumErrors() < this.numErrors) {
                                        ref.setNextCheckTime(System.currentTimeMillis() + this.errorThreshold);
                                        ref.setNumErrors(++numErrors);
                                        continue;
                                    }
                                    logger.error((Object)("Evicting flow " + flow.getExecutionId() + ". The executor is unresponsive."));
                                    finalizeFlows.add(pair.getSecond());
                                }
                            }
                            if (results == null) continue;
                            List executionUpdates = (List)results.get("updated");
                            for (Map updateMap : executionUpdates) {
                                try {
                                    ExecutableFlow flow = ExecutorManager.this.updateExecution(updateMap);
                                    if (!ExecutorManager.this.isFinished(flow)) continue;
                                    finishedFlows.add(flow);
                                    finalizeFlows.add(flow);
                                }
                                catch (ExecutorManagerException e) {
                                    ExecutableFlow flow = e.getExecutableFlow();
                                    logger.error((Object)e);
                                    if (flow == null) continue;
                                    logger.error((Object)("Finalizing flow " + flow.getExecutionId()));
                                    finalizeFlows.add(flow);
                                }
                            }
                        }
                        ExecutorManager.this.evictOldRecentlyFinished(this.recentlyFinishedLifetimeMs);
                        for (ExecutableFlow executableFlow : finishedFlows) {
                            ExecutorManager.this.recentlyFinished.put(executableFlow.getExecutionId(), executableFlow);
                        }
                        for (ExecutableFlow executableFlow : finalizeFlows) {
                            ExecutorManager.this.finalizeFlows(executableFlow);
                        }
                    }
                    ExecutingManagerUpdaterThread executingManagerUpdaterThread = this;
                    synchronized (executingManagerUpdaterThread) {
                        try {
                            if (ExecutorManager.this.runningFlows.size() > 0) {
                                this.wait(this.waitTimeMs);
                            } else {
                                this.wait(this.waitTimeIdleMs);
                            }
                        }
                        catch (InterruptedException interruptedException) {
                            // empty catch block
                        }
                    }
                }
                catch (Exception e) {
                    logger.error((Object)e);
                }
            }
        }
    }
}

