/*
 * Decompiled with CFR 0.152.
 */
package azkaban.execapp;

import azkaban.execapp.event.BlockingStatus;
import azkaban.execapp.event.Event;
import azkaban.execapp.event.EventHandler;
import azkaban.execapp.event.FlowWatcher;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
import azkaban.jobExecutor.Job;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypeManagerException;
import azkaban.utils.Props;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.log4j.Appender;
import org.apache.log4j.Layout;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.RollingFileAppender;

/**
 * Runs a single {@link ExecutableNode} on the calling thread.
 *
 * <p>A run sets up a per-job rolling log file, optionally blocks on pipelined
 * jobs of another execution and on a configured start delay, builds the
 * {@link Job} through the {@link JobTypeManager}, executes it, persists the
 * node status through the {@link ExecutorLoader}, uploads the log files and
 * fires {@code JOB_STARTED}, {@code JOB_STATUS_CHANGED} and
 * {@code JOB_FINISHED} events to the registered listeners.
 *
 * <p>{@link #cancel()} may be called from another thread while {@link #run()}
 * is in progress.
 */
public class JobRunner extends EventHandler implements Runnable {
    private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
    /** Serialises logger creation across all runners. */
    private static final Object logCreatorLock = new Object();

    private final ExecutorLoader loader;
    private final Props props;
    private Props outputProps;
    private final ExecutableNode node;
    private final File workingDir;
    private Logger logger = null;
    private Layout loggerLayout = DEFAULT_LAYOUT;
    private Logger flowLogger = null;
    private Appender jobAppender;
    private File logFile;
    private Job job;
    private final int executionId;
    /** Guards the hand-over of {@link #job} between prepareJob() and cancel(). */
    private final Object syncObject = new Object();
    private final JobTypeManager jobtypeManager;
    private Integer pipelineLevel = null;
    private FlowWatcher watcher = null;
    private final Set<String> pipelineJobs = new HashSet<String>();
    private Set<String> proxyUsers = null;
    private String jobLogChunkSize;
    private int jobLogBackupIndex;
    private long delayStartMs = 0L;
    /** Written by cancel() on a foreign thread and polled by run(); hence volatile. */
    private volatile boolean cancelled = false;

