/*
 * Decompiled with CFR 0.152.
 */
package azkaban.scheduler;

import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
import azkaban.scheduler.Schedule;
import azkaban.scheduler.ScheduleLoader;
import azkaban.scheduler.ScheduleManagerException;
import azkaban.sla.SlaOption;
import azkaban.trigger.TriggerAgent;
import azkaban.trigger.TriggerStatus;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadablePeriod;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

/**
 * Keeps an in-memory view of flow schedules, persists changes through a
 * {@link ScheduleLoader}, and, unless an external runner is used, drives an
 * internal {@link ScheduleRunner} thread that submits due flows to the
 * {@link ExecutorManagerAdapter}.
 *
 * <p>Mutations of the two schedule maps happen while holding this object's
 * monitor; the runner's queue is guarded by the runner's own monitor.
 *
 * <p>NOTE(review): manager methods call into the runner while holding the
 * manager monitor, and the runner thread calls back into the manager while
 * holding the runner monitor. That looks like a possible lock-order inversion;
 * confirm before relying on heavy concurrent use.
 */
public class ScheduleManager implements TriggerAgent {
    private static final Logger logger = Logger.getLogger(ScheduleManager.class);
    public static final String triggerSource = "SimpleTimeTrigger";
    private final DateTimeFormatter _dateFormat = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss:SSS");
    private final ScheduleLoader loader;
    /** Schedules keyed by schedule id, in insertion order. */
    private final Map<Integer, Schedule> scheduleIDMap = new LinkedHashMap<Integer, Schedule>();
    /** Schedules keyed by the identity pair reported by {@code Schedule.getScheduleIdentityPair()}. */
    private final Map<Pair<Integer, String>, Schedule> scheduleIdentityPairMap = new LinkedHashMap<Pair<Integer, String>, Schedule>();
    private final ExecutorManagerAdapter executorManager;
    private ProjectManager projectManager = null;
    private final boolean useExternalRunner;
    /** Internal runner thread; {@code null} when an external runner is used. */
    private final ScheduleRunner runner;

    /**
     * Creates a manager.
     *
     * @param executorManager   used by the internal runner to submit flows
     * @param loader            persistence for schedules
     * @param useExternalRunner when {@code true} no internal runner thread is created
     */
    public ScheduleManager(ExecutorManagerAdapter executorManager, ScheduleLoader loader, boolean useExternalRunner) {
        this.executorManager = executorManager;
        this.loader = loader;
        this.useExternalRunner = useExternalRunner;
        this.runner = useExternalRunner ? null : new ScheduleRunner();
    }

    /** Sets the project manager; required before {@link #start()} when the internal runner is used. */
    public void setProjectManager(ProjectManager projectManager) {
        this.projectManager = projectManager;
    }

    /**
     * Loads all schedules from the loader, registers the non-expired ones,
     * removes the expired ones, and starts the internal runner if one is used.
     *
     * <p>If loading fails the error is logged and the manager continues with no
     * schedules instead of failing with a NullPointerException.
     *
     * @throws ScheduleManagerException if the internal runner is used but no
     *         project manager has been set
     */
    @Override
    public void start() throws ScheduleManagerException {
        List<Schedule> scheduleList;
        try {
            scheduleList = this.loader.loadSchedules();
        }
        catch (ScheduleManagerException e) {
            logger.error("Failed to load schedules" + e.getCause() + e.getMessage(), e);
            scheduleList = new ArrayList<Schedule>();
        }
        if (scheduleList == null) {
            // Defensive: treat a null result from the loader like "nothing stored".
            scheduleList = new ArrayList<Schedule>();
        }
        for (Schedule sched : scheduleList) {
            if (sched.getStatus().equals(TriggerStatus.EXPIRED.toString())) {
                this.onScheduleExpire(sched);
            } else {
                this.internalSchedule(sched);
            }
        }
        if (!this.useExternalRunner) {
            if (this.projectManager == null) {
                throw new ScheduleManagerException("Project Manager must be initialized when using internal schedule runner!");
            }
            this.runner.start();
        }
    }

    /**
     * Pulls updated schedules from the loader and applies them locally.
     * Does nothing unless an external runner is used.
     *
     * @throws ScheduleManagerException if the loader fails
     */
    public synchronized void updateLocal() throws ScheduleManagerException {
        if (!this.useExternalRunner) {
            return;
        }
        List<Schedule> updates = this.loader.loadUpdatedSchedules();
        for (Schedule s : updates) {
            if (s.getStatus().equals(TriggerStatus.EXPIRED.toString())) {
                this.onScheduleExpire(s);
            } else {
                this.internalSchedule(s);
            }
        }
    }

    /** Handles an expired schedule by removing it. */
    private void onScheduleExpire(Schedule s) {
        this.removeSchedule(s);
    }

