/*
 * Decompiled with CFR 0.152.
 */
package azkaban.scheduler;

import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
import azkaban.scheduler.Schedule;
import azkaban.scheduler.ScheduleLoader;
import azkaban.scheduler.ScheduleManagerException;
import azkaban.sla.SLA;
import azkaban.sla.SLAManager;
import azkaban.sla.SlaOptions;
import azkaban.utils.Pair;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadablePeriod;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class ScheduleManager {
    private static Logger logger = Logger.getLogger(ScheduleManager.class);
    private final DateTimeFormatter _dateFormat = DateTimeFormat.forPattern((String)"MM-dd-yyyy HH:mm:ss:SSS");
    private ScheduleLoader loader;
    private Map<Pair<Integer, String>, Schedule> scheduleIDMap = new LinkedHashMap<Pair<Integer, String>, Schedule>();
    private final ScheduleRunner runner;
    private final ExecutorManager executorManager;
    private final ProjectManager projectManager;
    private final SLAManager slaManager;
    private long lastCheckTime = -1L;
    private long nextWakupTime = -1L;

    public ScheduleManager(ExecutorManager executorManager, ProjectManager projectManager, SLAManager slaManager, ScheduleLoader loader) {
        this.executorManager = executorManager;
        this.projectManager = projectManager;
        this.slaManager = slaManager;
        this.loader = loader;
        this.runner = new ScheduleRunner();
        List<Schedule> scheduleList = null;
        try {
            scheduleList = loader.loadSchedules();
        }
        catch (ScheduleManagerException e) {
            logger.error((Object)("Failed to load schedules" + e.getCause() + e.getMessage()));
            e.printStackTrace();
        }
        for (Schedule sched : scheduleList) {
            this.internalSchedule(sched);
        }
        this.runner.start();
    }

    public void shutdown() {
        this.runner.shutdown();
    }

    public synchronized List<Schedule> getSchedules() {
        return this.runner.getRunnerSchedules();
    }

    public Schedule getSchedule(int projectId, String flowId) {
        return this.scheduleIDMap.get(new Pair<Integer, String>(projectId, flowId));
    }

    public synchronized void removeSchedule(int projectId, String flowId) {
        Pair<Integer, String> scheduleId = new Pair<Integer, String>(projectId, flowId);
        Schedule sched = this.scheduleIDMap.get(scheduleId);
        this.scheduleIDMap.remove(scheduleId);
        this.runner.removeRunnerSchedule(sched);
        try {
            this.loader.removeSchedule(sched);
        }
        catch (ScheduleManagerException e) {
            e.printStackTrace();
        }
    }

    public Schedule scheduleFlow(int projectId, String projectName, String flowName, String status, long firstSchedTime, DateTimeZone timezone, ReadablePeriod period, long lastModifyTime, long nextExecTime, long submitTime, String submitUser) {
        return this.scheduleFlow(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null, null);
    }

    public Schedule scheduleFlow(int projectId, String projectName, String flowName, String status, long firstSchedTime, DateTimeZone timezone, ReadablePeriod period, long lastModifyTime, long nextExecTime, long submitTime, String submitUser, ExecutionOptions execOptions, SlaOptions slaOptions) {
        Schedule sched = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions, slaOptions);
        logger.info((Object)("Scheduling flow '" + sched.getScheduleName() + "' for " + this._dateFormat.print(firstSchedTime) + " with a period of " + period == null ? "(non-recurring)" : period));
        this.insertSchedule(sched);
        return sched;
    }

    private synchronized void internalSchedule(Schedule s) {
        Schedule existing = this.scheduleIDMap.get(s.getScheduleId());
        if (existing != null) {
            this.runner.removeRunnerSchedule(existing);
        }
        s.updateTime();
        this.runner.addRunnerSchedule(s);
        this.scheduleIDMap.put(s.getScheduleId(), s);
    }

    public synchronized void insertSchedule(Schedule s) {
        block5: {
            boolean exist = this.scheduleIDMap.containsKey(s.getScheduleId());
            if (s.updateTime()) {
                this.internalSchedule(s);
                try {
                    if (!exist) {
                        this.loader.insertSchedule(s);
                        break block5;
                    }
                    this.loader.updateSchedule(s);
                }
                catch (ScheduleManagerException e) {
                    e.printStackTrace();
                }
            } else {
                logger.error((Object)("The provided schedule is non-recurring and the scheduled time already passed. " + s.getScheduleName()));
            }
        }
    }

    public long getLastCheckTime() {
        return this.lastCheckTime;
    }

    public long getNextUpdateTime() {
        return this.nextWakupTime;
    }

    public Thread.State getThreadState() {
        return this.runner.getState();
    }

    public boolean isThreadActive() {
        return this.runner.isAlive();
    }

    public class ScheduleRunner
    extends Thread {
        private final PriorityBlockingQueue<Schedule> schedules;
        private AtomicBoolean stillAlive = new AtomicBoolean(true);
        private static final int TIMEOUT_MS = 300000;

        public ScheduleRunner() {
            this.schedules = new PriorityBlockingQueue<Schedule>(1, new ScheduleComparator());
        }

        public void shutdown() {
            logger.error((Object)"Shutting down scheduler thread");
            this.stillAlive.set(false);
            this.interrupt();
        }

        public synchronized List<Schedule> getRunnerSchedules() {
            return new ArrayList<Schedule>(this.schedules);
        }

        public synchronized void addRunnerSchedule(Schedule s) {
            logger.info((Object)("Adding " + s + " to schedule runner."));
            this.schedules.add(s);
            this.interrupt();
        }

        public synchronized void removeRunnerSchedule(Schedule s) {
            logger.info((Object)("Removing " + s + " from the schedule runner."));
            this.schedules.remove(s);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            while (this.stillAlive.get()) {
                ScheduleRunner scheduleRunner = this;
                synchronized (scheduleRunner) {
                    block32: {
                        try {
                            ScheduleManager.this.lastCheckTime = System.currentTimeMillis();
                            Schedule s = this.schedules.peek();
                            if (s == null) {
                                try {
                                    logger.info((Object)"Nothing scheduled to run. Checking again soon.");
                                    ScheduleManager.this.nextWakupTime = System.currentTimeMillis() + 300000L;
                                    this.wait(300000L);
                                }
                                catch (InterruptedException e) {}
                                break block32;
                            }
                            if (!new DateTime(s.getNextExecTime()).isAfterNow()) {
                                Schedule runningSched = this.schedules.poll();
                                logger.info((Object)("Scheduler ready to run " + runningSched.toString()));
                                try {
                                    Project project = ScheduleManager.this.projectManager.getProject(runningSched.getProjectId());
                                    if (project == null) {
                                        logger.error((Object)("Scheduled Project " + runningSched.getProjectId() + " does not exist!"));
                                        throw new RuntimeException("Error finding the scheduled project. " + runningSched.getProjectId());
                                    }
                                    Flow flow = project.getFlow(runningSched.getFlowName());
                                    if (flow == null) {
                                        logger.error((Object)("Flow " + runningSched.getScheduleName() + " cannot be found in project " + project.getName()));
                                        throw new RuntimeException("Error finding the scheduled flow. " + runningSched.getScheduleName());
                                    }
                                    ExecutableFlow exflow = new ExecutableFlow(flow);
                                    exflow.setSubmitUser(runningSched.getSubmitUser());
                                    exflow.setProxyUsers(project.getProxyUsers());
                                    ExecutionOptions flowOptions = runningSched.getExecutionOptions();
                                    if (flowOptions == null) {
                                        flowOptions = new ExecutionOptions();
                                        flowOptions.setConcurrentOption("skip");
                                    }
                                    exflow.setExecutionOptions(flowOptions);
                                    if (!flowOptions.isFailureEmailsOverridden()) {
                                        flowOptions.setFailureEmails(flow.getFailureEmails());
                                    }
                                    if (!flowOptions.isSuccessEmailsOverridden()) {
                                        flowOptions.setSuccessEmails(flow.getSuccessEmails());
                                    }
                                    try {
                                        ScheduleManager.this.executorManager.submitExecutableFlow(exflow);
                                        logger.info((Object)("Scheduler has invoked " + exflow.getExecutionId()));
                                    }
                                    catch (ExecutorManagerException e) {
                                        throw e;
                                    }
                                    catch (Exception e) {
                                        e.printStackTrace();
                                        throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
                                    }
                                    SlaOptions slaOptions = runningSched.getSlaOptions();
                                    if (slaOptions != null) {
                                        logger.info((Object)("Submitting SLA checkings for " + runningSched.getFlowName()));
                                        ArrayList<SLA.SlaSetting> jobsettings = new ArrayList<SLA.SlaSetting>();
                                        for (SLA.SlaSetting set : slaOptions.getSettings()) {
                                            if (set.getId().equals("")) {
                                                DateTime checkTime = new DateTime(runningSched.getNextExecTime()).plus(set.getDuration());
                                                ScheduleManager.this.slaManager.submitSla(exflow.getExecutionId(), "", checkTime, slaOptions.getSlaEmails(), set.getActions(), null, set.getRule());
                                                continue;
                                            }
                                            jobsettings.add(set);
                                        }
                                        if (jobsettings.size() > 0) {
                                            ScheduleManager.this.slaManager.submitSla(exflow.getExecutionId(), "", DateTime.now(), slaOptions.getSlaEmails(), new ArrayList<SLA.SlaAction>(), jobsettings, SLA.SlaRule.WAITANDCHECKJOB);
                                        }
                                    }
                                }
                                catch (ExecutorManagerException e) {
                                    if (e.getReason() != null && e.getReason() == ExecutorManagerException.Reason.SkippedExecution) {
                                        logger.info((Object)e.getMessage());
                                    } else {
                                        e.printStackTrace();
                                    }
                                }
                                catch (Exception e) {
                                    logger.info((Object)("Scheduler failed to run job. " + e.getMessage() + e.getCause()));
                                }
                                this.removeRunnerSchedule(runningSched);
                                if (runningSched.updateTime()) {
                                    this.addRunnerSchedule(runningSched);
                                    ScheduleManager.this.loader.updateSchedule(runningSched);
                                } else {
                                    ScheduleManager.this.removeSchedule(runningSched.getProjectId(), runningSched.getFlowName());
                                }
                                break block32;
                            }
                            long millisWait = Math.max(0L, s.getNextExecTime() - new DateTime().getMillis());
                            try {
                                ScheduleManager.this.nextWakupTime = System.currentTimeMillis() + millisWait;
                                this.wait(Math.min(millisWait, 300000L));
                            }
                            catch (InterruptedException e) {}
                        }
                        catch (Exception e) {
                            logger.error((Object)"Unexpected exception has been thrown in scheduler", (Throwable)e);
                        }
                        catch (Throwable e) {
                            logger.error((Object)"Unexpected throwable has been thrown in scheduler", e);
                        }
                    }
                }
            }
        }

        private class ScheduleComparator
        implements Comparator<Schedule> {
            private ScheduleComparator() {
            }

            @Override
            public int compare(Schedule arg0, Schedule arg1) {
                long second;
                long first = arg1.getNextExecTime();
                if (first == (second = arg0.getNextExecTime())) {
                    return 0;
                }
                if (first < second) {
                    return 1;
                }
                return -1;
            }
        }
    }
}