    /**
     * @param node           the node to execute; its execution id is captured here
     * @param props          job properties handed to the job type manager
     * @param workingDir     directory that receives the job log file
     * @param loader         persistence for node status, properties and logs
     * @param jobtypeManager factory used to build the concrete {@link Job}
     */
    public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
        this.props = props;
        this.node = node;
        this.workingDir = workingDir;
        this.executionId = node.getExecutionId();
        this.loader = loader;
        this.jobtypeManager = jobtypeManager;
    }

    /**
     * Restricts the values accepted for the {@code user.to.proxy} property.
     * When never set (or set to {@code null}) no proxy-user check is performed.
     */
    public void setValidatedProxyUsers(Set<String> proxyUsers) {
        this.proxyUsers = proxyUsers;
    }

    /**
     * Configures logging for this runner.
     *
     * @param flowLogger       logger that receives errors this runner cannot write to the job log
     * @param logFileChuckSize maximum size of one job log file, passed to the rolling appender
     * @param numLogBackup     maximum number of rolled-over job log files
     */
    public void setLogSettings(Logger flowLogger, String logFileChuckSize, int numLogBackup) {
        this.flowLogger = flowLogger;
        this.jobLogChunkSize = logFileChuckSize;
        this.jobLogBackupIndex = numLogBackup;
    }

    public Props getProps() {
        return this.props;
    }

    /**
     * Enables pipelining against the execution observed by {@code watcher}.
     * Level 1 waits on the job with the same id; level 2 additionally waits on
     * this node's out-nodes. Any other level registers nothing to wait on.
     */
    public void setPipeline(FlowWatcher watcher, int pipelineLevel) {
        this.watcher = watcher;
        this.pipelineLevel = pipelineLevel;
        if (pipelineLevel == 1) {
            this.pipelineJobs.add(this.node.getJobId());
        } else if (pipelineLevel == 2) {
            this.pipelineJobs.add(this.node.getJobId());
            this.pipelineJobs.addAll(this.node.getOutNodes());
        }
    }

    public void setDelayStart(long delayMS) {
        this.delayStartMs = delayMS;
    }

    public long getDelayStart() {
        return this.delayStartMs;
    }

    public ExecutableNode getNode() {
        return this.node;
    }

    /** @return the job log file path, or {@code null} if the logger has not been created yet */
    public String getLogFilePath() {
        return this.logFile == null ? null : this.logFile.getPath();
    }

    /**
     * Creates the job logger and attaches a rolling file appender writing into
     * the working directory. If the file cannot be opened the failure is
     * reported to the flow logger and the job logger is left without its own
     * appender.
     */
    private void createLogger() {
        synchronized (logCreatorLock) {
            String loggerName = System.currentTimeMillis() + "." + this.executionId + "." + this.node.getJobId();
            this.logger = Logger.getLogger(loggerName);
            String logName = JobRunner.createLogFileName(this.node.getExecutionId(), this.node.getJobId(), this.node.getAttempt());
            this.logFile = new File(this.workingDir, logName);
            String absolutePath = this.logFile.getAbsolutePath();
            this.jobAppender = null;
            try {
                RollingFileAppender fileAppender = new RollingFileAppender(this.loggerLayout, absolutePath, true);
                fileAppender.setMaxBackupIndex(this.jobLogBackupIndex);
                fileAppender.setMaxFileSize(this.jobLogChunkSize);
                this.jobAppender = fileAppender;
                this.logger.addAppender(this.jobAppender);
            } catch (IOException e) {
                this.flowLogger.error("Could not open log file in " + this.workingDir + " for job " + this.node.getJobId(), e);
            }
        }
    }

    /** Detaches and closes the job appender, if one was successfully created. */
    private void closeLogger() {
        if (this.jobAppender != null) {
            this.logger.removeAppender(this.jobAppender);
            this.jobAppender.close();
        }
    }

    /** Stamps the node's update time and persists it; failures go to the flow logger. */
    private void writeStatus() {
        try {
            this.node.setUpdateTime(System.currentTimeMillis());
            this.loader.updateExecutableNode(this.node);
        } catch (ExecutorManagerException e) {
            this.flowLogger.error("Could not update job properties in db for " + this.node.getJobId(), e);
        }
    }

    /**
     * Executes the node. Every path through this method fires exactly one
     * {@code JOB_FINISHED} event.
     */
    @Override
    public void run() {
        Thread.currentThread().setName("JobRunner-" + this.node.getJobId() + "-" + this.executionId);

        if (this.node.getStatus() == Status.DISABLED) {
            this.finishWithoutRunning(Status.SKIPPED);
            return;
        }
        if (this.cancelled) {
            // Return here: falling through would fire JOB_FINISHED a second time.
            this.finishWithoutRunning(Status.FAILED);
            return;
        }
        if (this.node.getStatus() == Status.FAILED || this.node.getStatus() == Status.KILLED) {
            // The node already carries its final status; only report start/finish.
            this.finishWithoutRunning(null);
            return;
        }

        this.createLogger();
        this.node.setUpdateTime(System.currentTimeMillis());

        if (!this.pipelineJobs.isEmpty()) {
            this.waitForPipelinedJobs();
            if (this.watcher.isWatchCancelled()) {
                this.abortBeforeStart("Job was cancelled while waiting on pipeline. Quiting.");
                return;
            }
        }

        if (this.delayStartMs > 0L) {
            this.delayStart();
            if (this.cancelled) {
                this.abortBeforeStart("Job was cancelled while in delay. Quiting.");
                return;
            }
        }

        this.node.setStartTime(System.currentTimeMillis());
        this.fireEvent(Event.create(this, Event.Type.JOB_STARTED, null, false));
        try {
            this.loader.uploadExecutableNode(this.node, this.props);
        } catch (ExecutorManagerException e1) {
            this.logger.error("Error writing initial node properties", e1);
        }

        if (this.prepareJob()) {
            this.writeStatus();
            this.fireEvent(Event.create(this, Event.Type.JOB_STATUS_CHANGED), false);
            this.runJob();
        } else {
            this.node.setStatus(Status.FAILED);
            this.logError("Job run failed!");
        }

        this.node.setEndTime(System.currentTimeMillis());
        this.logInfo("Finishing job " + this.node.getJobId() + " at " + this.node.getEndTime());
        // Close before uploading so everything buffered is on disk.
        this.closeLogger();
        this.writeStatus();
        this.uploadLogFiles();
        this.fireEvent(Event.create(this, Event.Type.JOB_FINISHED));
    }

    /**
     * Reports a node that is not going to be executed: stamps start and end
     * time and fires JOB_STARTED followed by JOB_FINISHED.
     *
     * @param finalStatus status to store on the node, or {@code null} to keep the current one
     */
    private void finishWithoutRunning(Status finalStatus) {
        this.node.setStartTime(System.currentTimeMillis());
        this.fireEvent(Event.create(this, Event.Type.JOB_STARTED, null, false));
        if (finalStatus != null) {
            this.node.setStatus(finalStatus);
        }
        this.node.setEndTime(System.currentTimeMillis());
        this.fireEvent(Event.create(this, Event.Type.JOB_FINISHED));
    }

    /**
     * Ends a run that was cancelled after the logger was created but before
     * the job started: logs the reason, stamps the times, releases the log
     * appender and fires JOB_FINISHED (no JOB_STARTED is fired on this path).
     */
    private void abortBeforeStart(String reason) {
        this.logger.info(reason);
        this.node.setStartTime(System.currentTimeMillis());
        this.node.setEndTime(System.currentTimeMillis());
        this.closeLogger();
        this.fireEvent(Event.create(this, Event.Type.JOB_FINISHED));
    }

    /** Blocks until every unfinished pipelined job known to the watcher reports a finished status. */
    private void waitForPipelinedJobs() {
        StringBuilder blockedList = new StringBuilder();
        ArrayList<BlockingStatus> blockingStatus = new ArrayList<BlockingStatus>();
        for (String waitingJobId : this.pipelineJobs) {
            Status status = this.watcher.peekStatus(waitingJobId);
            if (status != null && !Status.isStatusFinished(status)) {
                blockingStatus.add(this.watcher.getBlockingStatus(waitingJobId));
                blockedList.append(waitingJobId).append(",");
            }
        }
        if (blockingStatus.isEmpty()) {
            return;
        }
        this.logger.info("Pipeline job " + this.node.getJobId() + " waiting on " + blockedList + " in execution " + this.watcher.getExecId());
        for (BlockingStatus bStatus : blockingStatus) {
            this.logger.info("Waiting on pipelined job " + bStatus.getJobId());
            bStatus.blockOnFinishedStatus();
            this.logger.info("Pipelined job " + bStatus.getJobId() + " finished.");
        }
    }

    /**
     * Waits on this object's monitor for the configured start delay, returning
     * early when {@link #cancel()} is called or the thread is interrupted.
     */
    private void delayStart() {
        final long delayMs = this.delayStartMs;
        final long delayBegin = System.currentTimeMillis();
        this.logger.info("Delaying start of execution for " + delayMs + " milliseconds.");
        synchronized (this) {
            try {
                // Checking the flag while holding the monitor means a cancel() issued
                // before we start waiting is not lost; the loop absorbs spurious wakeups.
                long remaining = delayMs;
                while (!this.cancelled && remaining > 0L) {
                    this.wait(remaining);
                    remaining = delayMs - (System.currentTimeMillis() - delayBegin);
                }
                this.logger.info("Execution has been delayed for " + delayMs + " ms. Continuing with execution.");
            } catch (InterruptedException e) {
                this.logger.error("Job " + this.node.getJobId() + " was to be delayed for " + delayMs + ". Interrupted after " + (System.currentTimeMillis() - delayBegin));
            }
        }
    }

    /**
     * Uploads the job log file together with its rolled-over siblings (every
     * file in the log directory whose name starts with the log file name),
     * sorted in reverse natural order.
     */
    private void uploadLogFiles() {
        if (this.logFile == null) {
            this.flowLogger.info("Log file for job " + this.node.getJobId() + " is null");
            return;
        }
        final String logFileName = this.logFile.getName();
        File logDir = this.logFile.getParentFile();
        File[] files = logDir == null ? null : logDir.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String name) {
                return name.startsWith(logFileName);
            }
        });
        if (files == null) {
            // listFiles returns null when the directory is missing or unreadable.
            this.flowLogger.error("Could not list log files in " + logDir + " for job " + this.node.getJobId());
            return;
        }
        Arrays.sort(files, Collections.reverseOrder());
        try {
            this.loader.uploadLogFile(this.executionId, this.node.getJobId(), this.node.getAttempt(), files);
        } catch (ExecutorManagerException e) {
            this.flowLogger.error("Error writing out logs for job " + this.node.getJobId(), e);
        }
    }

    private void fireEvent(Event event) {
        this.fireEvent(event, true);
    }

    /** Notifies listeners, optionally stamping the node's update time first. */
    private void fireEvent(Event event, boolean updateTime) {
        if (updateTime) {
            this.node.setUpdateTime(System.currentTimeMillis());
        }
        this.fireEventListeners(event);
    }

    /**
     * Marks the node RUNNING, fills in default properties, validates the proxy
     * user and builds the {@link Job}.
     *
     * @return {@code true} when the job was built and may be run; {@code false}
     *         when properties are missing, the run was cancelled, the node is
     *         already FAILED, the proxy user is not permitted, or the job type
     *         could not be built
     */
    private boolean prepareJob() throws RuntimeException {
        if (this.props == null) {
            this.logError("Failing job. The job properties don't exist");
            return false;
        }
        if (this.cancelled) {
            this.logError("Failing job. The job was cancelled before it could be prepared");
            return false;
        }
        synchronized (this.syncObject) {
            // Re-check under the lock: cancel() may have run since the check above.
            if (this.node.getStatus() == Status.FAILED || this.cancelled) {
                return false;
            }
            if (this.node.getAttempt() > 0) {
                this.logInfo("Starting job " + this.node.getJobId() + " attempt " + this.node.getAttempt() + " at " + this.node.getStartTime());
            } else {
                this.logInfo("Starting job " + this.node.getJobId() + " at " + this.node.getStartTime());
            }
            this.props.put("azkaban.job.attempt", this.node.getAttempt());
            this.node.setStatus(Status.RUNNING);
            if (!this.props.containsKey("working.dir")) {
                this.props.put("working.dir", this.workingDir.getAbsolutePath());
            }
            if (this.props.containsKey("user.to.proxy")) {
                String jobProxyUser = this.props.getString("user.to.proxy");
                if (this.proxyUsers != null && !this.proxyUsers.contains(jobProxyUser)) {
                    this.logger.error("User " + jobProxyUser + " has no permission to execute this job " + this.node.getJobId() + "!");
                    return false;
                }
            }
            try {
                this.job = this.jobtypeManager.buildJobExecutor(this.node.getJobId(), this.props, this.logger);
            } catch (JobTypeManagerException e) {
                this.logger.error("Failed to build job type, skipping this job", e);
                return false;
            }
        }
        return true;
    }

    /**
     * Runs the prepared job. Any exception marks the node FAILED; otherwise the
     * node is marked SUCCEEDED and the job's generated properties are stored on it.
     */
    private void runJob() {
        try {
            this.job.run();
        } catch (Exception e) {
            this.node.setStatus(Status.FAILED);
            // Stack trace goes to the job log rather than stderr.
            this.logError("Job run failed!", e);
            this.logError(e.getMessage() + e.getCause());
            return;
        }
        this.node.setStatus(Status.SUCCEEDED);
        if (this.job != null) {
            this.outputProps = this.job.getJobGeneratedProperties();
            this.node.setOutputProps(this.outputProps);
        }
    }

    /**
     * Requests cancellation. If the job has not been built yet, a runner
     * waiting in its start delay is woken up; otherwise the cancel request is
     * forwarded to the job. Failures while cancelling are logged, not thrown.
     */
    public void cancel() {
        synchronized (this.syncObject) {
            this.logError("Cancel has been called.");
            this.cancelled = true;
            if (this.job == null) {
                this.logError("Job hasn't started yet.");
                synchronized (this) {
                    this.notifyAll();
                }
                return;
            }
            try {
                this.job.cancel();
            } catch (Exception e) {
                this.logError(e.getMessage());
                this.logError("Failed trying to cancel job. Maybe it hasn't started running yet or just finished.", e);
            }
        }
    }

    public boolean isCancelled() {
        return this.cancelled;
    }

    public Status getStatus() {
        return this.node.getStatus();
    }

    /** @return properties generated by the job, or {@code null} if it has not completed successfully */
    public Props getOutputProps() {
        return this.outputProps;
    }

    /** Logs to the job logger if it exists; a no-op before createLogger() has run. */
    private void logError(String message) {
        if (this.logger != null) {
            this.logger.error(message);
        }
    }

    /** As {@link #logError(String)}, additionally recording the stack trace of {@code cause}. */
    private void logError(String message, Throwable cause) {
        if (this.logger != null) {
            this.logger.error(message, cause);
        }
    }

    /** Logs to the job logger if it exists; a no-op before createLogger() has run. */
    private void logInfo(String message) {
        if (this.logger != null) {
            this.logger.info(message);
        }
    }

    public File getLogFile() {
        return this.logFile;
    }

    /** @return the {@code retries} property, defaulting to 0 */
    public int getRetries() {
        return this.props.getInt("retries", 0);
    }

    /** @return the {@code retry.backoff} property, defaulting to 0 */
    public long getRetryBackoff() {
        return this.props.getLong("retry.backoff", 0L);
    }

    /**
     * Builds the job log file name: {@code _job.<executionId>.<attempt>.<jobId>.log}
     * for retries ({@code attempt > 0}), otherwise {@code _job.<executionId>.<jobId>.log}.
     */
    public static String createLogFileName(int executionId, String jobId, int attempt) {
        if (attempt > 0) {
            return "_job." + executionId + "." + attempt + "." + jobId + ".log";
        }
        return "_job." + executionId + "." + jobId + ".log";
    }
}

