azkaban-uncached
Changes
src/java/azkaban/scheduler/ScheduleManager.java 331(+5 -326)
src/java/azkaban/trigger/Condition.java 16(+13 -3)
src/java/azkaban/trigger/TriggerManager.java 16(+15 -1)
Details
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 9096b2f..fdb7688 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -157,6 +157,7 @@ public class JobRunner extends EventHandler implements Runnable {
fileAppender.setMaxFileSize(jobLogChunkSize);
jobAppender = fileAppender;
logger.addAppender(jobAppender);
+ logger.setAdditivity(false);
} catch (IOException e) {
flowLogger.error("Could not open log file in " + workingDir + " for job " + node.getJobId(), e);
}
diff --git a/src/java/azkaban/jmx/JmxExecutorManager.java b/src/java/azkaban/jmx/JmxExecutorManager.java
index 37f52f8..e3acefb 100644
--- a/src/java/azkaban/jmx/JmxExecutorManager.java
+++ b/src/java/azkaban/jmx/JmxExecutorManager.java
@@ -19,7 +19,7 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
@Override
public String getExecutorThreadState() {
- return manager.getExecutorThreadState().toString();
+ return manager.getExecutorManagerThreadState().toString();
}
@Override
@@ -29,12 +29,12 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
@Override
public boolean isThreadActive() {
- return manager.isThreadActive();
+ return manager.isExecutorManagerThreadActive();
}
@Override
public Long getLastThreadCheckTime() {
- return manager.getLastThreadCheckTime();
+ return manager.getLastExecutorManagerThreadCheckTime();
}
@Override
diff --git a/src/java/azkaban/jmx/JmxTriggerManager.java b/src/java/azkaban/jmx/JmxTriggerManager.java
index dbee790..e183558 100644
--- a/src/java/azkaban/jmx/JmxTriggerManager.java
+++ b/src/java/azkaban/jmx/JmxTriggerManager.java
@@ -55,6 +55,12 @@ public class JmxTriggerManager implements JmxTriggerManagerMBean {
public long getScannerIdleTime() {
return jmxStats.getScannerIdleTime();
}
+
+ @Override
+ public String getScannerThreadStage() {
+ // TODO Auto-generated method stub
+ return jmxStats.getScannerThreadStage();
+ }
diff --git a/src/java/azkaban/jmx/JmxTriggerManagerMBean.java b/src/java/azkaban/jmx/JmxTriggerManagerMBean.java
index 6302885..ecf9c21 100644
--- a/src/java/azkaban/jmx/JmxTriggerManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxTriggerManagerMBean.java
@@ -27,4 +27,7 @@ public interface JmxTriggerManagerMBean {
@DisplayName("OPERATION: getScannerIdleTime")
public long getScannerIdleTime();
+
+ @DisplayName("OPERATION: getScannerThreadStage")
+ public String getScannerThreadStage();
}
src/java/azkaban/scheduler/ScheduleManager.java 331(+5 -326)
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 29dbdeb..878eb28 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -69,9 +69,6 @@ public class ScheduleManager implements TriggerAgent {
private ProjectManager projectManager = null;
- private final boolean useExternalRunner;
- private final ScheduleRunner runner;
-
// Used for mbeans to query Scheduler status
//<<<<<<< HEAD
//
@@ -88,18 +85,10 @@ public class ScheduleManager implements TriggerAgent {
* @param loader
*/
public ScheduleManager (ExecutorManagerAdapter executorManager,
- ScheduleLoader loader,
- boolean useExternalRunner)
+ ScheduleLoader loader)
{
this.executorManager = executorManager;
this.loader = loader;
- this.useExternalRunner = useExternalRunner;
-
- if(!useExternalRunner) {
- this.runner = new ScheduleRunner();
- } else {
- this.runner = null;
- }
}
@@ -126,20 +115,11 @@ public class ScheduleManager implements TriggerAgent {
}
}
- if(!useExternalRunner) {
- if(projectManager == null) {
- throw new ScheduleManagerException("Project Manager must be initialized when using internal schedule runner!");
- }
- this.runner.start();
- }
}
// only do this when using external runner
public synchronized void updateLocal() throws ScheduleManagerException {
- if(!useExternalRunner) {
- //throw new ScheduleManagerException("Should not change schedule states when using local runner!");
- return;
- }
+
List<Schedule> updates = loader.loadUpdatedSchedules();
for(Schedule s : updates) {
if(s.getStatus().equals(TriggerStatus.EXPIRED.toString())) {
@@ -159,9 +139,7 @@ public class ScheduleManager implements TriggerAgent {
* it again.
*/
public void shutdown() {
- if(!useExternalRunner) {
- this.runner.shutdown();
- }
+
}
/**
@@ -265,10 +243,7 @@ public class ScheduleManager implements TriggerAgent {
// TODO Auto-generated catch block
e.printStackTrace();
}
-
- if(!useExternalRunner) {
- runner.removeRunnerSchedule(sched);
- }
+
}
@@ -347,13 +322,7 @@ public class ScheduleManager implements TriggerAgent {
if(scheduleIdentityPairMap.get(s.getScheduleIdentityPair()) != null) {
existing = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
}
- if(!useExternalRunner) {
- if (existing != null) {
- this.runner.removeRunnerSchedule(existing);
- }
- s.updateTime();
- this.runner.addRunnerSchedule(s);
- }
+
scheduleIDMap.put(s.getScheduleId(), s);
// Set<Schedule> schedules = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
// if(schedules == null) {
@@ -404,295 +373,5 @@ public class ScheduleManager implements TriggerAgent {
return triggerSource;
}
- /**
- * Thread that simply invokes the running of flows when the schedule is
- * ready.
- *
- * @author Richard Park
- *
- */
- public class ScheduleRunner extends Thread {
-
- private long lastCheckTime = -1;
- private long nextWakupTime = -1;
- private final PriorityBlockingQueue<Schedule> schedules;
- private AtomicBoolean stillAlive = new AtomicBoolean(true);
-
- // Five minute minimum intervals
- private static final int TIMEOUT_MS = 300000;
-
- public ScheduleRunner() {
- schedules = new PriorityBlockingQueue<Schedule>(1,new ScheduleComparator());
- }
-
- public void shutdown() {
- logger.error("Shutting down scheduler thread");
- stillAlive.set(false);
- this.interrupt();
- }
-
- public long getLastCheckTime() {
- return lastCheckTime;
- }
-
- public long getNextWakeupTime() {
- return nextWakupTime;
- }
-
- /**
- * Return a list of scheduled flow
- *
- * @return
- */
- public synchronized List<Schedule> getRunnerSchedules() {
- return new ArrayList<Schedule>(schedules);
- }
-
- /**
- * Adds the flow to the schedule and then interrupts so it will update
- * its wait time.
- *
- * @param flow
- */
- public synchronized void addRunnerSchedule(Schedule s) {
- logger.info("Adding " + s + " to schedule runner.");
- schedules.add(s);
- // MonitorImpl.getInternalMonitorInterface().workflowEvent(null,
- // System.currentTimeMillis(),
- // WorkflowAction.SCHEDULE_WORKFLOW,
- // WorkflowState.NOP,
- // flow.getId());
-
- this.interrupt();
- }
-
- /**
- * Remove scheduled flows. Does not interrupt.
- *
- * @param flow
- */
- public synchronized void removeRunnerSchedule(Schedule s) {
- logger.info("Removing " + s + " from the schedule runner.");
- schedules.remove(s);
- // MonitorImpl.getInternalMonitorInterface().workflowEvent(null,
- // System.currentTimeMillis(),
- // WorkflowAction.UNSCHEDULE_WORKFLOW,
- // WorkflowState.NOP,
- // flow.getId());
- // Don't need to interrupt, because if this is originally on the top
- // of the queue,
- // it'll just skip it.
- }
-
- public void run() {
- while (stillAlive.get()) {
- synchronized (this) {
- try {
- lastCheckTime = System.currentTimeMillis();
-
-// runnerStage = "Starting schedule scan.";
- // TODO clear up the exception handling
- Schedule s = schedules.peek();
-
- if (s == null) {
- // If null, wake up every minute or so to see if
- // there's something to do. Most likely there will not be.
- try {
- logger.info("Nothing scheduled to run. Checking again soon.");
-// runnerStage = "Waiting for next round scan.";
- nextWakupTime = System.currentTimeMillis() + TIMEOUT_MS;
- this.wait(TIMEOUT_MS);
- } catch (InterruptedException e) {
- // interruption should occur when items are added or removed from the queue.
- }
- } else {
- // We've passed the flow execution time, so we will run.
- if (!(new DateTime(s.getNextExecTime())).isAfterNow()) {
- // Run flow. The invocation of flows should be quick.
- Schedule runningSched = schedules.poll();
-
-// runnerStage = "Ready to run schedule " + runningSched.toString();
-
- logger.info("Scheduler ready to run " + runningSched.toString());
- // Execute the flow here
- try {
- Project project = projectManager.getProject(runningSched.getProjectId());
- if (project == null) {
- logger.error("Scheduled Project " + runningSched.getProjectId() + " does not exist!");
- throw new RuntimeException("Error finding the scheduled project. "+ runningSched.getProjectId());
- }
- //TODO It is possible that the project is there, but the flow doesn't exist because upload a version that changes flow structure
-
- Flow flow = project.getFlow(runningSched.getFlowName());
- if (flow == null) {
- logger.error("Flow " + runningSched.getScheduleName() + " cannot be found in project " + project.getName());
- throw new RuntimeException("Error finding the scheduled flow. " + runningSched.getScheduleName());
- }
-
- // Create ExecutableFlow
- ExecutableFlow exflow = new ExecutableFlow(flow);
- System.out.println("ScheduleManager: creating schedule: " +runningSched.getScheduleId());
- exflow.setScheduleId(runningSched.getScheduleId());
- exflow.setSubmitUser(runningSched.getSubmitUser());
- exflow.addAllProxyUsers(project.getProxyUsers());
-
- ExecutionOptions flowOptions = runningSched.getExecutionOptions();
- if(flowOptions == null) {
- flowOptions = new ExecutionOptions();
- flowOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
- }
- exflow.setExecutionOptions(flowOptions);
-
- if (!flowOptions.isFailureEmailsOverridden()) {
- flowOptions.setFailureEmails(flow.getFailureEmails());
- }
- if (!flowOptions.isSuccessEmailsOverridden()) {
- flowOptions.setSuccessEmails(flow.getSuccessEmails());
- }
-
-// runnerStage = "Submitting flow " + exflow.getFlowId();
-
- try {
- executorManager.submitExecutableFlow(exflow, s.getSubmitUser());
- logger.info("Scheduler has invoked " + exflow.getExecutionId());
- }
- catch (ExecutorManagerException e) {
- throw e;
- }
- catch (Exception e) {
- e.printStackTrace();
- throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
- }
-
-// SlaOptions slaOptions = runningSched.getSlaOptions();
-// if(slaOptions != null) {
-// logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
-// // submit flow slas
-// List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
-// for(SlaSetting set : slaOptions.getSettings()) {
-// if(set.getId().equals("")) {
-// DateTime checkTime = new DateTime(runningSched.getNextExecTime()).plus(set.getDuration());
-// slaManager.submitSla(exflow.getExecutionId(), "", checkTime, slaOptions.getSlaEmails(), set.getActions(), null, set.getRule());
-// }
-// else {
-// jobsettings.add(set);
-// }
-// }
-// if(jobsettings.size() > 0) {
-// slaManager.submitSla(exflow.getExecutionId(), "", DateTime.now(), slaOptions.getSlaEmails(), new ArrayList<SlaAction>(), jobsettings, SlaRule.WAITANDCHECKJOB);
-// }
-// }
- }
- catch (ExecutorManagerException e) {
- if (e.getReason() != null && e.getReason() == ExecutorManagerException.Reason.SkippedExecution) {
- logger.info(e.getMessage());
- }
- else {
- e.printStackTrace();
- }
- }
- catch (Exception e) {
- logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
- }
-
-// runnerStage = "Done running schedule for " + runningSched.toString();
- removeRunnerSchedule(runningSched);
-
- // Immediately reschedule if it's possible. Let
- // the execution manager
- // handle any duplicate runs.
- if (runningSched.updateTime()) {
- addRunnerSchedule(runningSched);
- loader.updateSchedule(runningSched);
- }
- else {
- removeSchedule(runningSched);
- }
- } else {
-// runnerStage = "Waiting for next round scan.";
- // wait until flow run
- long millisWait = Math.max(0, s.getNextExecTime() - (new DateTime()).getMillis());
- try {
- nextWakupTime = System.currentTimeMillis() + millisWait;
- this.wait(Math.min(millisWait, TIMEOUT_MS));
- } catch (InterruptedException e) {
- // interruption should occur when items are
- // added or removed from the queue.
- }
- }
- }
- } catch (Exception e) {
- logger.error("Unexpected exception has been thrown in scheduler", e);
- } catch (Throwable e) {
- logger.error("Unexpected throwable has been thrown in scheduler", e);
- }
- }
- }
- }
-
- /**
- * Class to sort the schedule based on time.
- *
- * @author Richard Park
- */
- private class ScheduleComparator implements Comparator<Schedule> {
- @Override
- public int compare(Schedule arg0, Schedule arg1) {
- long first = arg1.getNextExecTime();
- long second = arg0.getNextExecTime();
-
- if (first == second) {
- return 0;
- } else if (first < second) {
- return 1;
- }
-
- return -1;
- }
- }
- }
-
- public long getLastCheckTime() {
- if(useExternalRunner) {
- return -1;
- } else {
- return runner.getLastCheckTime();
- }
- }
-
- public long getNextUpdateTime() {
- if(useExternalRunner) {
- return -1;
- } else {
- return runner.getNextWakeupTime();
- }
- }
-
- public State getThreadState() {
- if(useExternalRunner) {
- return null;
- } else {
- return runner.getState();
- }
- }
-
- public boolean isThreadActive() {
- if(useExternalRunner) {
- return false;
- } else {
- return runner.isAlive();
- }
- }
-//<<<<<<< HEAD
-//
-//
-//
-//
-//=======
-// public String getThreadStage() {
-// return runnerStage;
-// }
-
-//>>>>>>> 10830aeb8ac819473873cac3bb4e07b4aeda67e8
}
diff --git a/src/java/azkaban/trigger/builtin/SlaChecker.java b/src/java/azkaban/trigger/builtin/SlaChecker.java
index 05a00ac..25ca9b3 100644
--- a/src/java/azkaban/trigger/builtin/SlaChecker.java
+++ b/src/java/azkaban/trigger/builtin/SlaChecker.java
@@ -5,12 +5,10 @@ import java.util.Map;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
import org.joda.time.ReadablePeriod;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
@@ -28,6 +26,7 @@ public class SlaChecker implements ConditionChecker{
private int execId;
private Map<String, Object> context;
private boolean passChecker = true;
+ private long checkTime = -1;
private static ExecutorManagerAdapter executorManager;
@@ -62,6 +61,7 @@ public class SlaChecker implements ConditionChecker{
ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
DateTime startTime = new DateTime(flow.getStartTime());
DateTime checkTime = startTime.plus(duration);
+ this.checkTime = checkTime.getMillis();
if(checkTime.isBeforeNow()) {
Status status = flow.getStatus();
if(status.equals(Status.FAILED) || status.equals(Status.KILLED) || status.equals(Status.SUCCEEDED)) {
@@ -74,6 +74,7 @@ public class SlaChecker implements ConditionChecker{
ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
DateTime startTime = new DateTime(flow.getStartTime());
DateTime checkTime = startTime.plus(duration);
+ this.checkTime = checkTime.getMillis();
if(checkTime.isBeforeNow()) {
Status status = flow.getStatus();
if(status.equals(Status.SUCCEEDED)) {
@@ -89,6 +90,7 @@ public class SlaChecker implements ConditionChecker{
ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
DateTime startTime = new DateTime(node.getStartTime());
DateTime checkTime = startTime.plus(duration);
+ this.checkTime = checkTime.getMillis();
if(checkTime.isBeforeNow()) {
Status status = node.getStatus();
if(status.equals(Status.FAILED) || status.equals(Status.KILLED) || status.equals(Status.SUCCEEDED)) {
@@ -105,6 +107,7 @@ public class SlaChecker implements ConditionChecker{
ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
DateTime startTime = new DateTime(node.getStartTime());
DateTime checkTime = startTime.plus(duration);
+ this.checkTime = checkTime.getMillis();
if(checkTime.isBeforeNow()) {
Status status = node.getStatus();
if(status.equals(Status.SUCCEEDED)) {
@@ -229,8 +232,7 @@ public class SlaChecker implements ConditionChecker{
@Override
public long getNextCheckTime() {
- // TODO Auto-generated method stub
- return 0;
+ return checkTime;
}
}
src/java/azkaban/trigger/Condition.java 16(+13 -3)
diff --git a/src/java/azkaban/trigger/Condition.java b/src/java/azkaban/trigger/Condition.java
index 0311b54..d032527 100644
--- a/src/java/azkaban/trigger/Condition.java
+++ b/src/java/azkaban/trigger/Condition.java
@@ -26,6 +26,7 @@ public class Condition {
public Condition(Map<String, ConditionChecker> checkers, String expr) {
setCheckers(checkers);
this.expression = jexl.createExpression(expr);
+ updateNextCheckTime();
}
public Condition(Map<String, ConditionChecker> checkers, String expr, long nextCheckTime) {
@@ -49,9 +50,11 @@ public class Condition {
public void registerChecker(ConditionChecker checker) {
checkers.put(checker.getId(), checker);
context.set(checker.getId(), checker);
+ updateNextCheckTime();
}
public long getNextCheckTime() {
+ updateNextCheckTime();
return nextCheckTime;
}
@@ -64,18 +67,25 @@ public class Condition {
for(ConditionChecker checker : checkers.values()) {
this.context.set(checker.getId(), checker);
}
+ updateNextCheckTime();
}
- public void resetCheckers() {
+ private void updateNextCheckTime() {
long time = Long.MAX_VALUE;
for(ConditionChecker checker : checkers.values()) {
- checker.reset();
time = Math.min(time, checker.getNextCheckTime());
}
- logger.error("Done resetting checkers. The next check time will be " + new DateTime(time));
this.nextCheckTime = time;
}
+ public void resetCheckers() {
+ for(ConditionChecker checker : checkers.values()) {
+ checker.reset();
+ }
+ updateNextCheckTime();
+ logger.error("Done resetting checkers. The next check time will be " + new DateTime(nextCheckTime));
+ }
+
public String getExpression() {
return this.expression.getExpression();
}
diff --git a/src/java/azkaban/trigger/Trigger.java b/src/java/azkaban/trigger/Trigger.java
index 013a548..60e58f8 100644
--- a/src/java/azkaban/trigger/Trigger.java
+++ b/src/java/azkaban/trigger/Trigger.java
@@ -310,10 +310,12 @@ public class Trigger {
public void resetTriggerConditions() {
triggerCondition.resetCheckers();
+ updateNextCheckTime();
}
public void resetExpireCondition() {
expireCondition.resetCheckers();
+ updateNextCheckTime();
}
public List<TriggerAction> getTriggerActions () {
src/java/azkaban/trigger/TriggerManager.java 16(+15 -1)
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index bdcd30f..e964ecf 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -10,12 +10,13 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
import azkaban.utils.Props;
public class TriggerManager implements TriggerManagerAdapter{
private static Logger logger = Logger.getLogger(TriggerManager.class);
- private static final long DEFAULT_SCANNER_INTERVAL_MS = 60000;
+ public static final long DEFAULT_SCANNER_INTERVAL_MS = 60000;
private static Map<Integer, Trigger> triggerIdMap = new HashMap<Integer, Trigger>();
@@ -28,6 +29,8 @@ public class TriggerManager implements TriggerManagerAdapter{
private long runnerThreadIdleTime = -1;
private LocalTriggerJMX jmxStats = new LocalTriggerJMX();
+ private String scannerStage = "";
+
public TriggerManager(Props props, TriggerLoader triggerLoader) throws TriggerManagerException {
this.triggerLoader = triggerLoader;
@@ -165,6 +168,8 @@ public class TriggerManager implements TriggerManagerAdapter{
try{
lastRunnerThreadCheckTime = System.currentTimeMillis();
+ scannerStage = "Ready to start a new scan cycle at " + lastRunnerThreadCheckTime;
+
try{
checkAllTriggers();
} catch(Exception e) {
@@ -175,7 +180,10 @@ public class TriggerManager implements TriggerManagerAdapter{
logger.error(t.getMessage());
}
+ scannerStage = "Done flipping all triggers.";
+
runnerThreadIdleTime = scannerInterval - (System.currentTimeMillis() - lastRunnerThreadCheckTime);
+
if(runnerThreadIdleTime < 0) {
logger.error("Trigger manager thread " + this.getName() + " is too busy!");
} else {
@@ -192,6 +200,7 @@ public class TriggerManager implements TriggerManagerAdapter{
private void checkAllTriggers() throws TriggerManagerException {
long now = System.currentTimeMillis();
for(Trigger t : triggers) {
+ scannerStage = "Checking for trigger " + t.getTriggerId();
if(t.getNextCheckTime() > now) {
logger.info("Skipping trigger" + t.getTriggerId() + " until " + t.getNextCheckTime());
continue;
@@ -433,6 +442,11 @@ public class TriggerManager implements TriggerManagerAdapter{
public Map<String, Object> getAllJMXMbeans() {
return new HashMap<String, Object>();
}
+
+ @Override
+ public String getScannerThreadStage() {
+ return scannerStage;
+ }
}
diff --git a/src/java/azkaban/trigger/TriggerManagerAdapter.java b/src/java/azkaban/trigger/TriggerManagerAdapter.java
index 2ce5df5..0934985 100644
--- a/src/java/azkaban/trigger/TriggerManagerAdapter.java
+++ b/src/java/azkaban/trigger/TriggerManagerAdapter.java
@@ -42,6 +42,7 @@ public interface TriggerManagerAdapter {
public String getTriggerIds();
public long getScannerIdleTime();
public Map<String, Object> getAllJMXMbeans();
+ public String getScannerThreadStage();
}
}
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index d0731ed..2af906b 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -42,7 +42,6 @@ import org.apache.velocity.app.VelocityEngine;
import org.apache.velocity.runtime.log.Log4JLogChute;
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
import org.apache.velocity.runtime.resource.loader.JarResourceLoader;
-import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.bio.SocketConnector;
@@ -61,21 +60,16 @@ import azkaban.executor.JdbcExecutorLoader;
import azkaban.executor.ExecutorManager.Alerter;
import azkaban.jmx.JmxExecutorManagerAdapter;
import azkaban.jmx.JmxJettyServer;
-import azkaban.jmx.JmxScheduler;
import azkaban.jmx.JmxTriggerManager;
import azkaban.project.JdbcProjectLoader;
import azkaban.project.ProjectManager;
-import azkaban.scheduler.JdbcScheduleLoader;
import azkaban.scheduler.ScheduleLoader;
import azkaban.scheduler.ScheduleManager;
import azkaban.scheduler.TriggerBasedScheduleLoader;
-import azkaban.trigger.ActionTypeLoader;
-import azkaban.trigger.CheckerTypeLoader;
import azkaban.trigger.JdbcTriggerLoader;
import azkaban.trigger.TriggerLoader;
import azkaban.trigger.TriggerManager;
-import azkaban.trigger.TriggerAgent;
import azkaban.trigger.TriggerManagerException;
import azkaban.trigger.builtin.BasicTimeChecker;
import azkaban.trigger.builtin.CreateTriggerAction;
@@ -83,7 +77,6 @@ import azkaban.trigger.builtin.ExecuteFlowAction;
import azkaban.trigger.builtin.KillExecutionAction;
import azkaban.trigger.builtin.SlaAlertAction;
import azkaban.trigger.builtin.SlaChecker;
-import azkaban.triggerapp.AzkabanTriggerServer;
import azkaban.user.UserManager;
import azkaban.user.XmlUserManager;
import azkaban.utils.FileIOUtils;
@@ -292,7 +285,7 @@ public class AzkabanWebServer extends AzkabanServer {
private ScheduleManager loadScheduleManager(ExecutorManagerAdapter executorManager, TriggerManager tm, Props props ) throws Exception {
logger.info("Loading trigger based scheduler");
ScheduleLoader loader = new TriggerBasedScheduleLoader(tm, executorManager, null, ScheduleManager.triggerSource);
- return new ScheduleManager(executorManager, loader, true);
+ return new ScheduleManager(executorManager, loader);
}
// private TriggerBasedScheduler loadScheduler(ExecutorManager executorManager, ProjectManager projectManager, TriggerManager triggerManager) {
// TriggerBasedScheduleLoader loader = new TriggerBasedScheduleLoader(triggerManager, executorManager, projectManager);
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 0bbd585..fa4471b 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -32,7 +32,6 @@ import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutionOptions.FailureAction;
-import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
import azkaban.flow.Flow;