azkaban-developers
Changes
src/java/azkaban/scheduler/ScheduleManager.java 326(+4 -322)
src/java/azkaban/trigger/TriggerManager.java 145(+69 -76)
Details
src/java/azkaban/scheduler/ScheduleManager.java 326(+4 -322)
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 39e42e9..a243389 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -50,8 +50,6 @@ public class ScheduleManager implements TriggerAgent {
private Map<Integer, Schedule> scheduleIDMap = new LinkedHashMap<Integer, Schedule>();
private Map<Pair<Integer, String>, Schedule> scheduleIdentityPairMap = new LinkedHashMap<Pair<Integer, String>, Schedule>();
- // Used for mbeans to query Scheduler status
-
/**
* Give the schedule manager a loader class that will properly load the
* schedule.
@@ -115,19 +113,7 @@ public class ScheduleManager implements TriggerAgent {
* @throws ScheduleManagerException
*/
public synchronized List<Schedule> getSchedules() throws ScheduleManagerException {
-// if(useExternalRunner) {
-// for(Schedule s : scheduleIDMap.values()) {
-// try {
-// loader.updateNextExecTime(s);
-// } catch (ScheduleManagerException e) {
-// // TODO Auto-generated catch block
-// e.printStackTrace();
-// logger.error("Failed to update schedule from external runner for schedule " + s.getScheduleId());
-// }
-// }
-// }
-
- //return runner.getRunnerSchedules();
+
updateLocal();
return new ArrayList<Schedule>(scheduleIDMap.values());
}
@@ -139,10 +125,7 @@ public class ScheduleManager implements TriggerAgent {
* @return
* @throws ScheduleManagerException
*/
-// public Set<Schedule> getSchedules(int projectId, String flowId) throws ScheduleManagerException {
-// updateLocal();
-// return scheduleIdentityPairMap.get(new Pair<Integer,String>(projectId, flowId));
-// }
+
public Schedule getSchedule(int projectId, String flowId) throws ScheduleManagerException {
updateLocal();
return scheduleIdentityPairMap.get(new Pair<Integer,String>(projectId, flowId));
@@ -166,14 +149,7 @@ public class ScheduleManager implements TriggerAgent {
* @param id
* @throws ScheduleManagerException
*/
-// public synchronized void removeSchedules(int projectId, String flowId) throws ScheduleManagerException {
-// Set<Schedule> schedules = getSchedules(projectId, flowId);
-// if(schedules != null) {
-// for(Schedule sched : schedules) {
-// removeSchedule(sched);
-// }
-// }
-// }
+
public synchronized void removeSchedule(int projectId, String flowId) throws ScheduleManagerException {
Schedule sched = getSchedule(projectId, flowId);
if(sched != null) {
@@ -188,13 +164,7 @@ public class ScheduleManager implements TriggerAgent {
*/
public synchronized void removeSchedule(Schedule sched) {
Pair<Integer,String> identityPairMap = sched.getScheduleIdentityPair();
-// Set<Schedule> schedules = scheduleIdentityPairMap.get(identityPairMap);
-// if(schedules != null) {
-// schedules.remove(sched);
-// if(schedules.size() == 0) {
-// scheduleIdentityPairMap.remove(identityPairMap);
-// }
-// }
+
Schedule schedule = scheduleIdentityPairMap.get(identityPairMap);
if(schedule != null) {
scheduleIdentityPairMap.remove(identityPairMap);
@@ -210,28 +180,6 @@ public class ScheduleManager implements TriggerAgent {
}
}
- // public synchronized void pauseScheduledFlow(String scheduleId){
- // try{
- // ScheduledFlow flow = scheduleIDMap.get(scheduleId);
- // flow.setSchedStatus(SchedStatus.LASTPAUSED);
- // loader.saveSchedule(getSchedule());
- // }
- // catch (Exception e) {
- // throw new RuntimeException("Error pausing a schedule " + scheduleId);
- // }
- // }
- //
- // public synchronized void resumeScheduledFlow(String scheduleId){
- // try {
- // ScheduledFlow flow = scheduleIDMap.get(scheduleId);
- // flow.setSchedStatus(SchedStatus.LASTSUCCESS);
- // loader.saveSchedule(getSchedule());
- // }
- // catch (Exception e) {
- // throw new RuntimeException("Error resuming a schedule " + scheduleId);
- // }
- // }
-
public Schedule scheduleFlow(
final int scheduleId,
final int projectId,
@@ -280,19 +228,7 @@ public class ScheduleManager implements TriggerAgent {
* @param flow
*/
private synchronized void internalSchedule(Schedule s) {
- //Schedule existing = scheduleIDMap.get(s.getScheduleId());
-// Schedule existing = null;
-// if(scheduleIdentityPairMap.get(s.getScheduleIdentityPair()) != null) {
-// existing = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
-// }
-
scheduleIDMap.put(s.getScheduleId(), s);
-// Set<Schedule> schedules = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
-// if(schedules == null) {
-// schedules = new HashSet<Schedule>();
-// scheduleIdentityPairMap.put(s.getScheduleIdentityPair(), schedules);
-// }
-// schedules.add(s);
scheduleIdentityPairMap.put(s.getScheduleIdentityPair(), s);
}
@@ -330,260 +266,6 @@ public class ScheduleManager implements TriggerAgent {
throw new ScheduleManagerException("create " + getTriggerSource() + " from json not supported yet" );
}
- /**
- * Thread that simply invokes the running of flows when the schedule is
- * ready.
- */
-// public class ScheduleRunner extends Thread {
-// private final PriorityBlockingQueue<Schedule> schedules;
-// private AtomicBoolean stillAlive = new AtomicBoolean(true);
-//
-// // Five minute minimum intervals
-// private static final int TIMEOUT_MS = 300000;
-//
-// public ScheduleRunner() {
-// schedules = new PriorityBlockingQueue<Schedule>(1,new ScheduleComparator());
-// }
-//
-// public void shutdown() {
-// logger.error("Shutting down scheduler thread");
-// stillAlive.set(false);
-// this.interrupt();
-// }
-//
-// /**
-// * Return a list of scheduled flow
-// *
-// * @return
-// */
-// public synchronized List<Schedule> getRunnerSchedules() {
-// return new ArrayList<Schedule>(schedules);
-// }
-//
-// /**
-// * Adds the flow to the schedule and then interrupts so it will update
-// * its wait time.
-// *
-// * @param flow
-// */
-// public synchronized void addRunnerSchedule(Schedule s) {
-// logger.info("Adding " + s + " to schedule runner.");
-// schedules.add(s);
-// // MonitorImpl.getInternalMonitorInterface().workflowEvent(null,
-// // System.currentTimeMillis(),
-// // WorkflowAction.SCHEDULE_WORKFLOW,
-// // WorkflowState.NOP,
-// // flow.getId());
-//
-// this.interrupt();
-// }
-//
-// /**
-// * Remove scheduled flows. Does not interrupt.
-// *
-// * @param flow
-// */
-// public synchronized void removeRunnerSchedule(Schedule s) {
-// logger.info("Removing " + s + " from the schedule runner.");
-// schedules.remove(s);
-// // MonitorImpl.getInternalMonitorInterface().workflowEvent(null,
-// // System.currentTimeMillis(),
-// // WorkflowAction.UNSCHEDULE_WORKFLOW,
-// // WorkflowState.NOP,
-// // flow.getId());
-// // Don't need to interrupt, because if this is originally on the top
-// // of the queue,
-// // it'll just skip it.
-// }
-//
-// public void run() {
-// while (stillAlive.get()) {
-// synchronized (this) {
-// try {
-// lastCheckTime = System.currentTimeMillis();
-//
-// runnerStage = "Starting schedule scan.";
-// // TODO clear up the exception handling
-// Schedule s = schedules.peek();
-//
-// if (s == null) {
-// // If null, wake up every minute or so to see if
-// // there's something to do. Most likely there will not be.
-// try {
-// logger.info("Nothing scheduled to run. Checking again soon.");
-// runnerStage = "Waiting for next round scan.";
-// nextWakupTime = System.currentTimeMillis() + TIMEOUT_MS;
-// this.wait(TIMEOUT_MS);
-// } catch (InterruptedException e) {
-// // interruption should occur when items are added or removed from the queue.
-// }
-// } else {
-// // We've passed the flow execution time, so we will run.
-// if (!(new DateTime(s.getNextExecTime())).isAfterNow()) {
-// // Run flow. The invocation of flows should be quick.
-// Schedule runningSched = schedules.poll();
-//
-// runnerStage = "Ready to run schedule " + runningSched.toString();
-//
-// logger.info("Scheduler ready to run " + runningSched.toString());
-// // Execute the flow here
-// try {
-// Project project = projectManager.getProject(runningSched.getProjectId());
-// if (project == null) {
-// logger.error("Scheduled Project " + runningSched.getProjectId() + " does not exist!");
-// throw new RuntimeException("Error finding the scheduled project. "+ runningSched.getProjectId());
-// }
-// //TODO It is possible that the project is there, but the flow doesn't exist because upload a version that changes flow structure
-//
-// Flow flow = project.getFlow(runningSched.getFlowName());
-// if (flow == null) {
-// logger.error("Flow " + runningSched.getScheduleName() + " cannot be found in project " + project.getName());
-// throw new RuntimeException("Error finding the scheduled flow. " + runningSched.getScheduleName());
-// }
-//
-// // Create ExecutableFlow
-// ExecutableFlow exflow = new ExecutableFlow(flow);
-// System.out.println("ScheduleManager: creating schedule: " +runningSched.getScheduleId());
-// exflow.setScheduleId(runningSched.getScheduleId());
-// exflow.setSubmitUser(runningSched.getSubmitUser());
-// exflow.addAllProxyUsers(project.getProxyUsers());
-//
-// ExecutionOptions flowOptions = runningSched.getExecutionOptions();
-// if(flowOptions == null) {
-// flowOptions = new ExecutionOptions();
-// flowOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
-// }
-// exflow.setExecutionOptions(flowOptions);
-//
-// if (!flowOptions.isFailureEmailsOverridden()) {
-// flowOptions.setFailureEmails(flow.getFailureEmails());
-// }
-// if (!flowOptions.isSuccessEmailsOverridden()) {
-// flowOptions.setSuccessEmails(flow.getSuccessEmails());
-// }
-//
-// runnerStage = "Submitting flow " + exflow.getFlowId();
-// flowOptions.setMailCreator(flow.getMailCreator());
-//
-// try {
-// executorManager.submitExecutableFlow(exflow);
-// logger.info("Scheduler has invoked " + exflow.getExecutionId());
-// }
-// catch (ExecutorManagerException e) {
-// throw e;
-// }
-// catch (Exception e) {
-// e.printStackTrace();
-// throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
-// }
-//
-// SlaOptions slaOptions = runningSched.getSlaOptions();
-// if(slaOptions != null) {
-// logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
-// runnerStage = "Submitting SLA checkings for " + runningSched.getFlowName();
-// // submit flow slas
-// List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
-// for(SlaSetting set : slaOptions.getSettings()) {
-// if(set.getId().equals("")) {
-// DateTime checkTime = new DateTime(runningSched.getNextExecTime()).plus(set.getDuration());
-// slaManager.submitSla(exflow.getExecutionId(), "", checkTime, slaOptions.getSlaEmails(), set.getActions(), null, set.getRule());
-// }
-// else {
-// jobsettings.add(set);
-// }
-// }
-// if(jobsettings.size() > 0) {
-// slaManager.submitSla(exflow.getExecutionId(), "", DateTime.now(), slaOptions.getSlaEmails(), new ArrayList<SlaAction>(), jobsettings, SlaRule.WAITANDCHECKJOB);
-// }
-// }
-//
-// }
-// catch (ExecutorManagerException e) {
-// if (e.getReason() != null && e.getReason() == ExecutorManagerException.Reason.SkippedExecution) {
-// logger.info(e.getMessage());
-// }
-// else {
-// e.printStackTrace();
-// }
-// }
-// catch (Exception e) {
-// logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
-// }
-//
-// runnerStage = "Done running schedule for " + runningSched.toString();
-// removeRunnerSchedule(runningSched);
-//
-// // Immediately reschedule if it's possible. Let
-// // the execution manager
-// // handle any duplicate runs.
-// if (runningSched.updateTime()) {
-// addRunnerSchedule(runningSched);
-// loader.updateSchedule(runningSched);
-// }
-// else {
-// removeSchedule(runningSched);
-// }
-// } else {
-// runnerStage = "Waiting for next round scan.";
-// // wait until flow run
-// long millisWait = Math.max(0, s.getNextExecTime() - (new DateTime()).getMillis());
-// try {
-// nextWakupTime = System.currentTimeMillis() + millisWait;
-// this.wait(Math.min(millisWait, TIMEOUT_MS));
-// } catch (InterruptedException e) {
-// // interruption should occur when items are
-// // added or removed from the queue.
-// }
-// }
-// }
-// } catch (Exception e) {
-// logger.error("Unexpected exception has been thrown in scheduler", e);
-// } catch (Throwable e) {
-// logger.error("Unexpected throwable has been thrown in scheduler", e);
-// }
-// }
-// }
-// }
-//
-// /**
-// * Class to sort the schedule based on time.
-// *
-// * @author Richard Park
-// */
-// private class ScheduleComparator implements Comparator<Schedule> {
-// @Override
-// public int compare(Schedule arg0, Schedule arg1) {
-// long first = arg1.getNextExecTime();
-// long second = arg0.getNextExecTime();
-//
-// if (first == second) {
-// return 0;
-// } else if (first < second) {
-// return 1;
-// }
-//
-// return -1;
-// }
-// }
-// }
-
-// public long getLastCheckTime() {
-// return lastCheckTime;
-// }
-//
-// public long getNextUpdateTime() {
-// return nextWakupTime;
-// }
-//
-// public State getThreadState() {
-// return runner.getState();
-// }
-//
-// public boolean isThreadActive() {
-// return runner.isAlive();
-//>>>>>>> df6eb48ad044ae68afffae2254991289792f33a0
-// }
-
@Override
public String getTriggerSource() {
return triggerSource;
diff --git a/src/java/azkaban/trigger/ConditionChecker.java b/src/java/azkaban/trigger/ConditionChecker.java
index 16869b4..94e3841 100644
--- a/src/java/azkaban/trigger/ConditionChecker.java
+++ b/src/java/azkaban/trigger/ConditionChecker.java
@@ -40,7 +40,4 @@ public interface ConditionChecker {
long getNextCheckTime();
-// void setCondition(Condition c);
-//
-// String getDescription();
}
src/java/azkaban/trigger/TriggerManager.java 145(+69 -76)
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index 6f403e7..594f759 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -53,6 +53,8 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
private ExecutorManagerEventListener listener = new ExecutorManagerEventListener();
+ private Object syncObj = new Object();
+
private String scannerStage = "";
public TriggerManager(Props props, TriggerLoader triggerLoader, ExecutorManager executorManager) throws TriggerManagerException {
@@ -106,51 +108,61 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
return actionTypeLoader;
}
- public synchronized void insertTrigger(Trigger t) throws TriggerManagerException {
- try {
- triggerLoader.addTrigger(t);
- } catch (TriggerLoaderException e) {
- throw new TriggerManagerException(e);
+ public void insertTrigger(Trigger t) throws TriggerManagerException {
+ synchronized (syncObj) {
+ try {
+ triggerLoader.addTrigger(t);
+ } catch (TriggerLoaderException e) {
+ throw new TriggerManagerException(e);
+ }
+ runnerThread.addTrigger(t);
+ triggerIdMap.put(t.getTriggerId(), t);
}
- runnerThread.addTrigger(t);
- triggerIdMap.put(t.getTriggerId(), t);
}
- public synchronized void removeTrigger(int id) throws TriggerManagerException {
- Trigger t = triggerIdMap.get(id);
- if(t != null) {
- removeTrigger(triggerIdMap.get(id));
+ public void removeTrigger(int id) throws TriggerManagerException {
+ synchronized (syncObj) {
+ Trigger t = triggerIdMap.get(id);
+ if(t != null) {
+ removeTrigger(triggerIdMap.get(id));
+ }
}
}
- public synchronized void updateTrigger(int id) throws TriggerManagerException {
- if(! triggerIdMap.containsKey(id)) {
- throw new TriggerManagerException("The trigger to update " + id + " doesn't exist!");
- }
-
- Trigger t;
- try {
- t = triggerLoader.loadTrigger(id);
- } catch (TriggerLoaderException e) {
- throw new TriggerManagerException(e);
+ public void updateTrigger(int id) throws TriggerManagerException {
+ synchronized (syncObj) {
+ if(! triggerIdMap.containsKey(id)) {
+ throw new TriggerManagerException("The trigger to update " + id + " doesn't exist!");
+ }
+
+ Trigger t;
+ try {
+ t = triggerLoader.loadTrigger(id);
+ } catch (TriggerLoaderException e) {
+ throw new TriggerManagerException(e);
+ }
+ updateTrigger(t);
}
- updateTrigger(t);
}
- public synchronized void updateTrigger(Trigger t) throws TriggerManagerException {
- runnerThread.deleteTrigger(triggerIdMap.get(t.getTriggerId()));
- runnerThread.addTrigger(t);
- triggerIdMap.put(t.getTriggerId(), t);
+ public void updateTrigger(Trigger t) throws TriggerManagerException {
+ synchronized (syncObj) {
+ runnerThread.deleteTrigger(triggerIdMap.get(t.getTriggerId()));
+ runnerThread.addTrigger(t);
+ triggerIdMap.put(t.getTriggerId(), t);
+ }
}
- public synchronized void removeTrigger(Trigger t) throws TriggerManagerException {
- runnerThread.deleteTrigger(t);
- triggerIdMap.remove(t.getTriggerId());
- try {
- t.stopCheckers();
- triggerLoader.removeTrigger(t);
- } catch (TriggerLoaderException e) {
- throw new TriggerManagerException(e);
+ public void removeTrigger(Trigger t) throws TriggerManagerException {
+ synchronized (syncObj) {
+ runnerThread.deleteTrigger(t);
+ triggerIdMap.remove(t.getTriggerId());
+ try {
+ t.stopCheckers();
+ triggerLoader.removeTrigger(t);
+ } catch (TriggerLoaderException e) {
+ throw new TriggerManagerException(e);
+ }
}
}
@@ -183,23 +195,27 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
this.interrupt();
}
- public synchronized void addJustFinishedFlow(ExecutableFlow flow) {
- justFinishedFlows.put(flow.getExecutionId(), flow);
+ public void addJustFinishedFlow(ExecutableFlow flow) {
+ synchronized (syncObj) {
+ justFinishedFlows.put(flow.getExecutionId(), flow);
+ }
}
- public synchronized void addTrigger(Trigger t) {
- t.updateNextCheckTime();
- triggers.add(t);
+ public void addTrigger(Trigger t) {
+ synchronized (syncObj) {
+ t.updateNextCheckTime();
+ triggers.add(t);
+ }
}
- public synchronized void deleteTrigger(Trigger t) {
+ public void deleteTrigger(Trigger t) {
triggers.remove(t);
}
public void run() {
//while(stillAlive.get()) {
while(!shutdown) {
- synchronized (this) {
+ synchronized (syncObj) {
try{
lastRunnerThreadCheckTime = System.currentTimeMillis();
@@ -343,8 +359,10 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
}
}
- public synchronized Trigger getTrigger(int triggerId) {
- return triggerIdMap.get(triggerId);
+ public Trigger getTrigger(int triggerId) {
+ synchronized (syncObj) {
+ return triggerIdMap.get(triggerId);
+ }
}
public void expireTrigger(int triggerId) {
@@ -386,20 +404,6 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
return triggers;
}
-// public void loadTrigger(int triggerId) throws TriggerManagerException {
-// Trigger t;
-// try {
-// t = triggerLoader.loadTrigger(triggerId);
-// } catch (TriggerLoaderException e) {
-// throw new TriggerManagerException(e);
-// }
-// if(t.getStatus().equals(TriggerStatus.PREPARING)) {
-// triggerIdMap.put(t.getTriggerId(), t);
-// runnerThread.addTrigger(t);
-// t.setStatus(TriggerStatus.READY);
-// }
-// }
-
@Override
public void insertTrigger(Trigger t, String user) throws TriggerManagerException {
insertTrigger(t);
@@ -415,19 +419,6 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
updateTrigger(t);
}
-// @Override
-// public void insertTrigger(int triggerId, String user) throws TriggerManagerException {
-// Trigger t;
-// try {
-// t = triggerLoader.loadTrigger(triggerId);
-// } catch (TriggerLoaderException e) {
-// throw new TriggerManagerException(e);
-// }
-// if(t != null) {
-// insertTrigger(t);
-// }
-// }
-
@Override
public void shutdown() {
runnerThread.shutdown();
@@ -510,12 +501,14 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
}
@Override
- public synchronized void handleEvent(Event event) {
-
- ExecutableFlow flow = (ExecutableFlow) event.getRunner();
- if (event.getType() == Type.FLOW_FINISHED) {
- logger.info("Flow finish event received. " + flow.getExecutionId() );
- runnerThread.addJustFinishedFlow(flow);
+ public void handleEvent(Event event) {
+ // this needs to be fixed for perf
+ synchronized (syncObj) {
+ ExecutableFlow flow = (ExecutableFlow) event.getRunner();
+ if (event.getType() == Type.FLOW_FINISHED) {
+ logger.info("Flow finish event received. " + flow.getExecutionId() );
+ runnerThread.addJustFinishedFlow(flow);
+ }
}
}
}
diff --git a/src/package/execserver/conf/azkaban.properties b/src/package/execserver/conf/azkaban.properties
index b54b0f5..1f3b462 100644
--- a/src/package/execserver/conf/azkaban.properties
+++ b/src/package/execserver/conf/azkaban.properties
@@ -20,3 +20,4 @@ mysql.numconnections=100
executor.maxThreads=50
executor.port=12321
executor.flow.threads=30
+
diff --git a/src/package/soloserver/bin/azkaban-solo-start.sh b/src/package/soloserver/bin/azkaban-solo-start.sh
index 54ac8d9..7d2fb8b 100755
--- a/src/package/soloserver/bin/azkaban-solo-start.sh
+++ b/src/package/soloserver/bin/azkaban-solo-start.sh
@@ -45,7 +45,7 @@ serverpath=`pwd`
if [ -z $AZKABAN_OPTS ]; then
AZKABAN_OPTS=-Xmx3G
fi
-AZKABAN_OPTS=$AZKABAN_OPTS -server -Dcom.sun.management.jmxremote -Djava.io.tmpdir=$tmpdir -Dexecutorport=$executorport -Dserverpath=$serverpath
+AZKABAN_OPTS="$AZKABAN_OPTS -server -Dcom.sun.management.jmxremote -Djava.io.tmpdir=$tmpdir -Dexecutorport=$executorport -Dserverpath=$serverpath"
java $AZKABAN_OPTS -cp $CLASSPATH azkaban.webapp.AzkabanSingleServer -conf $azkaban_dir/conf $@ &
diff --git a/src/sql/update.project_properties.2.1.sql b/src/sql/update.project_properties.2.1.sql
new file mode 100644
index 0000000..32d0821
--- /dev/null
+++ b/src/sql/update.project_properties.2.1.sql
@@ -0,0 +1,3 @@
+ALTER TABLE project_properties MODIFY name VARCHAR(255);
+
+