    /** Stops the internal runner thread, if there is one. */
    @Override
    public void shutdown() {
        if (!this.useExternalRunner) {
            this.runner.shutdown();
        }
    }

    /**
     * Returns a snapshot copy of all known schedules after refreshing via {@link #updateLocal()}.
     *
     * @throws ScheduleManagerException if the refresh fails
     */
    public synchronized List<Schedule> getSchedules() throws ScheduleManagerException {
        this.updateLocal();
        return new ArrayList<Schedule>(this.scheduleIDMap.values());
    }

    /**
     * Looks up a schedule by project id and flow id after refreshing via {@link #updateLocal()}.
     * Synchronized so the map is never read while another thread mutates it.
     *
     * @return the schedule, or {@code null} if none is registered for that pair
     * @throws ScheduleManagerException if the refresh fails
     */
    public synchronized Schedule getSchedule(int projectId, String flowId) throws ScheduleManagerException {
        this.updateLocal();
        return this.scheduleIdentityPairMap.get(new Pair<Integer, String>(projectId, flowId));
    }

    /**
     * Looks up a schedule by its schedule id after refreshing via {@link #updateLocal()}.
     * Synchronized so the map is never read while another thread mutates it.
     *
     * @return the schedule, or {@code null} if the id is unknown
     * @throws ScheduleManagerException if the refresh fails
     */
    public synchronized Schedule getSchedule(int scheduleId) throws ScheduleManagerException {
        this.updateLocal();
        return this.scheduleIDMap.get(scheduleId);
    }

    /**
     * Removes the schedule registered for the given project/flow pair, if any.
     *
     * @throws ScheduleManagerException if the lookup's refresh fails
     */
    public synchronized void removeSchedule(int projectId, String flowId) throws ScheduleManagerException {
        Schedule sched = this.getSchedule(projectId, flowId);
        if (sched != null) {
            this.removeSchedule(sched);
        }
    }

    /**
     * Removes a schedule from both local maps, from the loader, and from the
     * internal runner (when used). A loader failure is logged and does not
     * prevent removal from the runner.
     */
    public synchronized void removeSchedule(Schedule sched) {
        this.scheduleIdentityPairMap.remove(sched.getScheduleIdentityPair());
        this.scheduleIDMap.remove(sched.getScheduleId());
        try {
            this.loader.removeSchedule(sched);
        }
        catch (ScheduleManagerException e) {
            logger.error("Failed to remove schedule " + sched.getScheduleName() + " from the loader", e);
        }
        if (!this.useExternalRunner) {
            this.runner.removeRunnerSchedule(sched);
        }
    }

    /** Convenience overload of the full {@code scheduleFlow} that passes no execution options and no SLA options. */
    public Schedule scheduleFlow(int scheduleId, int projectId, String projectName, String flowName, String status, long firstSchedTime, DateTimeZone timezone, ReadablePeriod period, long lastModifyTime, long nextExecTime, long submitTime, String submitUser) {
        return this.scheduleFlow(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null, null);
    }

