/*
 * Decompiled with CFR 0.152.
 */
package azkaban.execapp;

import azkaban.execapp.FlowRunner;
import azkaban.execapp.FlowRunnerManager;
import azkaban.execapp.ProjectVersion;
import azkaban.execapp.event.Event;
import azkaban.execapp.event.EventListener;
import azkaban.execapp.event.FlowWatcher;
import azkaban.execapp.event.LocalFlowWatcher;
import azkaban.execapp.event.RemoteFlowWatcher;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.jobtype.JobTypeManager;
import azkaban.project.ProjectLoader;
import azkaban.utils.FileIOUtils;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;

public class FlowRunnerManager
implements EventListener {
    private static Logger logger = Logger.getLogger(FlowRunnerManager.class);
    private File executionDirectory;
    private File projectDirectory;
    private static final long RECENTLY_FINISHED_TIME_TO_LIVE = 60000L;
    private static final int DEFAULT_NUM_EXECUTING_FLOWS = 30;
    private Map<Pair<Integer, Integer>, ProjectVersion> installedProjects = new ConcurrentHashMap();
    private Map<Integer, FlowRunner> runningFlows = new ConcurrentHashMap();
    private Map<Integer, ExecutableFlow> recentlyFinishedFlows = new ConcurrentHashMap();
    private LinkedBlockingQueue<FlowRunner> flowQueue = new LinkedBlockingQueue();
    private int numThreads = 30;
    private ExecutorService executorService;
    private SubmitterThread submitterThread;
    private CleanerThread cleanerThread;
    private int numJobThreadPerFlow = 10;
    private ExecutorLoader executorLoader;
    private ProjectLoader projectLoader;
    private JobTypeManager jobtypeManager;
    private Props globalProps;
    private final Props azkabanProps;
    private long lastSubmitterThreadCheckTime = -1L;
    private long lastCleanerThreadCheckTime = -1L;
    private long executionDirRetention = 86400000L;
    private String jobLogChunkSize = "5MB";
    private int jobLogNumFiles = 4;
    private boolean validateProxyUser = false;
    private Object executionDirDeletionSync = new Object();

    public FlowRunnerManager(Props props, ExecutorLoader executorLoader, ProjectLoader projectLoader, ClassLoader parentClassLoader) throws IOException {
        this.executionDirectory = new File(props.getString("azkaban.execution.dir", "executions"));
        this.projectDirectory = new File(props.getString("azkaban.project.dir", "projects"));
        this.azkabanProps = props;
        this.executionDirRetention = props.getLong("execution.dir.retention", this.executionDirRetention);
        logger.info((Object)("Execution dir retention set to " + this.executionDirRetention + " ms"));
        if (!this.executionDirectory.exists()) {
            this.executionDirectory.mkdirs();
        }
        if (!this.projectDirectory.exists()) {
            this.projectDirectory.mkdirs();
        }
        this.installedProjects = this.loadExistingProjects();
        this.numThreads = props.getInt("executor.flow.threads", 30);
        this.numJobThreadPerFlow = props.getInt("flow.num.job.threads", this.numJobThreadPerFlow);
        this.executorService = Executors.newFixedThreadPool(this.numThreads);
        this.executorLoader = executorLoader;
        this.projectLoader = projectLoader;
        this.jobLogChunkSize = this.azkabanProps.getString("job.log.chunk.size", "5MB");
        this.jobLogNumFiles = this.azkabanProps.getInt("job.log.backup.index", 4);
        this.validateProxyUser = this.azkabanProps.getBoolean("proxy.user.lock.down", false);
        this.submitterThread = new SubmitterThread(this, (BlockingQueue)this.flowQueue);
        this.submitterThread.start();
        this.cleanerThread = new CleanerThread(this);
        this.cleanerThread.start();
        this.jobtypeManager = new JobTypeManager(props.getString("azkaban.jobtype.plugin.dir", "plugins/jobtypes"), parentClassLoader);
    }

    private Map<Pair<Integer, Integer>, ProjectVersion> loadExistingProjects() {
        HashMap<Pair<Integer, Integer>, ProjectVersion> allProjects = new HashMap<Pair<Integer, Integer>, ProjectVersion>();
        File[] fileArray = this.projectDirectory.listFiles((FilenameFilter)new /* Unavailable Anonymous Inner Class!! */);
        int n = fileArray.length;
        int n2 = 0;
        while (n2 < n) {
            File project = fileArray[n2];
            if (project.isDirectory()) {
                try {
                    String fileName = new File(project.getAbsolutePath()).getName();
                    int projectId = Integer.parseInt(fileName.split("\\.")[0]);
                    int versionNum = Integer.parseInt(fileName.split("\\.")[1]);
                    ProjectVersion version = new ProjectVersion(projectId, versionNum, project);
                    allProjects.put((Pair<Integer, Integer>)new Pair((Object)projectId, (Object)versionNum), version);
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
            ++n2;
        }
        return allProjects;
    }

    public Props getGlobalProps() {
        return this.globalProps;
    }

    public void setGlobalProps(Props globalProps) {
        this.globalProps = globalProps;
    }

    public void submitFlow(int execId) throws ExecutorManagerException {
        FlowRunner runner;
        if (this.runningFlows.containsKey(execId)) {
            throw new ExecutorManagerException("Execution " + execId + " is already running.");
        }
        ExecutableFlow flow = null;
        flow = this.executorLoader.fetchExecutableFlow(execId);
        if (flow == null) {
            throw new ExecutorManagerException("Error loading flow with exec " + execId);
        }
        this.setupFlow(flow);
        Object watcher = null;
        ExecutionOptions options = flow.getExecutionOptions();
        if (options.getPipelineExecutionId() != null) {
            Integer pipelineExecId = options.getPipelineExecutionId();
            runner = (FlowRunner)this.runningFlows.get(pipelineExecId);
            watcher = runner != null ? new LocalFlowWatcher(runner) : new RemoteFlowWatcher(pipelineExecId.intValue(), this.executorLoader);
        }
        int numJobThreads = this.numJobThreadPerFlow;
        if (options.getFlowParameters().containsKey("flow.num.job.threads")) {
            try {
                int numJobs = Integer.valueOf((String)options.getFlowParameters().get("flow.num.job.threads"));
                if (numJobs > 0 && numJobs <= numJobThreads) {
                    numJobThreads = numJobs;
                }
            }
            catch (Exception e) {
                throw new ExecutorManagerException("Failed to set the number of job threads " + (String)options.getFlowParameters().get("flow.num.job.threads") + " for flow " + execId, (Throwable)e);
            }
        }
        runner = new FlowRunner(flow, this.executorLoader, this.projectLoader, this.jobtypeManager);
        runner.setFlowWatcher((FlowWatcher)watcher).setJobLogSettings(this.jobLogChunkSize, this.jobLogNumFiles).setValidateProxyUser(this.validateProxyUser).setGlobalProps(this.globalProps).setNumJobThreads(numJobThreads).addListener((EventListener)this);
        if (this.runningFlows.containsKey(execId)) {
            throw new ExecutorManagerException("Execution " + execId + " is already running.");
        }
        this.runningFlows.put(execId, runner);
        this.flowQueue.add(runner);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void setupFlow(ExecutableFlow flow) throws ExecutorManagerException {
        int execId = flow.getExecutionId();
        File execPath = new File(this.executionDirectory, String.valueOf(execId));
        flow.setExecutionPath(execPath.getPath());
        logger.info((Object)("Flow " + execId + " submitted with path " + execPath.getPath()));
        execPath.mkdirs();
        Pair projectVersionKey = new Pair((Object)flow.getProjectId(), (Object)flow.getVersion());
        ProjectVersion projectVersion = null;
        Map map = this.installedProjects;
        synchronized (map) {
            projectVersion = (ProjectVersion)this.installedProjects.get(projectVersionKey);
            if (projectVersion == null) {
                projectVersion = new ProjectVersion(flow.getProjectId(), flow.getVersion());
                this.installedProjects.put(projectVersionKey, projectVersion);
            }
        }
        try {
            projectVersion.setupProjectFiles(this.projectLoader, this.projectDirectory, logger);
            projectVersion.copyCreateSymlinkDirectory(execPath);
        }
        catch (Exception e) {
            e.printStackTrace();
            if (execPath.exists()) {
                try {
                    FileUtils.deleteDirectory((File)execPath);
                }
                catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
            throw new ExecutorManagerException(e);
        }
    }

    public void cancelFlow(int execId, String user) throws ExecutorManagerException {
        FlowRunner runner = (FlowRunner)this.runningFlows.get(execId);
        if (runner == null) {
            throw new ExecutorManagerException("Execution " + execId + " is not running.");
        }
        runner.kill(user);
    }

    public void pauseFlow(int execId, String user) throws ExecutorManagerException {
        FlowRunner runner = (FlowRunner)this.runningFlows.get(execId);
        if (runner == null) {
            throw new ExecutorManagerException("Execution " + execId + " is not running.");
        }
        runner.pause(user);
    }

    public void resumeFlow(int execId, String user) throws ExecutorManagerException {
        FlowRunner runner = (FlowRunner)this.runningFlows.get(execId);
        if (runner == null) {
            throw new ExecutorManagerException("Execution " + execId + " is not running.");
        }
        runner.resume(user);
    }

    public void retryFailures(int execId, String user) throws ExecutorManagerException {
        FlowRunner runner = (FlowRunner)this.runningFlows.get(execId);
        if (runner == null) {
            throw new ExecutorManagerException("Execution " + execId + " is not running.");
        }
        runner.retryFailures(user);
    }

    public ExecutableFlow getExecutableFlow(int execId) {
        FlowRunner runner = (FlowRunner)this.runningFlows.get(execId);
        if (runner == null) {
            return (ExecutableFlow)this.recentlyFinishedFlows.get(execId);
        }
        return runner.getExecutableFlow();
    }

    public void handleEvent(Event event) {
        if (event.getType() == Event.Type.FLOW_FINISHED) {
            FlowRunner flowRunner = (FlowRunner)event.getRunner();
            ExecutableFlow flow = flowRunner.getExecutableFlow();
            this.recentlyFinishedFlows.put(flow.getExecutionId(), flow);
            logger.info((Object)("Flow " + flow.getExecutionId() + " is finished. Adding it to recently finished flows list."));
            this.runningFlows.remove(flow.getExecutionId());
        }
    }

    public FileIOUtils.LogData readFlowLogs(int execId, int startByte, int length) throws ExecutorManagerException {
        FlowRunner runner = (FlowRunner)this.runningFlows.get(execId);
        if (runner == null) {
            throw new ExecutorManagerException("Running flow " + execId + " not found.");
        }
        File dir = runner.getExecutionDir();
        if (dir != null && dir.exists()) {
            try {
                Object object = this.executionDirDeletionSync;
                synchronized (object) {
                    if (!dir.exists()) {
                        throw new ExecutorManagerException("Execution dir file doesn't exist. Probably has beend deleted");
                    }
                    File logFile = runner.getFlowLogFile();
                    if (logFile != null && logFile.exists()) {
                        return FileIOUtils.readUtf8File((File)logFile, (int)startByte, (int)length);
                    }
                    throw new ExecutorManagerException("Flow log file doesn't exist.");
                }
            }
            catch (IOException e) {
                throw new ExecutorManagerException((Exception)e);
            }
        }
        throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
    }

    public FileIOUtils.LogData readJobLogs(int execId, String jobId, int attempt, int startByte, int length) throws ExecutorManagerException {
        FlowRunner runner = (FlowRunner)this.runningFlows.get(execId);
        if (runner == null) {
            throw new ExecutorManagerException("Running flow " + execId + " not found.");
        }
        File dir = runner.getExecutionDir();
        if (dir != null && dir.exists()) {
            try {
                Object object = this.executionDirDeletionSync;
                synchronized (object) {
                    if (!dir.exists()) {
                        throw new ExecutorManagerException("Execution dir file doesn't exist. Probably has beend deleted");
                    }
                    File logFile = runner.getJobLogFile(jobId, attempt);
                    if (logFile != null && logFile.exists()) {
                        return FileIOUtils.readUtf8File((File)logFile, (int)startByte, (int)length);
                    }
                    throw new ExecutorManagerException("Job log file doesn't exist.");
                }
            }
            catch (IOException e) {
                throw new ExecutorManagerException((Exception)e);
            }
        }
        throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public List<Object> readJobAttachments(int execId, String jobId, int attempt) throws ExecutorManagerException {
        FlowRunner runner = (FlowRunner)this.runningFlows.get(execId);
        if (runner == null) {
            throw new ExecutorManagerException("Running flow " + execId + " not found.");
        }
        File dir = runner.getExecutionDir();
        if (dir == null) throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
        if (!dir.exists()) {
            throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
        }
        try {
            Object object = this.executionDirDeletionSync;
            synchronized (object) {
                if (!dir.exists()) {
                    throw new ExecutorManagerException("Execution dir file doesn't exist. Probably has beend deleted");
                }
                File attachmentFile = runner.getJobAttachmentFile(jobId, attempt);
                if (attachmentFile == null) return null;
                if (!attachmentFile.exists()) return null;
                return (ArrayList)JSONUtils.parseJSONFromFile((File)attachmentFile);
            }
        }
        catch (IOException e) {
            throw new ExecutorManagerException((Exception)e);
        }
    }

    public FileIOUtils.JobMetaData readJobMetaData(int execId, String jobId, int attempt, int startByte, int length) throws ExecutorManagerException {
        FlowRunner runner = (FlowRunner)this.runningFlows.get(execId);
        if (runner == null) {
            throw new ExecutorManagerException("Running flow " + execId + " not found.");
        }
        File dir = runner.getExecutionDir();
        if (dir != null && dir.exists()) {
            try {
                Object object = this.executionDirDeletionSync;
                synchronized (object) {
                    if (!dir.exists()) {
                        throw new ExecutorManagerException("Execution dir file doesn't exist. Probably has beend deleted");
                    }
                    File metaDataFile = runner.getJobMetaDataFile(jobId, attempt);
                    if (metaDataFile != null && metaDataFile.exists()) {
                        return FileIOUtils.readUtf8MetaDataFile((File)metaDataFile, (int)startByte, (int)length);
                    }
                    throw new ExecutorManagerException("Job log file doesn't exist.");
                }
            }
            catch (IOException e) {
                throw new ExecutorManagerException((Exception)e);
            }
        }
        throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
    }

    public long getLastCleanerThreadCheckTime() {
        return this.lastCleanerThreadCheckTime;
    }

    public long getLastSubmitterThreadCheckTime() {
        return this.lastSubmitterThreadCheckTime;
    }

    public boolean isSubmitterThreadActive() {
        return this.submitterThread.isAlive();
    }

    public boolean isCleanerThreadActive() {
        return this.cleanerThread.isAlive();
    }

    public Thread.State getSubmitterThreadState() {
        return this.submitterThread.getState();
    }

    public Thread.State getCleanerThreadState() {
        return this.cleanerThread.getState();
    }

    public boolean isExecutorThreadPoolShutdown() {
        return this.executorService.isShutdown();
    }

    public int getNumExecutingFlows() {
        return this.runningFlows.size();
    }

    public String getRunningFlowIds() {
        ArrayList ids = new ArrayList(this.runningFlows.keySet());
        Collections.sort(ids);
        return ids.toString();
    }

    public int getNumExecutingJobs() {
        int jobCount = 0;
        for (FlowRunner runner : this.runningFlows.values()) {
            jobCount += runner.getNumRunningJobs();
        }
        return jobCount;
    }

    static /* synthetic */ void access$0(FlowRunnerManager flowRunnerManager, long l) {
        flowRunnerManager.lastSubmitterThreadCheckTime = l;
    }

    static /* synthetic */ ExecutorService access$1(FlowRunnerManager flowRunnerManager) {
        return flowRunnerManager.executorService;
    }

    static /* synthetic */ Logger access$2() {
        return logger;
    }

    static /* synthetic */ void access$3(FlowRunnerManager flowRunnerManager, long l) {
        flowRunnerManager.lastCleanerThreadCheckTime = l;
    }

    static /* synthetic */ File access$4(FlowRunnerManager flowRunnerManager) {
        return flowRunnerManager.executionDirectory;
    }

    static /* synthetic */ long access$5(FlowRunnerManager flowRunnerManager) {
        return flowRunnerManager.executionDirRetention;
    }

    static /* synthetic */ Map access$6(FlowRunnerManager flowRunnerManager) {
        return flowRunnerManager.runningFlows;
    }

    static /* synthetic */ Map access$7(FlowRunnerManager flowRunnerManager) {
        return flowRunnerManager.recentlyFinishedFlows;
    }

    static /* synthetic */ Object access$8(FlowRunnerManager flowRunnerManager) {
        return flowRunnerManager.executionDirDeletionSync;
    }

    static /* synthetic */ Map access$9(FlowRunnerManager flowRunnerManager) {
        return flowRunnerManager.installedProjects;
    }
}

