/*
 * Decompiled with CFR 0.152.
 */
package azkaban.execapp;

import azkaban.execapp.JobRunner;
import azkaban.execapp.event.Event;
import azkaban.execapp.event.EventHandler;
import azkaban.execapp.event.EventListener;
import azkaban.execapp.event.FlowWatcher;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
import azkaban.flow.FlowProps;
import azkaban.jobtype.JobTypeManager;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManagerException;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import org.apache.log4j.Appender;
import org.apache.log4j.FileAppender;
import org.apache.log4j.Layout;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;

public class FlowRunner
extends EventHandler
implements Runnable {
    private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
    private static final long CHECK_WAIT_MS = 18000000L;
    private Logger logger;
    private Layout loggerLayout = DEFAULT_LAYOUT;
    private Appender flowAppender;
    private File logFile;
    private ExecutorService executorService;
    private ExecutorLoader executorLoader;
    private ProjectLoader projectLoader;
    private int execId;
    private File execDir;
    private ExecutableFlow flow;
    private Thread flowRunnerThread;
    private int numJobThreads = 10;
    private Object mainSyncObj = new Object();
    private Map<String, Props> sharedProps = new HashMap<String, Props>();
    private Map<String, Props> jobOutputProps = new HashMap<String, Props>();
    private Props globalProps;
    private final JobTypeManager jobtypeManager;
    private JobRunnerEventListener listener = new JobRunnerEventListener();
    private Map<String, JobRunner> jobRunners = new ConcurrentHashMap<String, JobRunner>();
    private Map<String, JobRunner> activeJobRunners = new ConcurrentHashMap<String, JobRunner>();
    private Integer pipelineLevel = null;
    private Integer pipelineExecId = null;
    private FlowWatcher watcher = null;
    private HashSet<String> proxyUsers = null;
    private boolean validateUserProxy;
    private String jobLogFileSize = "5MB";
    private int jobLogNumFiles = 4;
    private boolean flowPaused = false;
    private boolean flowFailed = false;
    private boolean flowFinished = false;
    private boolean flowCancelled = false;

    public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
        this.execId = flow.getExecutionId();
        this.flow = flow;
        this.executorLoader = executorLoader;
        this.projectLoader = projectLoader;
        this.executorService = Executors.newFixedThreadPool(this.numJobThreads);
        this.execDir = new File(flow.getExecutionPath());
        this.jobtypeManager = jobtypeManager;
        ExecutionOptions options = flow.getExecutionOptions();
        this.pipelineLevel = options.getPipelineLevel();
        this.pipelineExecId = options.getPipelineExecutionId();
        this.proxyUsers = flow.getProxyUsers();
    }

    public FlowRunner setFlowWatcher(FlowWatcher watcher) {
        this.watcher = watcher;
        return this;
    }

    public FlowRunner setGlobalProps(Props globalProps) {
        this.globalProps = globalProps;
        return this;
    }

    public FlowRunner setJobLogSettings(String jobLogFileSize, int jobLogNumFiles) {
        this.jobLogFileSize = jobLogFileSize;
        this.jobLogNumFiles = jobLogNumFiles;
        return this;
    }

    public FlowRunner setValidateProxyUser(boolean validateUserProxy) {
        this.validateUserProxy = validateUserProxy;
        return this;
    }

    public File getExecutionDir() {
        return this.execDir;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        try {
            this.setupFlowExecution();
            this.flow.setStartTime(System.currentTimeMillis());
            this.updateFlowReference();
            this.logger.info((Object)"Updating initial flow directory.");
            this.updateFlow();
            this.logger.info((Object)"Fetching job and shared properties.");
            this.loadAllProperties();
            this.fireEventListeners(Event.create(this, Event.Type.FLOW_STARTED));
            this.runFlow();
        }
        catch (Throwable t) {
            if (this.logger != null) {
                this.logger.error((Object)"An error has occurred during the running of the flow. Quiting.", t);
            }
            this.flow.setStatus(Status.FAILED);
        }
        finally {
            if (this.watcher != null) {
                this.watcher.stopWatcher();
            }
            this.flow.setEndTime(System.currentTimeMillis());
            this.logger.info((Object)("Setting end time for flow " + this.execId + " to " + System.currentTimeMillis()));
            this.closeLogger();
            this.updateFlow();
            this.fireEventListeners(Event.create(this, Event.Type.FLOW_FINISHED));
        }
    }

    private void setupFlowExecution() {
        int projectId = this.flow.getProjectId();
        int version = this.flow.getVersion();
        String flowId = this.flow.getFlowId();
        PropsUtils.addCommonFlowProperties(this.flow);
        this.createLogger(flowId);
        this.logger.info((Object)("Running execid:" + this.execId + " flow:" + flowId + " project:" + projectId + " version:" + version));
        if (this.pipelineExecId != null) {
            this.logger.info((Object)("Running simulateously with " + this.pipelineExecId + ". Pipelining level " + this.pipelineLevel));
        }
        this.flowRunnerThread = Thread.currentThread();
        this.flowRunnerThread.setName("FlowRunner-exec-" + this.flow.getExecutionId());
    }

    private void updateFlowReference() throws ExecutorManagerException {
        this.logger.info((Object)"Update active reference");
        if (!this.executorLoader.updateExecutableReference(this.execId, System.currentTimeMillis())) {
            throw new ExecutorManagerException("The executor reference doesn't exist. May have been killed prematurely.");
        }
    }

    private void updateFlow() {
        this.updateFlow(System.currentTimeMillis());
    }

    private synchronized void updateFlow(long time) {
        try {
            this.flow.setUpdateTime(time);
            this.executorLoader.updateExecutableFlow(this.flow);
        }
        catch (ExecutorManagerException e) {
            this.logger.error((Object)"Error updating flow.", (Throwable)e);
        }
    }

    private void createLogger(String flowId) {
        String loggerName = this.execId + "." + flowId;
        this.logger = Logger.getLogger((String)loggerName);
        String logName = "_flow." + loggerName + ".log";
        this.logFile = new File(this.execDir, logName);
        String absolutePath = this.logFile.getAbsolutePath();
        this.flowAppender = null;
        try {
            this.flowAppender = new FileAppender(this.loggerLayout, absolutePath, false);
            this.logger.addAppender(this.flowAppender);
        }
        catch (IOException e) {
            this.logger.error((Object)("Could not open log file in " + this.execDir), (Throwable)e);
        }
    }

    private void closeLogger() {
        if (this.logger != null) {
            this.logger.removeAppender(this.flowAppender);
            this.flowAppender.close();
            try {
                this.executorLoader.uploadLogFile(this.execId, "", 0, this.logFile);
            }
            catch (ExecutorManagerException e) {
                e.printStackTrace();
            }
        }
    }

    private void loadAllProperties() throws IOException {
        Props props;
        String source;
        for (FlowProps fprops : this.flow.getFlowProps()) {
            source = fprops.getSource();
            File propsPath = new File(this.execDir, source);
            props = new Props(null, propsPath);
            this.sharedProps.put(source, props);
        }
        for (FlowProps fprops : this.flow.getFlowProps()) {
            if (fprops.getInheritedSource() != null) {
                source = fprops.getSource();
                String inherit = fprops.getInheritedSource();
                props = this.sharedProps.get(source);
                Props inherits = this.sharedProps.get(inherit);
                props.setParent(inherits);
                continue;
            }
            source = fprops.getSource();
            Props props2 = this.sharedProps.get(source);
            props2.setParent(this.globalProps);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void runFlow() throws Exception {
        Object object;
        this.logger.info((Object)"Starting flows");
        this.flow.setStatus(Status.RUNNING);
        this.updateFlow();
        while (!this.flowFinished) {
            object = this.mainSyncObj;
            synchronized (object) {
                if (this.flowPaused) {
                    try {
                        this.mainSyncObj.wait(18000000L);
                    }
                    catch (InterruptedException e) {
                        // empty catch block
                    }
                    continue;
                }
                List<ExecutableNode> jobsReadyToRun = this.findReadyJobsToRun();
                if (!jobsReadyToRun.isEmpty()) {
                    for (ExecutableNode node : jobsReadyToRun) {
                        long currentTime = System.currentTimeMillis();
                        if (node.getStatus() == Status.READY) {
                            Props outputProps = this.collectOutputProps(node);
                            node.setStatus(Status.QUEUED);
                            JobRunner runner = this.createJobRunner(node, outputProps);
                            this.logger.info((Object)("Submitting job " + node.getJobId() + " to run."));
                            try {
                                this.executorService.submit(runner);
                                this.jobRunners.put(node.getJobId(), runner);
                                this.activeJobRunners.put(node.getJobId(), runner);
                            }
                            catch (RejectedExecutionException e) {
                                this.logger.error((Object)e);
                            }
                            continue;
                        }
                        if (node.getStatus() == Status.KILLED) {
                            this.logger.info((Object)("Killing " + node.getJobId() + " due to prior errors."));
                            node.setStartTime(currentTime);
                            node.setEndTime(currentTime);
                            continue;
                        }
                        if (node.getStatus() != Status.DISABLED) continue;
                        this.logger.info((Object)("Skipping disabled job " + node.getJobId() + "."));
                        node.setStartTime(currentTime);
                        node.setEndTime(currentTime);
                        node.setStatus(Status.SKIPPED);
                    }
                    this.updateFlow();
                } else {
                    if (this.isFlowFinished()) {
                        this.flowFinished = true;
                        break;
                    }
                    try {
                        this.mainSyncObj.wait(18000000L);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                }
            }
        }
        this.logger.info((Object)"Finishing up flow. Awaiting Termination");
        this.executorService.shutdown();
        object = this.mainSyncObj;
        synchronized (object) {
            switch (this.flow.getStatus()) {
                case FAILED_FINISHING: {
                    this.logger.info((Object)"Setting flow status to Failed.");
                    this.flow.setStatus(Status.FAILED);
                }
                case FAILED: 
                case KILLED: {
                    this.logger.info((Object)("Flow is set to " + this.flow.getStatus().toString()));
                    break;
                }
                default: {
                    this.flow.setStatus(Status.SUCCEEDED);
                    this.logger.info((Object)("Flow is set to " + this.flow.getStatus().toString()));
                }
            }
        }
    }

    private List<ExecutableNode> findReadyJobsToRun() {
        ArrayList<ExecutableNode> jobsToRun = new ArrayList<ExecutableNode>();
        for (ExecutableNode node : this.flow.getExecutableNodes()) {
            if (Status.isStatusFinished(node.getStatus())) continue;
            Status impliedStatus = this.getImpliedStatus(node);
            if (this.getImpliedStatus(node) == null) continue;
            node.setStatus(impliedStatus);
            jobsToRun.add(node);
        }
        return jobsToRun;
    }

    private boolean isFlowFinished() {
        if (!this.activeJobRunners.isEmpty()) {
            return false;
        }
        for (String end : this.flow.getEndNodes()) {
            ExecutableNode node = this.flow.getExecutableNode(end);
            if (Status.isStatusFinished(node.getStatus())) continue;
            return false;
        }
        return true;
    }

    private Props collectOutputProps(ExecutableNode node) {
        Props previousOutput = null;
        for (String dependency : node.getInNodes()) {
            Props output = this.jobOutputProps.get(dependency);
            if (output == null) continue;
            output = Props.clone(output);
            output.setParent(previousOutput);
            previousOutput = output;
        }
        return previousOutput;
    }

    private JobRunner createJobRunner(ExecutableNode node, Props previousOutput) {
        String source = node.getJobPropsSource();
        String propsSource = node.getPropsSource();
        Props parentProps = propsSource == null ? this.globalProps : this.sharedProps.get(propsSource);
        ExecutionOptions options = this.flow.getExecutionOptions();
        Props flowProps = new Props(null, options.getFlowParameters());
        if (flowProps.size() > 0) {
            flowProps.setParent(parentProps);
            parentProps = flowProps;
        }
        if (previousOutput != null) {
            Props earliestParent = previousOutput.getEarliestAncestor();
            earliestParent.setParent(parentProps);
            parentProps = previousOutput;
        }
        File path = new File(this.execDir, source);
        Props prop = null;
        try {
            prop = this.projectLoader.fetchProjectProperty(this.flow.getProjectId(), this.flow.getVersion(), node.getJobId() + ".jor");
        }
        catch (ProjectManagerException e) {
            e.printStackTrace();
            this.logger.error((Object)("Error loading job override property for job " + node.getJobId()));
        }
        if (prop == null) {
            try {
                prop = new Props(null, path);
            }
            catch (IOException e) {
                e.printStackTrace();
                this.logger.error((Object)("Error loading job file " + source + " for job " + node.getJobId()));
            }
        }
        prop.setSource(path.getPath());
        prop.setParent(parentProps);
        JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), this.executorLoader, this.jobtypeManager);
        if (this.watcher != null) {
            jobRunner.setPipeline(this.watcher, this.pipelineLevel);
        }
        if (this.validateUserProxy) {
            jobRunner.setValidatedProxyUsers(this.proxyUsers);
        }
        jobRunner.setDelayStart(node.getDelayedExecution());
        jobRunner.setLogSettings(this.logger, this.jobLogFileSize, this.jobLogNumFiles);
        jobRunner.addListener(this.listener);
        return jobRunner;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void pause(String user) {
        Object object = this.mainSyncObj;
        synchronized (object) {
            if (!this.flowFinished) {
                this.logger.info((Object)("Flow paused by " + user));
                this.flowPaused = true;
                this.flow.setStatus(Status.PAUSED);
                this.updateFlow();
            } else {
                this.logger.info((Object)("Cannot pause finished flow. Called by user " + user));
            }
        }
        this.interrupt();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void resume(String user) {
        Object object = this.mainSyncObj;
        synchronized (object) {
            if (!this.flowPaused) {
                this.logger.info((Object)"Cannot resume flow that isn't paused");
            } else {
                this.logger.info((Object)("Flow resumed by " + user));
                this.flowPaused = false;
                if (this.flowFailed) {
                    this.flow.setStatus(Status.FAILED_FINISHING);
                } else if (this.flowCancelled) {
                    this.flow.setStatus(Status.KILLED);
                } else {
                    this.flow.setStatus(Status.RUNNING);
                }
                this.updateFlow();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancel(String user) {
        Object object = this.mainSyncObj;
        synchronized (object) {
            this.logger.info((Object)("Flow cancelled by " + user));
            this.cancel();
            this.updateFlow();
        }
        this.interrupt();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void cancel() {
        Object object = this.mainSyncObj;
        synchronized (object) {
            this.logger.info((Object)("Cancel has been called on flow " + this.execId));
            this.flowPaused = false;
            this.flowCancelled = true;
            if (this.watcher != null) {
                this.logger.info((Object)"Watcher is attached. Stopping watcher.");
                this.watcher.stopWatcher();
            }
            this.logger.info((Object)("Cancelling " + this.activeJobRunners.size() + " jobs."));
            for (JobRunner runner : this.activeJobRunners.values()) {
                runner.cancel();
            }
            if (this.flow.getStatus() != Status.FAILED && this.flow.getStatus() != Status.FAILED_FINISHING) {
                this.logger.info((Object)("Setting flow status to " + Status.KILLED.toString()));
                this.flow.setStatus(Status.KILLED);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void retryFailures(String user) {
        Object object = this.mainSyncObj;
        synchronized (object) {
            this.logger.info((Object)("Retrying failures invoked by " + user));
            ArrayList<String> failures = new ArrayList<String>();
            for (ExecutableNode node : this.flow.getExecutableNodes()) {
                if (node.getStatus() == Status.FAILED) {
                    failures.add(node.getJobId());
                    continue;
                }
                if (node.getStatus() != Status.KILLED) continue;
                node.setStartTime(-1L);
                node.setEndTime(-1L);
                node.setStatus(Status.READY);
            }
            this.retryJobs(failures, user);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void retryJobs(List<String> jobIds, String user) {
        Object object = this.mainSyncObj;
        synchronized (object) {
            for (String jobId : jobIds) {
                ExecutableNode node = this.flow.getExecutableNode(jobId);
                if (node == null) {
                    this.logger.error((Object)("Job " + jobId + " doesn't exist in execution " + this.flow.getExecutionId() + ". Cannot retry."));
                    continue;
                }
                if (Status.isStatusFinished(node.getStatus())) {
                    node.resetForRetry();
                    this.reEnableDependents(node);
                    this.logger.info((Object)("Re-enabling job " + node.getJobId() + " attempt " + node.getAttempt()));
                    continue;
                }
                this.logger.error((Object)("Cannot retry job " + jobId + " since it hasn't run yet. User " + user));
            }
            boolean isFailureFound = false;
            for (ExecutableNode node : this.flow.getExecutableNodes()) {
                Status nodeStatus = node.getStatus();
                if (nodeStatus != Status.FAILED && nodeStatus != Status.KILLED) continue;
                isFailureFound = true;
                break;
            }
            if (!isFailureFound) {
                this.flow.setStatus(Status.RUNNING);
                this.flow.setUpdateTime(System.currentTimeMillis());
                this.flowFailed = false;
            }
            this.updateFlow();
            this.interrupt();
        }
    }

    private void reEnableDependents(ExecutableNode node) {
        for (String dependent : node.getOutNodes()) {
            ExecutableNode dependentNode = this.flow.getExecutableNode(dependent);
            if (dependentNode.getStatus() == Status.KILLED) {
                dependentNode.setStatus(Status.READY);
                dependentNode.setUpdateTime(System.currentTimeMillis());
                this.reEnableDependents(dependentNode);
                continue;
            }
            if (dependentNode.getStatus() != Status.SKIPPED) continue;
            dependentNode.setStatus(Status.DISABLED);
            dependentNode.setUpdateTime(System.currentTimeMillis());
            this.reEnableDependents(dependentNode);
        }
    }

    private void interrupt() {
        this.flowRunnerThread.interrupt();
    }

    private Status getImpliedStatus(ExecutableNode node) {
        switch (node.getStatus()) {
            case FAILED: 
            case KILLED: 
            case SKIPPED: 
            case SUCCEEDED: 
            case QUEUED: 
            case RUNNING: {
                return null;
            }
        }
        boolean shouldKill = false;
        block8: for (String dependency : node.getInNodes()) {
            ExecutableNode dependencyNode = this.flow.getExecutableNode(dependency);
            Status depStatus = dependencyNode.getStatus();
            switch (depStatus) {
                case FAILED: 
                case KILLED: {
                    shouldKill = true;
                }
                case SKIPPED: 
                case SUCCEEDED: {
                    continue block8;
                }
                case QUEUED: 
                case RUNNING: 
                case DISABLED: {
                    return null;
                }
            }
            return null;
        }
        ExecutionOptions options = this.flow.getExecutionOptions();
        if (shouldKill || this.flowCancelled || this.flowFailed && options.getFailureAction() != ExecutionOptions.FailureAction.FINISH_ALL_POSSIBLE) {
            return Status.KILLED;
        }
        if (node.getStatus() == Status.DISABLED) {
            return Status.DISABLED;
        }
        return Status.READY;
    }

    public boolean isCancelled() {
        return this.flowCancelled;
    }

    public ExecutableFlow getExecutableFlow() {
        return this.flow;
    }

    public File getFlowLogFile() {
        return this.logFile;
    }

    public File getJobLogFile(String jobId, int attempt) {
        ExecutableNode node = this.flow.getExecutableNode(jobId);
        File path = new File(this.execDir, node.getJobPropsSource());
        String logFileName = JobRunner.createLogFileName(this.execId, jobId, attempt);
        File logFile = new File(path.getParentFile(), logFileName);
        if (!logFile.exists()) {
            return null;
        }
        return logFile;
    }

    public boolean isRunnerThreadAlive() {
        if (this.flowRunnerThread != null) {
            return this.flowRunnerThread.isAlive();
        }
        return false;
    }

    public boolean isThreadPoolShutdown() {
        return this.executorService.isShutdown();
    }

    public int getNumRunningJobs() {
        return this.activeJobRunners.size();
    }

    private class JobRunnerEventListener
    implements EventListener {
        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public synchronized void handleEvent(Event event) {
            JobRunner runner = (JobRunner)event.getRunner();
            if (event.getType() == Event.Type.JOB_FINISHED) {
                Object object = FlowRunner.this.mainSyncObj;
                synchronized (object) {
                    ExecutableNode node = runner.getNode();
                    FlowRunner.this.activeJobRunners.remove(node.getJobId());
                    FlowRunner.this.logger.info((Object)("Job Finished " + node.getJobId() + " with status " + (Object)((Object)node.getStatus())));
                    if (runner.getOutputProps() != null) {
                        FlowRunner.this.logger.info((Object)("Job " + node.getJobId() + " had output props."));
                        FlowRunner.this.jobOutputProps.put(node.getJobId(), runner.getOutputProps());
                    }
                    FlowRunner.this.updateFlow();
                    if (node.getStatus() == Status.FAILED) {
                        if (!runner.isCancelled() && runner.getRetries() > node.getAttempt()) {
                            FlowRunner.this.logger.info((Object)("Job " + node.getJobId() + " will be retried. Attempt " + node.getAttempt() + " of " + runner.getRetries()));
                            node.setDelayedExecution(runner.getRetryBackoff());
                            node.resetForRetry();
                        } else {
                            if (!runner.isCancelled() && runner.getRetries() > 0) {
                                FlowRunner.this.logger.info((Object)("Job " + node.getJobId() + " has run out of retry attempts"));
                                node.setDelayedExecution(0L);
                            }
                            FlowRunner.this.flowFailed = true;
                            ExecutionOptions options = FlowRunner.this.flow.getExecutionOptions();
                            if (FlowRunner.this.flow.getStatus() != Status.KILLED && FlowRunner.this.flow.getStatus() != Status.FAILED) {
                                FlowRunner.this.flow.setStatus(Status.FAILED_FINISHING);
                                if (options.getFailureAction() == ExecutionOptions.FailureAction.CANCEL_ALL && !FlowRunner.this.flowCancelled) {
                                    FlowRunner.this.logger.info((Object)"Flow failed. Failure option is Cancel All. Stopping execution.");
                                    FlowRunner.this.cancel();
                                }
                            }
                        }
                    }
                    FlowRunner.this.interrupt();
                    FlowRunner.this.fireEventListeners(event);
                }
            }
        }
    }
}