    /**
     * Builds a {@link Schedule} from the given values, logs it, and hands it to
     * {@link #insertSchedule(Schedule)}.
     *
     * @return the newly built schedule (returned even if insertion was rejected or failed)
     */
    public Schedule scheduleFlow(int scheduleId, int projectId, String projectName, String flowName, String status, long firstSchedTime, DateTimeZone timezone, ReadablePeriod period, long lastModifyTime, long nextExecTime, long submitTime, String submitUser, ExecutionOptions execOptions, List<SlaOption> slaOptions) {
        Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions, slaOptions);
        // The ternary must be parenthesised: '+' binds tighter than '==' and '?:'.
        logger.info("Scheduling flow '" + sched.getScheduleName() + "' for " + this._dateFormat.print(firstSchedTime) + " with a period of " + (period == null ? "(non-recurring)" : period));
        this.insertSchedule(sched);
        return sched;
    }

    /**
     * Registers a schedule in the local maps and, when the internal runner is
     * used, replaces any queued schedule with the same identity pair.
     */
    private synchronized void internalSchedule(Schedule s) {
        Schedule existing = this.scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
        if (!this.useExternalRunner) {
            if (existing != null) {
                this.runner.removeRunnerSchedule(existing);
            }
            s.updateTime();
            this.runner.addRunnerSchedule(s);
        }
        this.scheduleIDMap.put(s.getScheduleId(), s);
        this.scheduleIdentityPairMap.put(s.getScheduleIdentityPair(), s);
    }

    /**
     * Persists and registers a schedule. If a schedule with the same identity
     * pair already exists, its id is reused and the stored record is updated;
     * otherwise a new record is inserted. When {@code s.updateTime()} returns
     * {@code false} the schedule is rejected with an error log. Loader failures
     * are logged and the schedule is not registered locally.
     */
    public synchronized void insertSchedule(Schedule s) {
        Schedule exist = this.scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
        if (!s.updateTime()) {
            logger.error("The provided schedule is non-recurring and the scheduled time already passed. " + s.getScheduleName());
            return;
        }
        try {
            if (exist == null) {
                this.loader.insertSchedule(s);
            } else {
                s.setScheduleId(exist.getScheduleId());
                this.loader.updateSchedule(s);
            }
            this.internalSchedule(s);
        }
        catch (ScheduleManagerException e) {
            logger.error("Failed to persist schedule " + s.getScheduleName(), e);
        }
    }

    /**
     * Not supported.
     *
     * @throws ScheduleManagerException always
     */
    @Override
    public void loadTriggerFromProps(Props props) throws ScheduleManagerException {
        throw new ScheduleManagerException("create " + this.getTriggerSource() + " from json not supported yet");
    }

    @Override
    public String getTriggerSource() {
        return triggerSource;
    }

    /** Returns the runner's last check time, or {@code -1} when an external runner is used. */
    public long getLastCheckTime() {
        if (this.useExternalRunner) {
            return -1L;
        }
        return this.runner.getLastCheckTime();
    }

    /** Returns the runner's next planned wake-up time, or {@code -1} when an external runner is used. */
    public long getNextUpdateTime() {
        if (this.useExternalRunner) {
            return -1L;
        }
        return this.runner.getNextWakeupTime();
    }

    /** Returns the runner thread's state, or {@code null} when an external runner is used. */
    public Thread.State getThreadState() {
        if (this.useExternalRunner) {
            return null;
        }
        return this.runner.getState();
    }

    /** Returns whether the runner thread is alive; always {@code false} when an external runner is used. */
    public boolean isThreadActive() {
        if (this.useExternalRunner) {
            return false;
        }
        return this.runner.isAlive();
    }

    /**
     * Thread that sleeps until the earliest queued schedule is due, submits its
     * flow, and then either re-queues the schedule or removes it, depending on
     * the result of {@code Schedule.updateTime()}.
     */
    public class ScheduleRunner
    extends Thread {
        // Written by the runner thread, read by callers of the getters on other threads.
        private volatile long lastCheckTime = -1L;
        private volatile long nextWakupTime = -1L;
        private final PriorityBlockingQueue<Schedule> schedules;
        private final AtomicBoolean stillAlive = new AtomicBoolean(true);
        /** Upper bound, in milliseconds, on a single sleep of the runner loop. */
        private static final int TIMEOUT_MS = 300000;

        public ScheduleRunner() {
            this.schedules = new PriorityBlockingQueue<Schedule>(1, new ScheduleComparator());
        }

        /** Asks the loop to stop and interrupts the thread so a pending wait ends. */
        public void shutdown() {
            logger.info("Shutting down scheduler thread");
            this.stillAlive.set(false);
            this.interrupt();
        }

        public long getLastCheckTime() {
            return this.lastCheckTime;
        }

        public long getNextWakeupTime() {
            return this.nextWakupTime;
        }

        /** Returns a snapshot copy of the queued schedules. */
        public synchronized List<Schedule> getRunnerSchedules() {
            return new ArrayList<Schedule>(this.schedules);
        }

        /** Queues a schedule and interrupts the thread so the loop re-evaluates its wait. */
        public synchronized void addRunnerSchedule(Schedule s) {
            logger.info("Adding " + s + " to schedule runner.");
            this.schedules.add(s);
            this.interrupt();
        }

        /** Removes a schedule from the queue. */
        public synchronized void removeRunnerSchedule(Schedule s) {
            logger.info("Removing " + s + " from the schedule runner.");
            this.schedules.remove(s);
        }

        /**
         * Main loop: each iteration holds the runner's monitor, looks at the head
         * of the queue, and either waits (nothing queued, or head not yet due) or
         * runs the head schedule. Any exception or throwable is logged and the
         * loop continues until {@link #shutdown()} is called.
         */
        @Override
        public void run() {
            while (this.stillAlive.get()) {
                synchronized (this) {
                    try {
                        this.lastCheckTime = System.currentTimeMillis();
                        Schedule head = this.schedules.peek();
                        if (head == null) {
                            logger.info("Nothing scheduled to run. Checking again soon.");
                            this.nextWakupTime = System.currentTimeMillis() + TIMEOUT_MS;
                            this.pause(TIMEOUT_MS);
                        } else if (!new DateTime(head.getNextExecTime()).isAfterNow()) {
                            Schedule runningSched = this.schedules.poll();
                            logger.info("Scheduler ready to run " + runningSched.toString());
                            this.submitScheduledFlow(runningSched);
                            this.requeueOrRemove(runningSched);
                        } else {
                            long millisWait = Math.max(0L, head.getNextExecTime() - new DateTime().getMillis());
                            this.nextWakupTime = System.currentTimeMillis() + millisWait;
                            this.pause(Math.min(millisWait, TIMEOUT_MS));
                        }
                    }
                    catch (Exception e) {
                        logger.error("Unexpected exception has been thrown in scheduler", e);
                    }
                    catch (Throwable e) {
                        logger.error("Unexpected throwable has been thrown in scheduler", e);
                    }
                }
            }
        }

        /**
         * Waits on this runner's monitor for at most {@code millis} milliseconds.
         * A non-positive value returns immediately: {@code Object.wait(0)} would
         * otherwise block until notified or interrupted, stalling a schedule that
         * became due between the due-check and the wait computation.
         */
        private synchronized void pause(long millis) {
            if (millis <= 0L) {
                return;
            }
            try {
                this.wait(millis);
            }
            catch (InterruptedException ignored) {
                // Interrupts are this thread's wake-up signal (see addRunnerSchedule and
                // shutdown); the loop re-checks its state right after returning.
            }
        }

        /**
         * Resolves the project and flow of a due schedule, builds an executable
         * flow, and submits it. All failures are logged here and never propagate.
         */
        private void submitScheduledFlow(Schedule runningSched) {
            try {
                Project project = ScheduleManager.this.projectManager.getProject(runningSched.getProjectId());
                if (project == null) {
                    logger.error("Scheduled Project " + runningSched.getProjectId() + " does not exist!");
                    throw new RuntimeException("Error finding the scheduled project. " + runningSched.getProjectId());
                }
                Flow flow = project.getFlow(runningSched.getFlowName());
                if (flow == null) {
                    logger.error("Flow " + runningSched.getScheduleName() + " cannot be found in project " + project.getName());
                    throw new RuntimeException("Error finding the scheduled flow. " + runningSched.getScheduleName());
                }
                ExecutableFlow exflow = new ExecutableFlow(flow);
                logger.info("ScheduleManager: creating schedule: " + runningSched.getScheduleId());
                exflow.setScheduleId(runningSched.getScheduleId());
                exflow.setSubmitUser(runningSched.getSubmitUser());
                exflow.addAllProxyUsers(project.getProxyUsers());
                ExecutionOptions flowOptions = runningSched.getExecutionOptions();
                if (flowOptions == null) {
                    flowOptions = new ExecutionOptions();
                    flowOptions.setConcurrentOption("skip");
                }
                exflow.setExecutionOptions(flowOptions);
                // Fall back to the flow's own e-mail lists unless the options override them.
                if (!flowOptions.isFailureEmailsOverridden()) {
                    flowOptions.setFailureEmails(flow.getFailureEmails());
                }
                if (!flowOptions.isSuccessEmailsOverridden()) {
                    flowOptions.setSuccessEmails(flow.getSuccessEmails());
                }
                try {
                    ScheduleManager.this.executorManager.submitExecutableFlow(exflow, runningSched.getSubmitUser());
                    logger.info("Scheduler has invoked " + exflow.getExecutionId());
                }
                catch (ExecutorManagerException e) {
                    throw e;
                }
                catch (Exception e) {
                    throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
                }
            }
            catch (ExecutorManagerException e) {
                if (e.getReason() == ExecutorManagerException.Reason.SkippedExecution) {
                    logger.info(e.getMessage());
                } else {
                    logger.error("Scheduler failed to submit flow for schedule " + runningSched.getScheduleId(), e);
                }
            }
            catch (Exception e) {
                logger.error("Scheduler failed to run job. " + e.getMessage() + e.getCause(), e);
            }
        }

        /**
         * After a run attempt: if {@code updateTime()} reports another occurrence,
         * re-queues the schedule and persists it; otherwise removes it everywhere.
         *
         * @throws ScheduleManagerException if persisting the updated schedule fails
         */
        private void requeueOrRemove(Schedule runningSched) throws ScheduleManagerException {
            this.removeRunnerSchedule(runningSched);
            if (runningSched.updateTime()) {
                this.addRunnerSchedule(runningSched);
                ScheduleManager.this.loader.updateSchedule(runningSched);
            } else {
                ScheduleManager.this.removeSchedule(runningSched);
            }
        }

        /** Orders schedules by ascending next execution time. */
        private class ScheduleComparator
        implements Comparator<Schedule> {
            private ScheduleComparator() {
            }

            @Override
            public int compare(Schedule arg0, Schedule arg1) {
                long left = arg0.getNextExecTime();
                long right = arg1.getNextExecTime();
                if (left == right) {
                    return 0;
                }
                return left < right ? -1 : 1;
            }
        }
    }
}

