azkaban-uncached
Changes
src/java/azkaban/scheduler/Schedule.java 30(+4 -26)
src/java/azkaban/scheduler/ScheduleManager.java 51(+13 -38)
src/java/azkaban/trigger/Trigger.java 111(+107 -4)
src/java/azkaban/trigger/TriggerManager.java 574(+374 -200)
src/java/azkaban/webapp/AzkabanWebServer.java 44(+15 -29)
src/java/azkaban/webapp/servlet/ScheduleServlet.java 346(+170 -176)
Details
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 527956d..5d51a2b 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -70,19 +70,27 @@ public class ExecutorManager {
private long lastThreadCheckTime = -1;
- public ExecutorManager(Props props, ExecutorLoader loader) throws ExecutorManagerException {
+ private final boolean isPrimary;
+
+ public ExecutorManager(Props props, ExecutorLoader loader, boolean isPrimary) throws ExecutorManagerException {
this.executorLoader = loader;
this.loadRunningFlows();
executorHost = props.getString("executor.host", "localhost");
executorPort = props.getInt("executor.port");
+
mailer = new ExecutorMailer(props);
- executingManager = new ExecutingManagerUpdaterThread();
- executingManager.start();
+
+ this.isPrimary = isPrimary;
+
+ if(isPrimary) {
+ executingManager = new ExecutingManagerUpdaterThread();
+ executingManager.start();
- long executionLogsRetentionMs = props.getLong("execution.logs.retention.ms", DEFAULT_EXECUTION_LOGS_RETENTION_MS);
- cleanerThread = new CleanerThread(executionLogsRetentionMs);
- cleanerThread.start();
+ long executionLogsRetentionMs = props.getLong("execution.logs.retention.ms", DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+ cleanerThread = new CleanerThread(executionLogsRetentionMs);
+ cleanerThread.start();
+ }
}
public String getExecutorHost() {
@@ -128,6 +136,11 @@ public class ExecutorManager {
return ports;
}
+ public ExecutableFlow fetchExecutableFlow(int execId) throws ExecutorManagerException {
+ ExecutableFlow exflow = executorLoader.fetchExecutableFlow(execId);
+ return exflow;
+ }
+
private void loadRunningFlows() throws ExecutorManagerException {
runningFlows.putAll(executorLoader.fetchActiveFlows());
}
@@ -593,6 +606,8 @@ public class ExecutorManager {
try {
lastThreadCheckTime = System.currentTimeMillis();
+ loadRunningFlows();
+
Map<ConnectionInfo, List<ExecutableFlow>> exFlowMap = getFlowToExecutorMap();
ArrayList<ExecutableFlow> finishedFlows = new ArrayList<ExecutableFlow>();
ArrayList<ExecutableFlow> finalizeFlows = new ArrayList<ExecutableFlow>();
diff --git a/src/java/azkaban/jmx/JmxTriggerRunnerManager.java b/src/java/azkaban/jmx/JmxTriggerRunnerManager.java
new file mode 100644
index 0000000..d0af406
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxTriggerRunnerManager.java
@@ -0,0 +1,22 @@
+package azkaban.jmx;
+
+import azkaban.triggerapp.TriggerRunnerManager;
+
+public class JmxTriggerRunnerManager implements JmxTriggerRunnerManagerMBean {
+ private TriggerRunnerManager manager;
+
+ public JmxTriggerRunnerManager(TriggerRunnerManager manager) {
+ this.manager = manager;
+ }
+
+ @Override
+ public long getLastRunnerThreadCheckTime() {
+ return manager.getLastRunnerThreadCheckTime();
+ }
+
+ @Override
+ public boolean isRunnerThreadActive() {
+ return manager.isRunnerThreadActive();
+ }
+
+}
diff --git a/src/java/azkaban/jmx/JmxTriggerRunnerManagerMBean.java b/src/java/azkaban/jmx/JmxTriggerRunnerManagerMBean.java
new file mode 100644
index 0000000..77b72e7
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxTriggerRunnerManagerMBean.java
@@ -0,0 +1,11 @@
+package azkaban.jmx;
+
+public interface JmxTriggerRunnerManagerMBean {
+
+ @DisplayName("OPERATION: getLastRunnerThreadCheckTime")
+ public long getLastRunnerThreadCheckTime();
+
+ @DisplayName("OPERATION: isRunnerThreadActive")
+ public boolean isRunnerThreadActive();
+
+}
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index 818fb2b..eea9c46 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -40,15 +40,13 @@ public class ProjectManager {
private TriggerManager triggerManager;
private boolean loadTriggerFromFile = false;
- public ProjectManager(ProjectLoader loader, Props props, TriggerManager triggerManager) {
+ public ProjectManager(ProjectLoader loader, Props props) {
this.projectLoader = loader;
this.props = props;
this.tempDir = new File(this.props.getString("project.temp.dir", "temp"));
this.projectVersionRetention = (props.getInt("project.version.retention", 3));
logger.info("Project version retention is set to " + projectVersionRetention);
- this.triggerManager = triggerManager;
-
this.creatorDefaultPermissions = props.getBoolean("creator.default.proxy", true);
this.loadTriggerFromFile = props.getBoolean("enable.load.trigger.from.file", false);
@@ -59,6 +57,10 @@ public class ProjectManager {
loadAllProjects();
}
+
+ public void setTriggerManager(TriggerManager triggerManager) {
+ this.triggerManager = triggerManager;
+ }
public void setLoadTriggerFromFile(boolean enable) {
this.loadTriggerFromFile = enable;
src/java/azkaban/scheduler/Schedule.java 30(+4 -26)
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index 931b4e7..ccbad60 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -31,7 +31,6 @@ import org.joda.time.Seconds;
import org.joda.time.Weeks;
import azkaban.executor.ExecutionOptions;
-import azkaban.sla.SlaOptions;
import azkaban.utils.Pair;
public class Schedule{
@@ -57,7 +56,6 @@ public class Schedule{
private boolean skipPastOccurrences = true;
private ExecutionOptions executionOptions;
- private SlaOptions slaOptions;
public Schedule(
int scheduleId,
@@ -86,7 +84,6 @@ public class Schedule{
nextExecTime,
submitTime,
submitUser,
- null,
null
);
}
@@ -104,8 +101,7 @@ public class Schedule{
long nextExecTime,
long submitTime,
String submitUser,
- ExecutionOptions executionOptions,
- SlaOptions slaOptions
+ ExecutionOptions executionOptions
) {
this(scheduleId, projectId,
projectName,
@@ -118,8 +114,7 @@ public class Schedule{
nextExecTime,
submitTime,
submitUser,
- executionOptions,
- slaOptions
+ executionOptions
);
}
@@ -136,8 +131,7 @@ public class Schedule{
long nextExecTime,
long submitTime,
String submitUser,
- ExecutionOptions executionOptions,
- SlaOptions slaOptions
+ ExecutionOptions executionOptions
) {
this.scheduleId = scheduleId;
this.projectId = projectId;
@@ -152,7 +146,6 @@ public class Schedule{
this.status = status;
this.submitTime = submitTime;
this.executionOptions = executionOptions;
- this.slaOptions = slaOptions;
}
public ExecutionOptions getExecutionOptions() {
@@ -163,14 +156,6 @@ public class Schedule{
this.executionOptions = executionOptions;
}
- public SlaOptions getSlaOptions() {
- return slaOptions;
- }
-
- public void setSlaOptions(SlaOptions slaOptions) {
- this.slaOptions = slaOptions;
- }
-
public String getScheduleName() {
return projectName + "." + flowName + " (" + projectId + ")";
}
@@ -345,15 +330,12 @@ public class Schedule{
public Map<String,Object> optionsToObject() {
- if(executionOptions != null || slaOptions != null) {
+ if(executionOptions != null ) {
HashMap<String, Object> schedObj = new HashMap<String, Object>();
if(executionOptions != null) {
schedObj.put("executionOptions", executionOptions.toObject());
}
- if(slaOptions != null) {
- schedObj.put("slaOptions", slaOptions.toObject());
- }
return schedObj;
}
@@ -377,10 +359,6 @@ public class Schedule{
this.executionOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
}
- if (schedObj.containsKey("slaOptions")) {
- SlaOptions slaOptions = SlaOptions.fromObject(schedObj.get("slaOptions"));
- this.slaOptions = slaOptions;
- }
}
public boolean isRecurring() {
src/java/azkaban/scheduler/ScheduleManager.java 51(+13 -38)
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 49e48c3..4ce0e25 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -16,7 +16,6 @@
package azkaban.scheduler;
-import java.io.File;
import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.Comparator;
@@ -42,12 +41,6 @@ import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
-import azkaban.sla.SLA.SlaAction;
-import azkaban.sla.SLA.SlaRule;
-import azkaban.sla.SLA.SlaSetting;
-import azkaban.sla.SLAManager;
-import azkaban.sla.SlaOptions;
-import azkaban.trigger.Trigger;
import azkaban.trigger.TriggerAgent;
import azkaban.trigger.TriggerStatus;
import azkaban.utils.Pair;
@@ -70,8 +63,8 @@ public class ScheduleManager implements TriggerAgent {
private Map<Integer, Schedule> scheduleIDMap = new LinkedHashMap<Integer, Schedule>();
private final ExecutorManager executorManager;
- private final ProjectManager projectManager;
- private final SLAManager slaManager;
+
+ private ProjectManager projectManager = null;
private final boolean useExternalRunner;
private final ScheduleRunner runner;
@@ -86,28 +79,27 @@ public class ScheduleManager implements TriggerAgent {
* @param loader
*/
public ScheduleManager (ExecutorManager executorManager,
- ProjectManager projectManager,
- SLAManager slaManager,
ScheduleLoader loader,
boolean useExternalRunner)
{
this.executorManager = executorManager;
- this.projectManager = projectManager;
- this.slaManager = slaManager;
this.loader = loader;
this.useExternalRunner = useExternalRunner;
if(!useExternalRunner) {
this.runner = new ScheduleRunner();
- load();
} else {
this.runner = null;
}
}
+ public void setProjectManager(ProjectManager projectManager) {
+ this.projectManager = projectManager;
+ }
+
@Override
- public void load() {
+ public void start() throws ScheduleManagerException {
List<Schedule> scheduleList = null;
try {
scheduleList = loader.loadSchedules();
@@ -126,6 +118,9 @@ public class ScheduleManager implements TriggerAgent {
}
if(!useExternalRunner) {
+ if(projectManager == null) {
+ throw new ScheduleManagerException("Project Manager must be initialized when using internal schedule runner!");
+ }
this.runner.start();
}
}
@@ -289,7 +284,7 @@ public class ScheduleManager implements TriggerAgent {
final long submitTime,
final String submitUser
) {
- return scheduleFlow(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null, null);
+ return scheduleFlow(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null);
}
public Schedule scheduleFlow(
@@ -305,10 +300,9 @@ public class ScheduleManager implements TriggerAgent {
final long nextExecTime,
final long submitTime,
final String submitUser,
- ExecutionOptions execOptions,
- SlaOptions slaOptions
+ ExecutionOptions execOptions
) {
- Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions, slaOptions);
+ Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions);
logger.info("Scheduling flow '" + sched.getScheduleName() + "' for "
+ _dateFormat.print(firstSchedTime) + " with a period of "
+ period == null ? "(non-recurring)" : period);
@@ -531,25 +525,6 @@ public class ScheduleManager implements TriggerAgent {
throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
}
- SlaOptions slaOptions = runningSched.getSlaOptions();
- if(slaOptions != null) {
- logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
- // submit flow slas
- List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
- for(SlaSetting set : slaOptions.getSettings()) {
- if(set.getId().equals("")) {
- DateTime checkTime = new DateTime(runningSched.getNextExecTime()).plus(set.getDuration());
- slaManager.submitSla(exflow.getExecutionId(), "", checkTime, slaOptions.getSlaEmails(), set.getActions(), null, set.getRule());
- }
- else {
- jobsettings.add(set);
- }
- }
- if(jobsettings.size() > 0) {
- slaManager.submitSla(exflow.getExecutionId(), "", DateTime.now(), slaOptions.getSlaEmails(), new ArrayList<SlaAction>(), jobsettings, SlaRule.WAITANDCHECKJOB);
- }
- }
-
}
catch (ExecutorManagerException e) {
if (e.getReason() != null && e.getReason() == ExecutorManagerException.Reason.SkippedExecution) {
diff --git a/src/java/azkaban/scheduler/ScheduleManagerException.java b/src/java/azkaban/scheduler/ScheduleManagerException.java
index a977e2a..a15cabf 100644
--- a/src/java/azkaban/scheduler/ScheduleManagerException.java
+++ b/src/java/azkaban/scheduler/ScheduleManagerException.java
@@ -26,4 +26,8 @@ public class ScheduleManagerException extends Exception{
public ScheduleManagerException(String message, Throwable cause) {
super(message, cause);
}
+
+ public ScheduleManagerException(Exception e) {
+ super(e);
+ }
}
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index 311d673..b79fc3f 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -8,7 +8,6 @@ import java.util.Map;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
-import azkaban.actions.ExecuteFlowAction;
import azkaban.executor.ExecutorManager;
import azkaban.project.ProjectManager;
import azkaban.trigger.Condition;
@@ -18,6 +17,8 @@ import azkaban.trigger.TriggerAction;
import azkaban.trigger.TriggerManager;
import azkaban.trigger.TriggerManagerException;
import azkaban.trigger.TriggerStatus;
+import azkaban.trigger.builtin.BasicTimeChecker;
+import azkaban.trigger.builtin.ExecuteFlowAction;
public class TriggerBasedScheduleLoader implements ScheduleLoader {
@@ -80,7 +81,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
public void insertSchedule(Schedule s) throws ScheduleManagerException {
Trigger t = scheduleToTrigger(s);
try {
- triggerManager.insertTrigger(t);
+ triggerManager.insertTrigger(t, t.getSubmitUser());
s.setScheduleId(t.getTriggerId());
// triggersLocalCopy.put(t.getTriggerId(), t);
} catch (TriggerManagerException e) {
@@ -93,7 +94,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
public void updateSchedule(Schedule s) throws ScheduleManagerException {
Trigger t = scheduleToTrigger(s);
try {
- triggerManager.updateTrigger(t);
+ triggerManager.updateTrigger(t, t.getSubmitUser());
// triggersLocalCopy.put(t.getTriggerId(), t);
} catch (TriggerManagerException e) {
// TODO Auto-generated catch block
@@ -161,7 +162,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
@Override
public void removeSchedule(Schedule s) throws ScheduleManagerException {
try {
- triggerManager.removeTrigger(s.getScheduleId());
+ triggerManager.removeTrigger(s.getScheduleId(), s.getSubmitUser());
// triggersLocalCopy.remove(s.getScheduleId());
} catch (TriggerManagerException e) {
// TODO Auto-generated catch block
@@ -180,7 +181,14 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
@Override
public synchronized List<Schedule> loadUpdatedSchedules() throws ScheduleManagerException {
- List<Trigger> triggers = triggerManager.getUpdatedTriggers(triggerSource, lastUpdateTime);
+ List<Trigger> triggers;
+ try {
+ triggers = triggerManager.getUpdatedTriggers(triggerSource, lastUpdateTime);
+ } catch (TriggerManagerException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ throw new ScheduleManagerException(e);
+ }
List<Schedule> schedules = new ArrayList<Schedule>();
for(Trigger t : triggers) {
lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime().getMillis());
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduler.java b/src/java/azkaban/scheduler/TriggerBasedScheduler.java
index f3032d7..799c487 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduler.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduler.java
@@ -28,17 +28,15 @@ import org.joda.time.ReadablePeriod;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
-import azkaban.actions.ExecuteFlowAction;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorManager;
import azkaban.project.ProjectManager;
-import azkaban.sla.SLAManager;
-import azkaban.sla.SlaOptions;
import azkaban.trigger.Condition;
import azkaban.trigger.ConditionChecker;
import azkaban.trigger.Trigger;
import azkaban.trigger.TriggerAction;
import azkaban.trigger.TriggerManager;
+import azkaban.trigger.builtin.ExecuteFlowAction;
import azkaban.utils.Pair;
/**
@@ -149,7 +147,7 @@ public class TriggerBasedScheduler {
final long submitTime,
final String submitUser
) {
- return scheduleFlow(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null, null);
+ return scheduleFlow(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null);
}
public Schedule scheduleFlow(
@@ -165,10 +163,9 @@ public class TriggerBasedScheduler {
final long nextExecTime,
final long submitTime,
final String submitUser,
- ExecutionOptions execOptions,
- SlaOptions slaOptions
+ ExecutionOptions execOptions
) {
- Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions, slaOptions);
+ Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions);
logger.info("Scheduling flow '" + sched.getScheduleName() + "' for "
+ _dateFormat.print(firstSchedTime) + " with a period of "
+ period == null ? "(non-recurring)" : period);
diff --git a/src/java/azkaban/trigger/ActionTypeLoader.java b/src/java/azkaban/trigger/ActionTypeLoader.java
index 1f9a58e..1e4cc3c 100644
--- a/src/java/azkaban/trigger/ActionTypeLoader.java
+++ b/src/java/azkaban/trigger/ActionTypeLoader.java
@@ -14,7 +14,7 @@ import java.util.Set;
import org.apache.log4j.Logger;
-import azkaban.actions.ExecuteFlowAction;
+import azkaban.trigger.builtin.ExecuteFlowAction;
import azkaban.utils.Props;
import azkaban.utils.Utils;
diff --git a/src/java/azkaban/trigger/builtin/CreateTriggerAction.java b/src/java/azkaban/trigger/builtin/CreateTriggerAction.java
new file mode 100644
index 0000000..e231023
--- /dev/null
+++ b/src/java/azkaban/trigger/builtin/CreateTriggerAction.java
@@ -0,0 +1,64 @@
+package azkaban.trigger.builtin;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.triggerapp.TriggerRunnerManager;
+
+public class CreateTriggerAction implements TriggerAction {
+
+ public static final String type = "CreateTriggerAction";
+ private static TriggerRunnerManager triggerRunnerManager;
+ private Trigger trigger;
+
+ public CreateTriggerAction(Trigger trigger) {
+ this.trigger = trigger;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ public static void setTriggerRunnerManager(TriggerRunnerManager trm) {
+ triggerRunnerManager = trm;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static CreateTriggerAction createFromJson(Object obj) throws Exception {
+ Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+ if(!jsonObj.get("type").equals(type)) {
+ throw new Exception("Cannot create action of " + type + " from " + jsonObj.get("type"));
+ }
+ Trigger trigger = Trigger.fromJson(jsonObj.get("trigger"));
+ return new CreateTriggerAction(trigger);
+ }
+
+ @Override
+ public CreateTriggerAction fromJson(Object obj) throws Exception {
+ // TODO Auto-generated method stub
+ return createFromJson(obj);
+ }
+
+ @Override
+ public Object toJson() {
+ Map<String, Object> jsonObj = new HashMap<String, Object>();
+ jsonObj.put("type", type);
+ jsonObj.put("trigger", trigger.toJson());
+
+ return jsonObj;
+ }
+
+ @Override
+ public void doAction() throws Exception {
+ triggerRunnerManager.insertTrigger(trigger);
+ }
+
+ @Override
+ public String getDescription() {
+ return "create another: " + trigger.getDescription();
+ }
+
+}
diff --git a/src/java/azkaban/trigger/builtin/ExecutableFlowStatusChecker.java b/src/java/azkaban/trigger/builtin/ExecutableFlowStatusChecker.java
new file mode 100644
index 0000000..81553f9
--- /dev/null
+++ b/src/java/azkaban/trigger/builtin/ExecutableFlowStatusChecker.java
@@ -0,0 +1,104 @@
+package azkaban.trigger.builtin;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
+import azkaban.trigger.ConditionChecker;
+
+public class ExecutableFlowStatusChecker implements ConditionChecker{
+ public static final String type = "ExecutableFlowStatusChecker";
+ private static Logger logger = Logger.getLogger(ExecutableFlowStatusChecker.class);
+ private int execId;
+ private Status status;
+ private String id;
+ private static ExecutorManager executorManager;
+
+ public ExecutableFlowStatusChecker(int execId, Status status, String id) {
+ this.execId = execId;
+ this.status = status;
+ this.id = id;
+ }
+
+ public static void setExecutorManager(ExecutorManager em) {
+ executorManager = em;
+ }
+
+ @Override
+ public Object eval() {
+ ExecutableFlow exflow;
+ try {
+ exflow = executorManager.fetchExecutableFlow(execId);
+ } catch (ExecutorManagerException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ logger.error("Failed to get executable flow status.");
+ return Boolean.FALSE;
+ }
+ Status flowStatus = exflow.getStatus();
+ return flowStatus.equals(status);
+ }
+
+ @Override
+ public Object getNum() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public String getType() {
+ // TODO Auto-generated method stub
+ return type;
+ }
+
+ @Override
+ public ExecutableFlowStatusChecker fromJson(Object obj) throws Exception {
+ return createFromJson(obj);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static ExecutableFlowStatusChecker createFromJson(Object obj) throws Exception {
+ Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+ if(!jsonObj.get("type").equals(type)) {
+ throw new Exception("Cannot create checker of " + type + " from " + jsonObj.get("type"));
+ }
+ String id = (String) jsonObj.get("id");
+ int execId = Integer.valueOf((String) jsonObj.get("execId"));
+ Status status = Status.valueOf((String) jsonObj.get("status"));
+ return new ExecutableFlowStatusChecker(execId, status, id);
+ }
+
+ @Override
+ public Object toJson() {
+ Map<String, Object> jsonObj = new HashMap<String, Object>();
+ jsonObj.put("type", type);
+ jsonObj.put("execId", String.valueOf(execId));
+ jsonObj.put("status", status.toString());
+ jsonObj.put("id", id);
+ return jsonObj;
+ }
+
+ @Override
+ public void stopChecker() {
+ // TODO Auto-generated method stub
+
+ }
+
+
+}
diff --git a/src/java/azkaban/trigger/CheckerTypeLoader.java b/src/java/azkaban/trigger/CheckerTypeLoader.java
index c64a66a..74f9ddf 100644
--- a/src/java/azkaban/trigger/CheckerTypeLoader.java
+++ b/src/java/azkaban/trigger/CheckerTypeLoader.java
@@ -14,7 +14,7 @@ import java.util.Set;
import org.apache.log4j.Logger;
-import azkaban.scheduler.BasicTimeChecker;
+import azkaban.trigger.builtin.BasicTimeChecker;
import azkaban.utils.Props;
import azkaban.utils.Utils;
diff --git a/src/java/azkaban/trigger/Condition.java b/src/java/azkaban/trigger/Condition.java
index 5d809d5..fcc1fa0 100644
--- a/src/java/azkaban/trigger/Condition.java
+++ b/src/java/azkaban/trigger/Condition.java
@@ -113,7 +113,7 @@ public class Condition {
} catch(Exception e) {
e.printStackTrace();
logger.error("Failed to recreate condition from json.", e);
- return null;
+ throw new Exception("Failed to recreate condition from json.", e);
}
return cond;
diff --git a/src/java/azkaban/trigger/ConditionChecker.java b/src/java/azkaban/trigger/ConditionChecker.java
index 8ce3c46..920eddf 100644
--- a/src/java/azkaban/trigger/ConditionChecker.java
+++ b/src/java/azkaban/trigger/ConditionChecker.java
@@ -18,5 +18,5 @@ public interface ConditionChecker {
Object toJson();
void stopChecker();
-
+
}
diff --git a/src/java/azkaban/trigger/JdbcTriggerLoader.java b/src/java/azkaban/trigger/JdbcTriggerLoader.java
index b5bde55..37f8131 100644
--- a/src/java/azkaban/trigger/JdbcTriggerLoader.java
+++ b/src/java/azkaban/trigger/JdbcTriggerLoader.java
@@ -43,11 +43,14 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
private static final String triggerTblName = "triggers";
+ private static final String GET_UPDATED_TRIGGERS =
+ "SELECT trigger_id, trigger_source, modify_time, enc_type, data FROM " + triggerTblName + " WHERE modify_time>=?";
+
private static String GET_ALL_TRIGGERS =
"SELECT trigger_id, trigger_source, modify_time, enc_type, data FROM " + triggerTblName;
private static String GET_TRIGGER =
- "SELECT trigger_source, modify_time, enc_type, data FROM " + triggerTblName + " WHERE trigger_id=?";
+ "SELECT trigger_id, trigger_source, modify_time, enc_type, data FROM " + triggerTblName + " WHERE trigger_id=?";
private static String ADD_TRIGGER =
"INSERT INTO " + triggerTblName + " ( modify_time) values (?)";
@@ -71,6 +74,31 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
}
@Override
+ public List<Trigger> getUpdatedTriggers(long lastUpdateTime) throws TriggerManagerException {
+ logger.info("Loading triggers changed since " + new DateTime(lastUpdateTime).toString());
+ Connection connection = getConnection();
+
+ QueryRunner runner = new QueryRunner();
+ ResultSetHandler<List<Trigger>> handler = new TriggerResultHandler();
+
+ List<Trigger> triggers;
+
+ try {
+ triggers = runner.query(connection, GET_UPDATED_TRIGGERS, handler, lastUpdateTime);
+ } catch (SQLException e) {
+ logger.error(GET_ALL_TRIGGERS + " failed.");
+
+ throw new TriggerManagerException("Loading triggers from db failed. ", e);
+ } finally {
+ DbUtils.closeQuietly(connection);
+ }
+
+ logger.info("Loaded " + triggers.size() + " triggers.");
+
+ return triggers;
+ }
+
+ @Override
public List<Trigger> loadTriggers() throws TriggerManagerException {
logger.info("Loading all triggers from db.");
Connection connection = getConnection();
@@ -155,6 +183,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
logger.info("Updating trigger " + t.toString() + " into db.");
Connection connection = getConnection();
try{
+ t.setLastModifyTime(DateTime.now());
updateTrigger(connection, t, defaultEncodingType);
}
catch(Exception e) {
@@ -189,7 +218,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
int updates = runner.update( connection,
UPDATE_TRIGGER,
t.getSource(),
- DateTime.now().getMillis(),
+ t.getLastModifyTime().getMillis(),
encType.getNumVal(),
data,
t.getTriggerId());
@@ -282,5 +311,33 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
return connection;
}
+ @Override
+ public Trigger loadTrigger(int triggerId) throws TriggerManagerException {
+ logger.info("Loading trigger " + triggerId + " from db.");
+ Connection connection = getConnection();
+
+ QueryRunner runner = new QueryRunner();
+ ResultSetHandler<List<Trigger>> handler = new TriggerResultHandler();
+
+ List<Trigger> triggers;
+
+ try {
+ triggers = runner.query(connection, GET_TRIGGER, handler, triggerId);
+ } catch (SQLException e) {
+ logger.error(GET_TRIGGER + " failed.");
+ throw new TriggerManagerException("Loading trigger from db failed. ", e);
+ } finally {
+ DbUtils.closeQuietly(connection);
+ }
+
+ if(triggers.size() == 0) {
+ logger.error("Failed to load trigger " + triggerId);
+ throw new TriggerManagerException("Failed to load trigger " + triggerId);
+ }
+
+ return triggers.get(0);
+ }
+
+
}
src/java/azkaban/trigger/Trigger.java 111(+107 -4)
diff --git a/src/java/azkaban/trigger/Trigger.java b/src/java/azkaban/trigger/Trigger.java
index 673facc..962cf7a 100644
--- a/src/java/azkaban/trigger/Trigger.java
+++ b/src/java/azkaban/trigger/Trigger.java
@@ -22,6 +22,9 @@ public class Trigger {
private Condition triggerCondition;
private Condition expireCondition;
private List<TriggerAction> actions;
+ private List<TriggerAction> expireActions;
+
+ private Map<String, Object> info = new HashMap<String, Object>();
private static ActionTypeLoader actionTypeLoader;
@@ -61,6 +64,58 @@ public class Trigger {
return actions;
}
+ public List<TriggerAction> getExpireActions() {
+ return expireActions;
+ }
+
+ public Map<String, Object> getInfo() {
+ return info;
+ }
+
+ public void setInfo(Map<String, Object> info) {
+ this.info = info;
+ }
+
+ public Trigger(
+ DateTime lastModifyTime,
+ DateTime submitTime,
+ String submitUser,
+ String source,
+ Condition triggerCondition,
+ Condition expireCondition,
+ List<TriggerAction> actions,
+ List<TriggerAction> expireActions,
+ Map<String, Object> info) {
+ this.lastModifyTime = lastModifyTime;
+ this.submitTime = submitTime;
+ this.submitUser = submitUser;
+ this.source = source;
+ this.triggerCondition = triggerCondition;
+ this.expireCondition = expireCondition;
+ this.actions = actions;
+ this.expireActions = expireActions;
+ this.info = info;
+ }
+
+ public Trigger(
+ DateTime lastModifyTime,
+ DateTime submitTime,
+ String submitUser,
+ String source,
+ Condition triggerCondition,
+ Condition expireCondition,
+ List<TriggerAction> actions,
+ List<TriggerAction> expireActions) {
+ this.lastModifyTime = lastModifyTime;
+ this.submitTime = submitTime;
+ this.submitUser = submitUser;
+ this.source = source;
+ this.triggerCondition = triggerCondition;
+ this.expireCondition = expireCondition;
+ this.actions = actions;
+ this.expireActions = expireActions;
+ }
+
public Trigger(
DateTime lastModifyTime,
DateTime submitTime,
@@ -76,6 +131,7 @@ public class Trigger {
this.triggerCondition = triggerCondition;
this.expireCondition = expireCondition;
this.actions = actions;
+ this.expireActions = new ArrayList<TriggerAction>();
}
public Trigger(
@@ -86,7 +142,31 @@ public class Trigger {
String source,
Condition triggerCondition,
Condition expireCondition,
- List<TriggerAction> actions) {
+ List<TriggerAction> actions,
+ List<TriggerAction> expireActions,
+ Map<String, Object> info) {
+ this.triggerId = triggerId;
+ this.lastModifyTime = lastModifyTime;
+ this.submitTime = submitTime;
+ this.submitUser = submitUser;
+ this.source = source;
+ this.triggerCondition = triggerCondition;
+ this.expireCondition = expireCondition;
+ this.actions = actions;
+ this.expireActions = expireActions;
+ this.info = info;
+ }
+
+ public Trigger(
+ int triggerId,
+ DateTime lastModifyTime,
+ DateTime submitTime,
+ String submitUser,
+ String source,
+ Condition triggerCondition,
+ Condition expireCondition,
+ List<TriggerAction> actions,
+ List<TriggerAction> expireActions) {
this.triggerId = triggerId;
this.lastModifyTime = lastModifyTime;
this.submitTime = submitTime;
@@ -95,6 +175,7 @@ public class Trigger {
this.triggerCondition = triggerCondition;
this.expireCondition = expireCondition;
this.actions = actions;
+ this.expireActions = expireActions;
}
public static synchronized void setActionTypeLoader(ActionTypeLoader loader) {
@@ -124,6 +205,10 @@ public class Trigger {
public DateTime getLastModifyTime() {
return lastModifyTime;
}
+
+ public void setLastModifyTime(DateTime lastModifyTime) {
+ this.lastModifyTime = lastModifyTime;
+ }
public void setTriggerId(int id) {
this.triggerId = id;
@@ -165,6 +250,15 @@ public class Trigger {
actionsJson.add(oneActionJson);
}
jsonObj.put("actions", actionsJson);
+ List<Object> expireActionsJson = new ArrayList<Object>();
+ for(TriggerAction expireAction : expireActions) {
+ Map<String, Object> oneExpireActionJson = new HashMap<String, Object>();
+ oneExpireActionJson.put("type", expireAction.getType());
+ oneExpireActionJson.put("actionJson", expireAction.toJson());
+ expireActionsJson.add(oneExpireActionJson);
+ }
+ jsonObj.put("expireActions", expireActionsJson);
+
jsonObj.put("resetOnTrigger", String.valueOf(resetOnTrigger));
jsonObj.put("resetOnExpire", String.valueOf(resetOnExpire));
jsonObj.put("submitUser", submitUser);
@@ -173,7 +267,7 @@ public class Trigger {
jsonObj.put("lastModifyTime", String.valueOf(lastModifyTime.getMillis()));
jsonObj.put("triggerId", String.valueOf(triggerId));
jsonObj.put("status", status.toString());
-
+ jsonObj.put("info", info);
return jsonObj;
}
@@ -203,6 +297,14 @@ public class Trigger {
TriggerAction act = actionTypeLoader.createActionFromJson(type, oneActionJson.get("actionJson"));
actions.add(act);
}
+ List<TriggerAction> expireActions = new ArrayList<TriggerAction>();
+ List<Object> expireActionsJson = (List<Object>) jsonObj.get("expireActions");
+ for(Object expireActObj : expireActionsJson) {
+ Map<String, Object> oneExpireActionJson = (HashMap<String, Object>) expireActObj;
+ String type = (String) oneExpireActionJson.get("type");
+ TriggerAction expireAct = actionTypeLoader.createActionFromJson(type, oneExpireActionJson.get("actionJson"));
+ expireActions.add(expireAct);
+ }
boolean resetOnTrigger = Boolean.valueOf((String) jsonObj.get("resetOnTrigger"));
boolean resetOnExpire = Boolean.valueOf((String) jsonObj.get("resetOnExpire"));
String submitUser = (String) jsonObj.get("submitUser");
@@ -213,14 +315,15 @@ public class Trigger {
DateTime lastModifyTime = new DateTime(lastModifyTimeMillis);
int triggerId = Integer.valueOf((String) jsonObj.get("triggerId"));
TriggerStatus status = TriggerStatus.valueOf((String)jsonObj.get("status"));
- trigger = new Trigger(triggerId, lastModifyTime, submitTime, submitUser, source, triggerCond, expireCond, actions);
+ Map<String, Object> info = (Map<String, Object>) jsonObj.get("info");
+ trigger = new Trigger(triggerId, lastModifyTime, submitTime, submitUser, source, triggerCond, expireCond, actions, expireActions, info);
trigger.setResetOnExpire(resetOnExpire);
trigger.setResetOnTrigger(resetOnTrigger);
trigger.setStatus(status);
}catch(Exception e) {
e.printStackTrace();
logger.error("Failed to decode the trigger.", e);
- return null;
+ throw new Exception("Failed to decode the trigger.", e);
}
return trigger;
diff --git a/src/java/azkaban/trigger/TriggerAction.java b/src/java/azkaban/trigger/TriggerAction.java
index f6adf5d..85b6ad8 100644
--- a/src/java/azkaban/trigger/TriggerAction.java
+++ b/src/java/azkaban/trigger/TriggerAction.java
@@ -4,7 +4,7 @@ public interface TriggerAction {
String getType();
- TriggerAction fromJson(Object obj);
+ TriggerAction fromJson(Object obj) throws Exception;
Object toJson();
diff --git a/src/java/azkaban/trigger/TriggerAgent.java b/src/java/azkaban/trigger/TriggerAgent.java
index f86d289..453f49d 100644
--- a/src/java/azkaban/trigger/TriggerAgent.java
+++ b/src/java/azkaban/trigger/TriggerAgent.java
@@ -1,17 +1,12 @@
package azkaban.trigger;
-import java.io.File;
-
import azkaban.utils.Props;
public interface TriggerAgent {
- public void loadTriggerFromProps(Props props) throws Exception;
+ void loadTriggerFromProps(Props props) throws Exception;
- public String getTriggerSource();
+ String getTriggerSource();
- void load();
-
-// // update local copy
-// public void updateLocal();
+ void start() throws Exception;
}
diff --git a/src/java/azkaban/trigger/TriggerLoader.java b/src/java/azkaban/trigger/TriggerLoader.java
index cf37634..c3e604b 100644
--- a/src/java/azkaban/trigger/TriggerLoader.java
+++ b/src/java/azkaban/trigger/TriggerLoader.java
@@ -13,6 +13,10 @@ public interface TriggerLoader {
public void updateTrigger(Trigger t) throws TriggerManagerException;
- public List<Trigger> loadTriggers() throws TriggerManagerException;
+ public List<Trigger> loadTriggers() throws TriggerManagerException;
+
+ public Trigger loadTrigger(int triggerId) throws TriggerManagerException;
+
+ public List<Trigger> getUpdatedTriggers(long lastUpdateTime) throws TriggerManagerException;
}
src/java/azkaban/trigger/TriggerManager.java 574(+374 -200)
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index 11311dd..460a75f 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -1,66 +1,112 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
package azkaban.trigger;
import java.io.File;
import java.io.FileFilter;
+import java.io.IOException;
+import java.lang.Thread.State;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.BasicResponseHandler;
+import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import azkaban.triggerapp.TriggerConnectorParams;
import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
import azkaban.utils.Props;
+/**
+ * Executor manager used to manage the client side job.
+ *
+ */
public class TriggerManager {
private static Logger logger = Logger.getLogger(TriggerManager.class);
-
+
private static final String TRIGGER_SUFFIX = ".trigger";
- private static Map<Integer, Trigger> triggerIdMap = new HashMap<Integer, Trigger>();
+ private TriggerLoader triggerLoader;
+ private CheckerTypeLoader checkerTypeLoader;
+ private ActionTypeLoader actionTypeLoader;
- private CheckerTypeLoader checkerLoader;
- private ActionTypeLoader actionLoader;
+ private String triggerServerHost;
+ private int triggerServerPort;
- private static TriggerLoader triggerLoader;
+ private Set<Pair<String, Integer>> triggerServers = new HashSet<Pair<String,Integer>>();
- private static TriggerScannerThread scannerThread;
+ private ConcurrentHashMap<Integer, Trigger> triggerIdMap = new ConcurrentHashMap<Integer, Trigger>();
private Map<String, TriggerAgent> triggerAgents = new HashMap<String, TriggerAgent>();
+
+ private TriggerManagerUpdaterThread triggerManagingThread;
- public TriggerManager(Props props, TriggerLoader triggerLoader) {
-
- TriggerManager.triggerLoader = triggerLoader;
- checkerLoader = new CheckerTypeLoader();
- actionLoader = new ActionTypeLoader();
+ private long lastThreadCheckTime = -1;
+
+ private long lastUpdateTime = -1;
+
+ public TriggerManager(Props props, TriggerLoader loader) throws TriggerManagerException {
+ this.triggerLoader = loader;
+ this.checkerTypeLoader = new CheckerTypeLoader();
+ this.actionTypeLoader = new ActionTypeLoader();
+
+ triggerServerHost = props.getString("trigger.server.host", "localhost");
+ triggerServerPort = props.getInt("trigger.server.port");
+
+ triggerManagingThread = new TriggerManagerUpdaterThread();
- // load plugins
try{
- checkerLoader.init(props);
- actionLoader.init(props);
+ checkerTypeLoader.init(props);
+ actionTypeLoader.init(props);
} catch(Exception e) {
e.printStackTrace();
logger.error(e.getMessage());
}
- Condition.setCheckerLoader(checkerLoader);
- Trigger.setActionTypeLoader(actionLoader);
-
- checkerLoader = new CheckerTypeLoader();
- actionLoader = new ActionTypeLoader();
-
- long scannerInterval = props.getLong("trigger.scan.interval", TriggerScannerThread.DEFAULT_SCAN_INTERVAL_MS);
- scannerThread = new TriggerScannerThread(scannerInterval);
- scannerThread.setName("TriggerScannerThread");
+ Condition.setCheckerLoader(checkerTypeLoader);
+ Trigger.setActionTypeLoader(actionTypeLoader);
+ triggerServers.add(new Pair<String, Integer>(triggerServerHost, triggerServerPort));
+
+ }
+
+ public void start() throws Exception {
+ loadTriggers();
+ for(TriggerAgent agent : triggerAgents.values()) {
+ agent.start();
+ }
+ triggerManagingThread.start();
}
private static class SuffixFilter implements FileFilter {
private String suffix;
-
public SuffixFilter(String suffix) {
this.suffix = suffix;
}
@@ -68,257 +114,385 @@ public class TriggerManager {
@Override
public boolean accept(File pathname) {
String name = pathname.getName();
-
return pathname.isFile() && !pathname.isHidden() && name.length() > suffix.length() && name.endsWith(suffix);
}
}
- @SuppressWarnings("unchecked")
- public void loadTriggerFromDir(File baseDir, Props props) throws Exception {
- File[] triggerFiles = baseDir.listFiles(new SuffixFilter(TRIGGER_SUFFIX));
-
- for(File triggerFile : triggerFiles) {
- Props triggerProps = new Props(props, triggerFile);
- String triggerType = triggerProps.getString("trigger.type");
- TriggerAgent agent = triggerAgents.get(triggerType);
- if(agent != null) {
- agent.loadTriggerFromProps(triggerProps);
- } else {
- throw new Exception("Trigger " + triggerType + " is not supported.");
- }
- }
+ public String getTriggerServerHost() {
+ return triggerServerHost;
}
- public void addTriggerAgent(String triggerSource, TriggerAgent agent) throws TriggerManagerException {
- if(triggerAgents.containsKey(triggerSource)) {
- throw new TriggerManagerException("Trigger agent " + triggerSource + " already exists!" );
- }
- this.triggerAgents.put(triggerSource, agent);
+ public int getTriggerServerPort() {
+ return triggerServerPort;
}
- public void start() {
-
- try{
- // expect loader to return valid triggers
- List<Trigger> triggers = triggerLoader.loadTriggers();
- for(Trigger t : triggers) {
- scannerThread.addTrigger(t);
- triggerIdMap.put(t.getTriggerId(), t);
- }
- }catch(Exception e) {
- e.printStackTrace();
- logger.error(e.getMessage());
- }
-
- for(TriggerAgent agent : triggerAgents.values()) {
- agent.load();
- }
-
- scannerThread.start();
+ public State getUpdaterThreadState() {
+ return triggerManagingThread.getState();
}
- public CheckerTypeLoader getCheckerLoader() {
- return checkerLoader;
+ public boolean isThreadActive() {
+ return triggerManagingThread.isAlive();
}
-
- public ActionTypeLoader getActionLoader() {
- return actionLoader;
+
+ public long getLastThreadCheckTime() {
+ return lastThreadCheckTime;
}
-
- public synchronized void insertTrigger(Trigger t) throws TriggerManagerException {
-
- triggerLoader.addTrigger(t);
- triggerIdMap.put(t.getTriggerId(), t);
- scannerThread.addTrigger(t);
+
+ public Set<String> getPrimaryServerHosts() {
+ // Only one for now. More probably later.
+ HashSet<String> ports = new HashSet<String>();
+ ports.add(triggerServerHost + ":" + triggerServerPort);
+ return ports;
}
- public synchronized void removeTrigger(int id) throws TriggerManagerException {
- Trigger t = triggerIdMap.get(id);
- if(t != null) {
- removeTrigger(triggerIdMap.get(id));
+ private void loadTriggers() throws TriggerManagerException {
+ List<Trigger> triggerList = triggerLoader.loadTriggers();
+ for(Trigger t : triggerList) {
+ triggerIdMap.put(t.getTriggerId(), t);
}
}
- //TODO: update corresponding agents
- public synchronized void updateTrigger(Trigger t) throws TriggerManagerException {
- if(!triggerIdMap.containsKey(t.getTriggerId())) {
- throw new TriggerManagerException("The trigger to update doesn't exist!");
+ public Trigger getTrigger(int triggerId) {
+ return triggerIdMap.get(triggerId);
+ }
+
+ public void removeTrigger(Trigger t, String userId) throws TriggerManagerException {
+ synchronized(t) {
+ callTriggerServer(t, TriggerConnectorParams.REMOVE_TRIGGER_ACTION, userId);
}
-
- scannerThread.deleteTrigger(t);
- scannerThread.addTrigger(t);
- triggerIdMap.put(t.getTriggerId(), t);
-
- triggerLoader.updateTrigger(t);
}
+
- //TODO: update corresponding agents
- public synchronized void removeTrigger(Trigger t) throws TriggerManagerException {
- t.stopCheckers();
- triggerLoader.removeTrigger(t);
- scannerThread.deleteTrigger(t);
- triggerIdMap.remove(t.getTriggerId());
+ public void updateTrigger(Trigger t, String userId) throws TriggerManagerException {
+ synchronized(t) {
+ try {
+ callTriggerServer(t, TriggerConnectorParams.UPDATE_TRIGGER_ACTION, userId);
+ } catch(TriggerManagerException e) {
+ throw new TriggerManagerException(e);
+ }
+ }
}
- public List<Trigger> getTriggers() {
- return new ArrayList<Trigger>(triggerIdMap.values());
+// public void getUpdatedTriggers() throws TriggerManagerException {
+// try {
+// callTriggerServer(triggerServerHost, triggerServerPort, TriggerConnectorParams.GET_UPDATE_ACTION, null, "azkaban", (Pair<String,String>[])null);
+// } catch(IOException e) {
+// throw new TriggerManagerException(e);
+// }
+// }
+
+ public String insertTrigger(Trigger t, String userId) throws TriggerManagerException {
+ synchronized(t) {
+ String message = null;
+ logger.info("Inserting trigger into system. " );
+ // The trigger id is set by the loader. So it's unavailable until after this call.
+ triggerLoader.addTrigger(t);
+ try {
+ callTriggerServer(t, TriggerConnectorParams.INSERT_TRIGGER_ACTION, userId);
+ triggerIdMap.put(t.getTriggerId(), t);
+
+ message += "Trigger inserted successfully with trigger id " + t.getTriggerId();
+ }
+ catch (TriggerManagerException e) {
+ throw e;
+ }
+ return message;
+ }
}
- public Map<String, Class<? extends ConditionChecker>> getSupportedCheckers() {
- return checkerLoader.getSupportedCheckers();
+ private Map<String, Object> callTriggerServer(Trigger t, String action, String user) throws TriggerManagerException {
+ try {
+ Map<String, Object> info = t.getInfo();
+ return callTriggerServer(triggerServerHost, triggerServerPort, action, t.getTriggerId(), null, (Pair<String,String>[])null);
+ } catch (IOException e) {
+ throw new TriggerManagerException(e);
+ }
}
-
-// private void updateAgent(Trigger t) {
-// TriggerAgent agent = triggerAgents.get(t.getSource());
-// if(agent != null) {
-// agent.updateLocal(t);
-// }
-//
-// }
- //trigger scanner thread
- public class TriggerScannerThread extends Thread {
-
- //public static final long DEFAULT_SCAN_INTERVAL_MS = 300000;
- public static final long DEFAULT_SCAN_INTERVAL_MS = 60000;
+ private Map<String, Object> callTriggerServer(String host, int port, String action, Integer triggerId, String user, Pair<String,String> ... params) throws IOException {
+ URIBuilder builder = new URIBuilder();
+ builder.setScheme("http")
+ .setHost(host)
+ .setPort(port)
+ .setPath("/trigger");
+
+ builder.setParameter(TriggerConnectorParams.ACTION_PARAM, action);
- private final BlockingQueue<Trigger> triggers;
- private AtomicBoolean stillAlive = new AtomicBoolean(true);
- private long lastCheckTime = -1;
- private final long scanInterval;
+ if (triggerId != null) {
+ builder.setParameter(TriggerConnectorParams.TRIGGER_ID_PARAM,String.valueOf(triggerId));
+ }
- // Five minute minimum intervals
+ if (user != null) {
+ builder.setParameter(TriggerConnectorParams.USER_PARAM, user);
+ }
- public TriggerScannerThread(){
- triggers = new LinkedBlockingDeque<Trigger>();
- this.scanInterval = DEFAULT_SCAN_INTERVAL_MS;
+ if (params != null) {
+ for (Pair<String, String> pair: params) {
+ builder.setParameter(pair.getFirst(), pair.getSecond());
+ }
}
- public TriggerScannerThread(long interval){
- triggers = new LinkedBlockingDeque<Trigger>();
- this.scanInterval = interval;
+ URI uri = null;
+ try {
+ uri = builder.build();
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
}
- public void shutdown() {
- logger.error("Shutting down trigger manager thread " + this.getName());
- stillAlive.set(false);
- this.interrupt();
+ ResponseHandler<String> responseHandler = new BasicResponseHandler();
+
+ HttpClient httpclient = new DefaultHttpClient();
+ HttpGet httpget = new HttpGet(uri);
+ String response = null;
+ try {
+ response = httpclient.execute(httpget, responseHandler);
+ } catch (IOException e) {
+ throw e;
+ }
+ finally {
+ httpclient.getConnectionManager().shutdown();
}
- public synchronized List<Trigger> getTriggers() {
- return new ArrayList<Trigger>(triggers);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
+ String error = (String)jsonResponse.get(TriggerConnectorParams.RESPONSE_ERROR);
+ if (error != null) {
+ throw new IOException(error);
}
- public synchronized void addTrigger(Trigger t) {
- triggers.add(t);
+ return jsonResponse;
+ }
+
+ public Map<String, Object> callTriggerServerJMX(String hostPort, String action, String mBean) throws IOException {
+ URIBuilder builder = new URIBuilder();
+
+ String[] hostPortSplit = hostPort.split(":");
+ builder.setScheme("http")
+ .setHost(hostPortSplit[0])
+ .setPort(Integer.parseInt(hostPortSplit[1]))
+ .setPath("/jmx");
+
+ builder.setParameter(action, "");
+ if (mBean != null) {
+ builder.setParameter(TriggerConnectorParams.JMX_MBEAN, mBean);
+ }
+
+ URI uri = null;
+ try {
+ uri = builder.build();
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+
+ ResponseHandler<String> responseHandler = new BasicResponseHandler();
+
+ HttpClient httpclient = new DefaultHttpClient();
+ HttpGet httpget = new HttpGet(uri);
+ String response = null;
+ try {
+ response = httpclient.execute(httpget, responseHandler);
+ } catch (IOException e) {
+ throw e;
+ }
+ finally {
+ httpclient.getConnectionManager().shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
+ String error = (String)jsonResponse.get(TriggerConnectorParams.RESPONSE_ERROR);
+ if (error != null) {
+ throw new IOException(error);
+ }
+ return jsonResponse;
+ }
+
+ public void shutdown() {
+ triggerManagingThread.shutdown();
+ }
+
+ private class TriggerManagerUpdaterThread extends Thread {
+ private boolean shutdown = false;
+
+ public TriggerManagerUpdaterThread() {
+ this.setName("TriggerManagingThread");
}
+
+ private int waitTimeIdleMs = 2000;
+ private int waitTimeMs = 500;
- public synchronized void deleteTrigger(Trigger t) {
- triggers.remove(t);
+ private void shutdown() {
+ shutdown = true;
}
+ @SuppressWarnings("unchecked")
public void run() {
- while(stillAlive.get()) {
- synchronized (this) {
+ while(!shutdown) {
+ try {
+ lastThreadCheckTime = System.currentTimeMillis();
+
+ Pair<String, Integer> triggerServer = (Pair<String, Integer>) triggerServers.toArray()[0];
+
+ Pair<String, String> updateTimeParam = new Pair<String, String>("lastUpdateTime", String.valueOf(lastUpdateTime));
+ Map<String, Object> results = null;
try{
- lastCheckTime = System.currentTimeMillis();
-
- try{
- checkAllTriggers();
- } catch(Exception e) {
- e.printStackTrace();
- logger.error(e.getMessage());
- } catch(Throwable t) {
- t.printStackTrace();
- logger.error(t.getMessage());
+ results = callTriggerServer(triggerServer.getFirst(), triggerServer.getSecond(), TriggerConnectorParams.GET_UPDATE_ACTION, null, "azkaban", updateTimeParam);
+// lastUpdateTime = (Long) results.get(TriggerConnectorParams.RESPONSE_UPDATETIME);
+ List<Integer> updates = (List<Integer>) results.get("updates");
+ for(Integer update : updates) {
+ Trigger t = triggerLoader.loadTrigger(update);
+ lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime().getMillis());
+ triggerIdMap.put(update, t);
}
+ } catch (Exception e) {
+ logger.error(e);
- long timeRemaining = scanInterval - (System.currentTimeMillis() - lastCheckTime);
- if(timeRemaining < 0) {
- logger.error("Trigger manager thread " + this.getName() + " is too busy!");
- } else {
- wait(timeRemaining);
- }
- } catch(InterruptedException e) {
- logger.info("Interrupted. Probably to shut down.");
}
+ synchronized(this) {
+ try {
+ if (triggerIdMap.size() > 0) {
+ this.wait(waitTimeMs);
+ }
+ else {
+ this.wait(waitTimeIdleMs);
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ catch (Exception e) {
+ logger.error(e);
}
}
}
+ }
+
+ private static class ConnectionInfo {
+ private String host;
+ private int port;
+
+ public ConnectionInfo(String host, int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ @SuppressWarnings("unused")
+ private ConnectionInfo getOuterType() {
+ return ConnectionInfo.this;
+ }
- private void checkAllTriggers() throws TriggerManagerException {
- for(Trigger t : triggers) {
- if(t.getStatus().equals(TriggerStatus.READY)) {
- if(t.triggerConditionMet()) {
- onTriggerTrigger(t);
- } else if (t.expireConditionMet()) {
- onTriggerExpire(t);
- }
- }
- }
+ public boolean isEqual(String host, int port) {
+ return this.port == port && this.host.equals(host);
}
- private void onTriggerTrigger(Trigger t) throws TriggerManagerException {
- List<TriggerAction> actions = t.getTriggerActions();
- for(TriggerAction action : actions) {
- try {
- action.doAction();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- throw new TriggerManagerException("action failed to execute", e);
- }
- }
- if(t.isResetOnTrigger()) {
- t.resetTriggerConditions();
- t.resetExpireCondition();
- updateTrigger(t);
- } else {
- t.setStatus(TriggerStatus.EXPIRED);
- }
-// updateAgent(t);
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
}
- private void onTriggerExpire(Trigger t) throws TriggerManagerException {
- if(t.isResetOnExpire()) {
- t.resetTriggerConditions();
- t.resetExpireCondition();
- updateTrigger(t);
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((host == null) ? 0 : host.hashCode());
+ result = prime * result + port;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ ConnectionInfo other = (ConnectionInfo) obj;
+ if (host == null) {
+ if (other.host != null)
+ return false;
+ } else if (!host.equals(other.host))
+ return false;
+ if (port != other.port)
+ return false;
+ return true;
+ }
+ }
+
+ public void loadTriggerFromDir(File baseDir, Props props) throws Exception {
+ File[] triggerFiles = baseDir.listFiles(new SuffixFilter(TRIGGER_SUFFIX));
+
+ for(File triggerFile : triggerFiles) {
+ Props triggerProps = new Props(props, triggerFile);
+ String triggerType = triggerProps.getString("trigger.type");
+ TriggerAgent agent = triggerAgents.get(triggerType);
+ if(agent != null) {
+ agent.loadTriggerFromProps(triggerProps);
} else {
- t.setStatus(TriggerStatus.EXPIRED);
+ throw new Exception("Trigger " + triggerType + " is not supported.");
}
-// updateAgent(t);
}
}
- public synchronized Trigger getTrigger(int triggerId) {
- return triggerIdMap.get(triggerId);
+ public List<Trigger> getTriggers() {
+ return new ArrayList<Trigger>(triggerIdMap.values());
}
public void expireTrigger(int triggerId) {
- Trigger t = getTrigger(triggerId);
- t.setStatus(TriggerStatus.EXPIRED);
-// updateAgent(t);
+ // TODO Auto-generated method stub
+
+ }
+
+ public CheckerTypeLoader getCheckerLoader() {
+ return checkerTypeLoader;
+ }
+
+ public ActionTypeLoader getActionLoader() {
+ return actionTypeLoader;
+ }
+
+ public void addTriggerAgent(String triggerSource,
+ TriggerAgent agent) {
+ triggerAgents.put(triggerSource, agent);
}
public List<Trigger> getTriggers(String triggerSource) {
- List<Trigger> triggers = new ArrayList<Trigger>();
+ List<Trigger> results = new ArrayList<Trigger>();
for(Trigger t : triggerIdMap.values()) {
if(t.getSource().equals(triggerSource)) {
- triggers.add(t);
+ results.add(t);
}
}
- return triggers;
+ return results;
}
- public List<Trigger> getUpdatedTriggers(String triggerSource, long lastUpdateTime) {
+ public List<Trigger> getUpdatedTriggers(String triggerSource, long lastUpdateTime) throws TriggerManagerException {
+ getUpdatedTriggers();
List<Trigger> triggers = new ArrayList<Trigger>();
for(Trigger t : triggerIdMap.values()) {
- if(t.getSource().equals(triggerSource) && t.getLastModifyTime().getMillis() > lastUpdateTime) {
+ if(t.getSource().equals(triggerSource) && t.getLastModifyTime().getMillis() >= lastUpdateTime) {
triggers.add(t);
}
}
return triggers;
}
+ private void getUpdatedTriggers() throws TriggerManagerException {
+ List<Trigger> triggers = triggerLoader.getUpdatedTriggers(this.lastUpdateTime);
+ for(Trigger t : triggers) {
+ this.lastUpdateTime = Math.max(this.lastUpdateTime, t.getLastModifyTime().getMillis());
+ triggerIdMap.put(t.getTriggerId(), t);
+ }
+ }
+
+ public void removeTrigger(int scheduleId, String submitUser) throws TriggerManagerException {
+ removeTrigger(triggerIdMap.get(scheduleId), submitUser);
+ }
+
+
}
+
diff --git a/src/java/azkaban/trigger/TriggerManagerException.java b/src/java/azkaban/trigger/TriggerManagerException.java
index 5d30b39..c12a0e1 100644
--- a/src/java/azkaban/trigger/TriggerManagerException.java
+++ b/src/java/azkaban/trigger/TriggerManagerException.java
@@ -27,5 +27,9 @@ public class TriggerManagerException extends Exception{
public TriggerManagerException(String message, Throwable cause) {
super(message, cause);
}
+
+ public TriggerManagerException(Throwable e) {
+ super(e);
+ }
}
diff --git a/src/java/azkaban/trigger/TriggerStatus.java b/src/java/azkaban/trigger/TriggerStatus.java
index 3fcadf7..8d397bc 100644
--- a/src/java/azkaban/trigger/TriggerStatus.java
+++ b/src/java/azkaban/trigger/TriggerStatus.java
@@ -1,7 +1,7 @@
package azkaban.trigger;
public enum TriggerStatus {
- READY(10), PAUSED(20), EXPIRED(30);
+ READY(10), PAUSED(20), EXPIRED(30), PREPARING(40);
private int numVal;
@@ -21,6 +21,8 @@ public enum TriggerStatus {
return PAUSED;
case 30:
return EXPIRED;
+ case 40:
+ return PREPARING;
default:
return READY;
}
diff --git a/src/java/azkaban/triggerapp/AzkabanTriggerServer.java b/src/java/azkaban/triggerapp/AzkabanTriggerServer.java
new file mode 100644
index 0000000..c84198f
--- /dev/null
+++ b/src/java/azkaban/triggerapp/AzkabanTriggerServer.java
@@ -0,0 +1,481 @@
+package azkaban.triggerapp;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TimeZone;
+
+import javax.management.MBeanInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.log4j.Logger;
+import org.joda.time.DateTimeZone;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.bio.SocketConnector;
+import org.mortbay.jetty.security.SslSocketConnector;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.mortbay.thread.QueuedThreadPool;
+
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.JdbcExecutorLoader;
+import azkaban.jmx.JmxJettyServer;
+import azkaban.jmx.JmxTriggerRunnerManager;
+import azkaban.project.JdbcProjectLoader;
+import azkaban.project.ProjectManager;
+import azkaban.trigger.ActionTypeLoader;
+import azkaban.trigger.CheckerTypeLoader;
+import azkaban.trigger.JdbcTriggerLoader;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.builtin.BasicTimeChecker;
+import azkaban.trigger.builtin.CreateTriggerAction;
+import azkaban.trigger.builtin.ExecutableFlowStatusChecker;
+import azkaban.trigger.builtin.ExecuteFlowAction;
+import azkaban.utils.FileIOUtils;
+import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
+import azkaban.utils.Utils;
+import azkaban.webapp.AzkabanServer;
+import azkaban.webapp.servlet.AzkabanServletContextListener;
+
+public class AzkabanTriggerServer {
+ private static final Logger logger = Logger.getLogger(AzkabanTriggerServer.class);
+ private static final int MAX_FORM_CONTENT_SIZE = 10*1024*1024;
+
+ public static final String AZKABAN_HOME = "AZKABAN_HOME";
+ public static final String DEFAULT_CONF_PATH = "conf";
+ public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
+ public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
+ public static final String TRIGGER_PLUGIN_DIR = "trigger.plugin.dir";
+ public static final int DEFAULT_PORT_NUMBER = 22321;
+ public static final int DEFAULT_THREAD_NUMBER = 50;
+
+ private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
+
+ private static AzkabanTriggerServer app;
+
+ private TriggerLoader triggerLoader;
+ private TriggerRunnerManager triggerRunnerManager;
+ private ExecutorManager executorManager;
+ private ProjectManager projectManager;
+ private Props props;
+ private Server server;
+
+ private ArrayList<ObjectName> registeredMBeans = new ArrayList<ObjectName>();
+ private MBeanServer mbeanServer;
+
+ /**
+ * Constructor
+ *
+ * @throws Exception
+ */
+ public AzkabanTriggerServer(Props props) throws Exception {
+ this.props = props;
+
+ int portNumber = props.getInt("trigger.server.port", DEFAULT_PORT_NUMBER);
+ int maxThreads = props.getInt("trigger.server.maxThreads", DEFAULT_THREAD_NUMBER);
+
+ String hostname = props.getString("jetty.hostname", "localhost");
+ props.put("server.hostname", hostname);
+ props.put("server.port", portNumber);
+ props.put("server.useSSL", String.valueOf(props.getBoolean("jetty.use.ssl", true)));
+
+ server = new Server(portNumber);
+ QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
+ server.setThreadPool(httpThreadPool);
+
+ Context root = new Context(server, "/", Context.SESSIONS);
+ root.setMaxFormContentSize(MAX_FORM_CONTENT_SIZE);
+
+ root.addServlet(new ServletHolder(new TriggerServerServlet()), "/trigger");
+ root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
+ root.setAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY, this);
+
+ triggerLoader = createTriggerLoader(props);
+ projectManager = loadProjectManager(props);
+ executorManager = loadExecutorManager(props);
+ triggerRunnerManager = loadTriggerRunnerManager(props, triggerLoader);
+
+ String triggerPluginDir = props.getString("trigger.plugin.dir", "plugins/triggers");
+ loadBuiltinCheckersAndActions(this);
+ loadPluginCheckersAndActions(triggerPluginDir, this);
+
+ configureMBeanServer();
+
+ try {
+ triggerRunnerManager.start();
+ server.start();
+ }
+ catch (Exception e) {
+ logger.warn(e);
+ Utils.croak(e.getMessage(), 1);
+ }
+
+ logger.info("Azkaban Trigger Server started on port " + portNumber);
+ }
+
+
+
+
+ private TriggerRunnerManager loadTriggerRunnerManager(Props props, TriggerLoader triggerLoader) throws IOException {
+ logger.info("Loading trigger runner manager");
+ TriggerRunnerManager trm = new TriggerRunnerManager(props, triggerLoader);
+ trm.init();
+ return trm;
+ }
+
+ private ExecutorManager loadExecutorManager(Props props) throws Exception {
+ logger.info("Loading executor manager");
+ JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
+ ExecutorManager execManager = new ExecutorManager(props, loader, false);
+ return execManager;
+ }
+
+ private ProjectManager loadProjectManager(Props props) {
+ logger.info("Loading project manager");
+ JdbcProjectLoader loader = new JdbcProjectLoader(props);
+ ProjectManager manager = new ProjectManager(loader, props);
+
+ return manager;
+ }
+
+ private void loadBuiltinCheckersAndActions(AzkabanTriggerServer app) {
+ logger.info("Loading built-in checker and action types");
+ ExecutorManager executorManager = app.getExecutorManager();
+ TriggerRunnerManager triggerRunnerManager = app.getTriggerRunnerManager();
+ CheckerTypeLoader checkerLoader = triggerRunnerManager.getCheckerLoader();
+ ActionTypeLoader actionLoader = triggerRunnerManager.getActionLoader();
+ // time:
+ checkerLoader.registerCheckerType(BasicTimeChecker.type, BasicTimeChecker.class);
+ ExecutableFlowStatusChecker.setExecutorManager(executorManager);
+ checkerLoader.registerCheckerType(ExecutableFlowStatusChecker.type, ExecutableFlowStatusChecker.class);
+
+ ExecuteFlowAction.setExecutorManager(executorManager);
+ ExecuteFlowAction.setProjectManager(projectManager);
+ actionLoader.registerActionType(ExecuteFlowAction.type, ExecuteFlowAction.class);
+ CreateTriggerAction.setTriggerRunnerManager(triggerRunnerManager);
+ actionLoader.registerActionType(CreateTriggerAction.type, CreateTriggerAction.class);
+ }
+
+ private void loadPluginCheckersAndActions(String pluginPath, AzkabanTriggerServer app) {
+ logger.info("Loading plug-in checker and action types");
+ File triggerPluginPath = new File(pluginPath);
+ if (!triggerPluginPath.exists()) {
+ logger.error("plugin path " + pluginPath + " doesn't exist!");
+ return;
+ }
+
+ ClassLoader parentLoader = AzkabanTriggerServer.class.getClassLoader();
+ File[] pluginDirs = triggerPluginPath.listFiles();
+ ArrayList<String> jarPaths = new ArrayList<String>();
+ for (File pluginDir: pluginDirs) {
+ if (!pluginDir.exists()) {
+ logger.error("Error! Trigger plugin path " + pluginDir.getPath() + " doesn't exist.");
+ continue;
+ }
+
+ if (!pluginDir.isDirectory()) {
+ logger.error("The plugin path " + pluginDir + " is not a directory.");
+ continue;
+ }
+
+ // Load the conf directory
+ File propertiesDir = new File(pluginDir, "conf");
+ Props pluginProps = null;
+ if (propertiesDir.exists() && propertiesDir.isDirectory()) {
+ File propertiesFile = new File(propertiesDir, "plugin.properties");
+ File propertiesOverrideFile = new File(propertiesDir, "override.properties");
+
+ if (propertiesFile.exists()) {
+ if (propertiesOverrideFile.exists()) {
+ pluginProps = PropsUtils.loadProps(null, propertiesFile, propertiesOverrideFile);
+ }
+ else {
+ pluginProps = PropsUtils.loadProps(null, propertiesFile);
+ }
+ }
+ else {
+ logger.error("Plugin conf file " + propertiesFile + " not found.");
+ continue;
+ }
+ }
+ else {
+ logger.error("Plugin conf path " + propertiesDir + " not found.");
+ continue;
+ }
+
+ List<String> extLibClasspath = pluginProps.getStringList("trigger.external.classpaths", (List<String>)null);
+
+ String pluginClass = pluginProps.getString("trigger.class");
+ if (pluginClass == null) {
+ logger.error("Trigger class is not set.");
+ }
+ else {
+ logger.error("Plugin class " + pluginClass);
+ }
+
+ URLClassLoader urlClassLoader = null;
+ File libDir = new File(pluginDir, "lib");
+ if (libDir.exists() && libDir.isDirectory()) {
+ File[] files = libDir.listFiles();
+
+ ArrayList<URL> urls = new ArrayList<URL>();
+ for (int i=0; i < files.length; ++i) {
+ try {
+ URL url = files[i].toURI().toURL();
+ urls.add(url);
+ } catch (MalformedURLException e) {
+ logger.error(e);
+ }
+ }
+ if (extLibClasspath != null) {
+ for (String extLib : extLibClasspath) {
+ try {
+ File file = new File(pluginDir, extLib);
+ URL url = file.toURI().toURL();
+ urls.add(url);
+ } catch (MalformedURLException e) {
+ logger.error(e);
+ }
+ }
+ }
+
+ urlClassLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), parentLoader);
+ }
+ else {
+ logger.error("Library path " + propertiesDir + " not found.");
+ continue;
+ }
+
+ Class<?> triggerClass = null;
+ try {
+ triggerClass = urlClassLoader.loadClass(pluginClass);
+ }
+ catch (ClassNotFoundException e) {
+ logger.error("Class " + pluginClass + " not found.");
+ continue;
+ }
+
+ String source = FileIOUtils.getSourcePathFromClass(triggerClass);
+ logger.info("Source jar " + source);
+ jarPaths.add("jar:file:" + source);
+
+// Constructor<?> constructor = null;
+// try {
+// constructor = triggerClass.getConstructor(String.class, Props.class, Context.class, AzkabanTriggerServer.class);
+// } catch (NoSuchMethodException e) {
+// logger.error("Constructor not found in " + pluginClass);
+// continue;
+// }
+ try {
+ Utils.invokeStaticMethod(urlClassLoader, pluginClass, "initiateCheckerTypes", pluginProps, app);
+ } catch (Exception e) {
+ logger.error("Unable to initiate checker types for " + pluginClass);
+ continue;
+ }
+
+ try {
+ Utils.invokeStaticMethod(urlClassLoader, pluginClass, "initiateActionTypes", pluginProps, app);
+ } catch (Exception e) {
+ logger.error("Unable to initiate action types for " + pluginClass);
+ continue;
+ }
+
+ }
+ }
+
+ private TriggerLoader createTriggerLoader(Props props) {
+ return new JdbcTriggerLoader(props);
+ }
+
+ public void stopServer() throws Exception {
+ server.stop();
+ server.destroy();
+ }
+
+ /**
+ * Returns the global azkaban properties
+ *
+ * @return
+ */
+ public Props getAzkabanProps() {
+ return props;
+ }
+
+ /**
+ * Azkaban using Jetty
+ *
+ * @param args
+ * @throws IOException
+ */
+ public static void main(String[] args) throws Exception {
+ logger.error("Starting Jetty Azkaban Trigger Server...");
+ Props azkabanSettings = AzkabanServer.loadProps(args);
+
+ if (azkabanSettings == null) {
+ logger.error("Azkaban Properties not loaded.");
+ logger.error("Exiting Azkaban Trigger Server...");
+ return;
+ }
+
+ // Setup time zone
+ if (azkabanSettings.containsKey(DEFAULT_TIMEZONE_ID)) {
+ String timezone = azkabanSettings.getString(DEFAULT_TIMEZONE_ID);
+ TimeZone.setDefault(TimeZone.getTimeZone(timezone));
+ DateTimeZone.setDefault(DateTimeZone.forID(timezone));
+
+ logger.info("Setting timezone to " + timezone);
+ }
+
+ app = new AzkabanTriggerServer(azkabanSettings);
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+
+ public void run() {
+ logger.info("Shutting down http server...");
+ try {
+ app.stopServer();
+ } catch (Exception e) {
+ logger.error("Error while shutting down http server.", e);
+ }
+ logger.info("kk thx bye.");
+ }
+ });
+ }
+
+ /**
+ * Loads the Azkaban property file from the AZKABAN_HOME conf directory
+ *
+ * @return
+ */
+ /*package*/ static Props loadConfigurationFromAzkabanHome() {
+ String azkabanHome = System.getenv("AZKABAN_HOME");
+
+ if (azkabanHome == null) {
+ logger.error("AZKABAN_HOME not set. Will try default.");
+ return null;
+ }
+
+ if (!new File(azkabanHome).isDirectory()
+ || !new File(azkabanHome).canRead()) {
+ logger.error(azkabanHome + " is not a readable directory.");
+ return null;
+ }
+
+ File confPath = new File(azkabanHome, DEFAULT_CONF_PATH);
+ if (!confPath.exists() || !confPath.isDirectory()
+ || !confPath.canRead()) {
+ logger.error(azkabanHome
+ + " does not contain a readable conf directory.");
+ return null;
+ }
+
+ return loadAzkabanConfigurationFromDirectory(confPath);
+ }
+
+ /**
+ * Loads the Azkaban conf file int a Props object
+ *
+ * @param path
+ * @return
+ */
+ private static Props loadAzkabanConfigurationFromDirectory(File dir) {
+ File azkabanPrivatePropsFile = new File(dir, AZKABAN_PRIVATE_PROPERTIES_FILE);
+ File azkabanPropsFile = new File(dir, AZKABAN_PROPERTIES_FILE);
+
+ Props props = null;
+ try {
+ // This is purely optional
+ if (azkabanPrivatePropsFile.exists() && azkabanPrivatePropsFile.isFile()) {
+ logger.info("Loading azkaban private properties file" );
+ props = new Props(null, azkabanPrivatePropsFile);
+ }
+
+ if (azkabanPropsFile.exists() && azkabanPropsFile.isFile()) {
+ logger.info("Loading azkaban properties file" );
+ props = new Props(props, azkabanPropsFile);
+ }
+ } catch (FileNotFoundException e) {
+ logger.error("File not found. Could not load azkaban config file", e);
+ } catch (IOException e) {
+ logger.error("File found, but error reading. Could not load azkaban config file", e);
+ }
+
+ return props;
+ }
+
+ private void configureMBeanServer() {
+ logger.info("Registering MBeans...");
+ mbeanServer = ManagementFactory.getPlatformMBeanServer();
+
+ registerMbean("triggerServerJetty", new JmxJettyServer(server));
+ registerMbean("triggerRunnerManager", new JmxTriggerRunnerManager(triggerRunnerManager));
+ }
+
+ public void close() {
+ try {
+ for (ObjectName name : registeredMBeans) {
+ mbeanServer.unregisterMBean(name);
+ logger.info("Jmx MBean " + name.getCanonicalName() + " unregistered.");
+ }
+ } catch (Exception e) {
+ logger.error("Failed to cleanup MBeanServer", e);
+ }
+ }
+
+ private void registerMbean(String name, Object mbean) {
+ Class<?> mbeanClass = mbean.getClass();
+ ObjectName mbeanName;
+ try {
+ mbeanName = new ObjectName(mbeanClass.getName() + ":name=" + name);
+ mbeanServer.registerMBean(mbean, mbeanName);
+ logger.info("Bean " + mbeanClass.getCanonicalName() + " registered.");
+ registeredMBeans.add(mbeanName);
+ } catch (Exception e) {
+ logger.error("Error registering mbean " + mbeanClass.getCanonicalName(), e);
+ }
+
+ }
+
+ public List<ObjectName> getMbeanNames() {
+ return registeredMBeans;
+ }
+
+ public MBeanInfo getMBeanInfo(ObjectName name) {
+ try {
+ return mbeanServer.getMBeanInfo(name);
+ } catch (Exception e) {
+ logger.error(e);
+ return null;
+ }
+ }
+
+ public Object getMBeanAttribute(ObjectName name, String attribute) {
+ try {
+ return mbeanServer.getAttribute(name, attribute);
+ } catch (Exception e) {
+ logger.error(e);
+ return null;
+ }
+ }
+
+ public TriggerRunnerManager getTriggerRunnerManager() {
+ return triggerRunnerManager;
+ }
+
+ public ExecutorManager getExecutorManager() {
+ return executorManager;
+ }
+
+ public ProjectManager getProjectManager() {
+ return projectManager;
+ }
+
+}
diff --git a/src/java/azkaban/triggerapp/JMXHttpServlet.java b/src/java/azkaban/triggerapp/JMXHttpServlet.java
new file mode 100644
index 0000000..ac7ae41
--- /dev/null
+++ b/src/java/azkaban/triggerapp/JMXHttpServlet.java
@@ -0,0 +1,72 @@
+package azkaban.triggerapp;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.ObjectName;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.JSONUtils;
+import azkaban.webapp.servlet.AzkabanServletContextListener;
+import azkaban.webapp.servlet.HttpRequestUtils;
+
+public class JMXHttpServlet extends HttpServlet implements TriggerConnectorParams {
+ private static final long serialVersionUID = -3085603824826446270L;
+ private static final Logger logger = Logger.getLogger(JMXHttpServlet.class);
+ private AzkabanTriggerServer server;
+
+ public void init(ServletConfig config) throws ServletException {
+ server = (AzkabanTriggerServer) config.getServletContext().getAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY);
+ }
+
+ public boolean hasParam(HttpServletRequest request, String param) {
+ return HttpRequestUtils.hasParam(request, param);
+ }
+
+ public String getParam(HttpServletRequest request, String name) throws ServletException {
+ return HttpRequestUtils.getParam(request, name);
+ }
+
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ HashMap<String,Object> ret = new HashMap<String,Object>();
+
+ if (hasParam(req, JMX_GET_MBEANS)) {
+ ret.put("mbeans", server.getMbeanNames());
+ }
+ else if (hasParam(req, JMX_GET_ALL_MBEAN_ATTRIBUTES)) {
+ if (!hasParam(req, JMX_MBEAN)) {
+ ret.put("error", "Parameters 'mbean' must be set");
+ }
+ else {
+ String mbeanName = getParam(req, JMX_MBEAN);
+ try {
+ ObjectName name = new ObjectName(mbeanName);
+ MBeanInfo info = server.getMBeanInfo(name);
+
+ MBeanAttributeInfo[] mbeanAttrs = info.getAttributes();
+ HashMap<String, Object> attributes = new HashMap<String,Object>();
+
+ for (MBeanAttributeInfo attrInfo: mbeanAttrs) {
+ Object obj = server.getMBeanAttribute(name, attrInfo.getName());
+ attributes.put(attrInfo.getName(), obj);
+ }
+
+ ret.put("attributes", attributes);
+ } catch (Exception e) {
+ logger.error(e);
+ ret.put("error", "'" + mbeanName + "' is not a valid mBean name");
+ }
+ }
+ }
+
+ JSONUtils.toJSON(ret, resp.getOutputStream(), true);
+ }
+}
\ No newline at end of file
diff --git a/src/java/azkaban/triggerapp/TriggerConnectorParams.java b/src/java/azkaban/triggerapp/TriggerConnectorParams.java
new file mode 100644
index 0000000..f7901bd
--- /dev/null
+++ b/src/java/azkaban/triggerapp/TriggerConnectorParams.java
@@ -0,0 +1,33 @@
+package azkaban.triggerapp;
+
+public interface TriggerConnectorParams {
+ public static final String ACTION_PARAM = "action";
+ public static final String TRIGGER_ID_PARAM = "triggerid";
+ public static final String USER_PARAM = "user";
+
+ public static final String PING_ACTION = "ping";
+
+ public static final String INSERT_TRIGGER_ACTION = "insert";
+ public static final String REMOVE_TRIGGER_ACTION = "remove";
+ public static final String UPDATE_TRIGGER_ACTION = "update";
+ public static final String GET_UPDATE_ACTION = "getupdate";
+
+ public static final String RESPONSE_NOTFOUND = "notfound";
+ public static final String RESPONSE_ERROR = "error";
+ public static final String RESPONSE_SUCCESS = "success";
+ public static final String RESPONSE_ALIVE = "alive";
+ public static final String RESPONSE_UPDATETIME = "lasttime";
+ public static final String RESPONSE_UPDATED_TRIGGERS = "updated";
+
+ public static final String UPDATE_TIME_LIST_PARAM = "updatetime";
+
+ public static final String JMX_GET_MBEANS = "getMBeans";
+ public static final String JMX_GET_MBEAN_INFO = "getMBeanInfo";
+ public static final String JMX_GET_MBEAN_ATTRIBUTE = "getAttribute";
+ public static final String JMX_GET_ALL_MBEAN_ATTRIBUTES = "getAllMBeanAttributes";
+ public static final String JMX_ATTRIBUTE = "attribute";
+ public static final String JMX_MBEAN = "mBean";
+
+ public static final String JMX_GET_ALL_TRIGGER_SERVER_ATTRIBUTES = "getAllTriggerServerAttributes";
+ public static final String JMX_HOSTPORT = "hostPort";
+}
diff --git a/src/java/azkaban/triggerapp/TriggerRunnerManager.java b/src/java/azkaban/triggerapp/TriggerRunnerManager.java
new file mode 100644
index 0000000..b4e5fe7
--- /dev/null
+++ b/src/java/azkaban/triggerapp/TriggerRunnerManager.java
@@ -0,0 +1,341 @@
+package azkaban.triggerapp;
+
+import java.io.IOException;
+import java.lang.Thread.State;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+
+import azkaban.trigger.ActionTypeLoader;
+import azkaban.trigger.CheckerTypeLoader;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerManagerException;
+import azkaban.trigger.TriggerStatus;
+import azkaban.utils.Props;
+
+public class TriggerRunnerManager {
+ private static Logger logger = Logger.getLogger(TriggerRunnerManager.class);
+ private static final long DEFAULT_SCANNER_INTERVAL_MS = 60000;
+
+ private static Map<Integer, Trigger> triggerIdMap = new HashMap<Integer, Trigger>();
+
+ private CheckerTypeLoader checkerTypeLoader;
+ private ActionTypeLoader actionTypeLoader;
+ private TriggerLoader triggerLoader;
+
+ private Props globalProps;
+
+ private final Props azkabanProps;
+
+ private final TriggerScannerThread runnerThread;
+ private long lastRunnerThreadCheckTime = -1;
+
+
+ public TriggerRunnerManager(Props props, TriggerLoader triggerLoader) throws IOException {
+
+ azkabanProps = props;
+
+ this.triggerLoader = triggerLoader;
+
+ long scannerInterval = props.getLong("trigger.scan.interval", DEFAULT_SCANNER_INTERVAL_MS);
+ runnerThread = new TriggerScannerThread(scannerInterval);
+
+ checkerTypeLoader = new CheckerTypeLoader();
+ actionTypeLoader = new ActionTypeLoader();
+
+ }
+
+ public void init() {
+ try{
+ checkerTypeLoader.init(azkabanProps);
+ actionTypeLoader.init(azkabanProps);
+ } catch(Exception e) {
+ e.printStackTrace();
+ logger.error(e.getMessage());
+ }
+
+ Condition.setCheckerLoader(checkerTypeLoader);
+ Trigger.setActionTypeLoader(actionTypeLoader);
+
+ }
+
+ public void start() {
+
+ try{
+ // expect loader to return valid triggers
+ List<Trigger> triggers = triggerLoader.loadTriggers();
+ for(Trigger t : triggers) {
+ runnerThread.addTrigger(t);
+ triggerIdMap.put(t.getTriggerId(), t);
+ }
+ }catch(Exception e) {
+ e.printStackTrace();
+ logger.error(e.getMessage());
+ }
+
+ runnerThread.start();
+ }
+
+ public Props getGlobalProps() {
+ return globalProps;
+ }
+
+ public void setGlobalProps(Props globalProps) {
+ this.globalProps = globalProps;
+ }
+
+ public CheckerTypeLoader getCheckerLoader() {
+ return checkerTypeLoader;
+ }
+
+ public ActionTypeLoader getActionLoader() {
+ return actionTypeLoader;
+ }
+
+ public synchronized void insertTrigger(Trigger t) throws TriggerManagerException {
+
+ triggerLoader.addTrigger(t);
+ triggerIdMap.put(t.getTriggerId(), t);
+ runnerThread.addTrigger(t);
+ }
+
+ public synchronized void removeTrigger(int id) throws TriggerManagerException {
+ Trigger t = triggerIdMap.get(id);
+ if(t != null) {
+ removeTrigger(triggerIdMap.get(id));
+ }
+ }
+
+ public synchronized void updateTrigger(int triggerId) throws TriggerManagerException {
+ Trigger t = triggerIdMap.get(triggerId);
+ if(t == null) {
+ throw new TriggerManagerException("The trigger to update doesn't exist!");
+ }
+
+ runnerThread.deleteTrigger(t);
+ runnerThread.addTrigger(t);
+ triggerIdMap.put(t.getTriggerId(), t);
+
+ triggerLoader.updateTrigger(t);
+ }
+
+ public synchronized void updateTrigger(Trigger t) throws TriggerManagerException {
+ if(!triggerIdMap.containsKey(t.getTriggerId())) {
+ throw new TriggerManagerException("The trigger to update doesn't exist!");
+ }
+
+ runnerThread.deleteTrigger(t);
+ runnerThread.addTrigger(t);
+ triggerIdMap.put(t.getTriggerId(), t);
+
+ triggerLoader.updateTrigger(t);
+ }
+
+ public synchronized void removeTrigger(Trigger t) throws TriggerManagerException {
+ t.stopCheckers();
+ triggerLoader.removeTrigger(t);
+ runnerThread.deleteTrigger(t);
+ triggerIdMap.remove(t.getTriggerId());
+ }
+
+ public List<Trigger> getTriggers() {
+ return new ArrayList<Trigger>(triggerIdMap.values());
+ }
+
+ public Map<String, Class<? extends ConditionChecker>> getSupportedCheckers() {
+ return checkerTypeLoader.getSupportedCheckers();
+ }
+
+ private class TriggerScannerThread extends Thread {
+ private BlockingQueue<Trigger> triggers;
+ private boolean shutdown = false;
+ //private AtomicBoolean stillAlive = new AtomicBoolean(true);
+ private final long scannerInterval;
+
+ public TriggerScannerThread(long scannerInterval) {
+ triggers = new LinkedBlockingDeque<Trigger>();
+ this.setName("TriggerRunnerManager-Trigger-Scanner-Thread");
+ this.scannerInterval = scannerInterval;;
+ }
+
+ @SuppressWarnings("unused")
+ public void shutdown() {
+ logger.error("Shutting down trigger manager thread " + this.getName());
+ shutdown = true;
+ //stillAlive.set(false);
+ this.interrupt();
+ }
+
+ public synchronized List<Trigger> getTriggers() {
+ return new ArrayList<Trigger>(triggers);
+ }
+
+ public synchronized void addTrigger(Trigger t) {
+ triggers.add(t);
+ }
+
+ public synchronized void deleteTrigger(Trigger t) {
+ triggers.remove(t);
+ }
+
+ public void run() {
+ //while(stillAlive.get()) {
+ while(!shutdown) {
+ synchronized (this) {
+ try{
+ lastRunnerThreadCheckTime = System.currentTimeMillis();
+
+ try{
+ checkAllTriggers();
+ } catch(Exception e) {
+ e.printStackTrace();
+ logger.error(e.getMessage());
+ } catch(Throwable t) {
+ t.printStackTrace();
+ logger.error(t.getMessage());
+ }
+
+ long timeRemaining = scannerInterval - (System.currentTimeMillis() - getLastRunnerThreadCheckTime());
+ if(timeRemaining < 0) {
+ logger.error("Trigger manager thread " + this.getName() + " is too busy!");
+ } else {
+ wait(timeRemaining);
+ }
+ } catch(InterruptedException e) {
+ logger.info("Interrupted. Probably to shut down.");
+ }
+
+ }
+ }
+ }
+
+ private void checkAllTriggers() throws TriggerManagerException {
+ for(Trigger t : triggers) {
+ if(t.getStatus().equals(TriggerStatus.READY)) {
+ if(t.triggerConditionMet()) {
+ onTriggerTrigger(t);
+ } else if (t.expireConditionMet()) {
+ onTriggerExpire(t);
+ }
+ }
+ }
+ }
+
+ private void onTriggerTrigger(Trigger t) throws TriggerManagerException {
+ List<TriggerAction> actions = t.getTriggerActions();
+ for(TriggerAction action : actions) {
+ try {
+ action.doAction();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ throw new TriggerManagerException("action failed to execute", e);
+ }
+ }
+ if(t.isResetOnTrigger()) {
+ t.resetTriggerConditions();
+ t.resetExpireCondition();
+// updateTrigger(t);
+ } else {
+ t.setStatus(TriggerStatus.EXPIRED);
+ }
+
+ triggerLoader.updateTrigger(t);
+
+// updateAgent(t);
+ }
+
+ private void onTriggerExpire(Trigger t) throws TriggerManagerException {
+ List<TriggerAction> expireActions = t.getExpireActions();
+ for(TriggerAction action : expireActions) {
+ try {
+ action.doAction();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ throw new TriggerManagerException("expire action failed to execute", e);
+ }
+ }
+ if(t.isResetOnExpire()) {
+ t.resetTriggerConditions();
+ t.resetExpireCondition();
+// updateTrigger(t);
+ } else {
+ t.setStatus(TriggerStatus.EXPIRED);
+ }
+// updateAgent(t);
+ triggerLoader.updateTrigger(t);
+ }
+ }
+
+ public synchronized Trigger getTrigger(int triggerId) {
+ return triggerIdMap.get(triggerId);
+ }
+
+ public void expireTrigger(int triggerId) {
+ Trigger t = getTrigger(triggerId);
+ t.setStatus(TriggerStatus.EXPIRED);
+// updateAgent(t);
+ }
+
+ public List<Trigger> getTriggers(String triggerSource) {
+ List<Trigger> triggers = new ArrayList<Trigger>();
+ for(Trigger t : triggerIdMap.values()) {
+ if(t.getSource().equals(triggerSource)) {
+ triggers.add(t);
+ }
+ }
+ return triggers;
+ }
+
+ public List<Trigger> getUpdatedTriggers(String triggerSource, long lastUpdateTime) {
+ List<Trigger> triggers = new ArrayList<Trigger>();
+ for(Trigger t : triggerIdMap.values()) {
+ if(t.getSource().equals(triggerSource) && t.getLastModifyTime().getMillis() > lastUpdateTime) {
+ triggers.add(t);
+ }
+ }
+ return triggers;
+ }
+
+ public List<Integer> getUpdatedTriggers(long lastUpdateTime) {
+ List<Integer> triggers = new ArrayList<Integer>();
+ for(Trigger t : triggerIdMap.values()) {
+ if(t.getLastModifyTime().getMillis() > lastUpdateTime) {
+ triggers.add(t.getTriggerId());
+ }
+ }
+ return triggers;
+ }
+
+ public long getLastRunnerThreadCheckTime() {
+ return lastRunnerThreadCheckTime;
+ }
+
+ public boolean isRunnerThreadActive() {
+ return runnerThread.isAlive();
+ }
+
+
+ public State getRunnerThreadState() {
+ return this.runnerThread.getState();
+ }
+
+ public void loadTrigger(int triggerId) throws TriggerManagerException {
+ Trigger t = triggerLoader.loadTrigger(triggerId);
+ if(t.getStatus().equals(TriggerStatus.PREPARING)) {
+ triggerIdMap.put(t.getTriggerId(), t);
+ runnerThread.addTrigger(t);
+ t.setStatus(TriggerStatus.READY);
+ }
+ }
+
+}
diff --git a/src/java/azkaban/triggerapp/TriggerServerServlet.java b/src/java/azkaban/triggerapp/TriggerServerServlet.java
new file mode 100644
index 0000000..68c36b9
--- /dev/null
+++ b/src/java/azkaban/triggerapp/TriggerServerServlet.java
@@ -0,0 +1,208 @@
+package azkaban.triggerapp;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.joda.time.DateTime;
+
+import azkaban.execapp.AzkabanExecutorServer;
+import azkaban.execapp.FlowRunnerManager;
+import azkaban.executor.ConnectorParams;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerManagerException;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.FileIOUtils.JobMetaData;
+import azkaban.utils.FileIOUtils.LogData;
+import azkaban.webapp.servlet.AzkabanServletContextListener;
+
+public class TriggerServerServlet extends HttpServlet implements TriggerConnectorParams {
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = Logger.getLogger(TriggerServerServlet.class.getName());
+ public static final String JSON_MIME_TYPE = "application/json";
+
+ private AzkabanTriggerServer application;
+ private TriggerRunnerManager triggerRunnerManager;
+
+ public TriggerServerServlet() {
+ super();
+ }
+
+ @Override
+ public void init(ServletConfig config) throws ServletException {
+ application = (AzkabanTriggerServer) config.getServletContext().getAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY);
+
+ if (application == null) {
+ throw new IllegalStateException(
+ "No batch application is defined in the servlet context!");
+ }
+
+ triggerRunnerManager = application.getTriggerRunnerManager();
+ }
+
+
+ protected void writeJSON(HttpServletResponse resp, Object obj) throws IOException {
+ resp.setContentType(JSON_MIME_TYPE);
+ ObjectMapper mapper = new ObjectMapper();
+ OutputStream stream = resp.getOutputStream();
+ mapper.writeValue(stream, obj);
+ }
+
+ @Override
+ public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ HashMap<String,Object> respMap= new HashMap<String,Object>();
+ //logger.info("ExecutorServer called by " + req.getRemoteAddr());
+ try {
+ if (!hasParam(req, ACTION_PARAM)) {
+ logger.error("Parameter action not set");
+ respMap.put("error", "Parameter action not set");
+ }
+ else {
+ String action = getParam(req, ACTION_PARAM);
+ if (action.equals(GET_UPDATE_ACTION)) {
+ //logger.info("Updated called");
+ handleAjaxGetUpdateRequest(req, respMap);
+ }
+ else if (action.equals(PING_ACTION)) {
+ respMap.put("status", "alive");
+ }
+ else {
+ int triggerId = Integer.parseInt(getParam(req, TRIGGER_ID_PARAM));
+ String user = getParam(req, USER_PARAM, null);
+
+ logger.info("User " + user + " has called action " + action + " on " + triggerId);
+ if (action.equals(INSERT_TRIGGER_ACTION)) {
+ logger.info("Insert Trigger Action");
+ handleInsertTrigger(triggerId, req, resp, respMap);
+ } else if (action.equals(REMOVE_TRIGGER_ACTION)) {
+ logger.info("Remove Trigger Action");
+ handleRemoveTrigger(triggerId, req, resp, respMap);
+ }
+ else if (action.equals(UPDATE_TRIGGER_ACTION)) {
+ logger.info("Update Trigger Action");
+ handleUpdateTrigger(triggerId, user, req, respMap);
+ }
+ else {
+ logger.error("action: '" + action + "' not supported.");
+ respMap.put("error", "action: '" + action + "' not supported.");
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error(e);
+ respMap.put(RESPONSE_ERROR, e.getMessage());
+ }
+ writeJSON(resp, respMap);
+ resp.flushBuffer();
+ }
+
+
+
+ private void handleAjaxGetUpdateRequest(HttpServletRequest req, HashMap<String, Object> respMap) {
+ List<Integer> updates = null;
+ try{
+ long lastUpdateTime = getLongParam(req, "lastUpdateTime");
+// respMap.put(TriggerConnectorParams.RESPONSE_UPDATETIME, DateTime.now().getMillis());
+ updates = triggerRunnerManager.getUpdatedTriggers(lastUpdateTime);
+ if(updates.size() > 0) {
+ System.out.println("got " + updates.size() + " updates" );
+ }
+ respMap.put("updates", updates);
+ } catch (Exception e) {
+ logger.error(e);
+ respMap.put("error", e.getMessage());
+ }
+ }
+
+ private void handleInsertTrigger(int triggerId, HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> respMap) {
+ try {
+ triggerRunnerManager.loadTrigger(triggerId);
+ } catch (TriggerManagerException e) {
+ logger.error(e);
+ respMap.put("error", e.getMessage());
+ }
+ }
+
+ private void handleUpdateTrigger(int triggerId, String user, HttpServletRequest req, HashMap<String, Object> respMap) {
+ try {
+ triggerRunnerManager.updateTrigger(triggerId);
+ } catch (TriggerManagerException e) {
+ logger.error(e);
+ respMap.put("error", e.getMessage());
+ }
+ }
+
+ private void handleRemoveTrigger(int triggerId, HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> respMap) {
+ try {
+ triggerRunnerManager.removeTrigger(triggerId);
+ } catch (TriggerManagerException e) {
+ logger.error(e);
+ respMap.put("error", e.getMessage());
+ }
+ }
+
+ @Override
+ public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+
+ }
+
+ /**
+ * Duplicated code with AbstractAzkabanServlet, but ne
+ */
+ public boolean hasParam(HttpServletRequest request, String param) {
+ return request.getParameter(param) != null;
+ }
+
+ public String getParam(HttpServletRequest request, String name)
+ throws ServletException {
+ String p = request.getParameter(name);
+ if (p == null)
+ throw new ServletException("Missing required parameter '" + name + "'.");
+ else
+ return p;
+ }
+
+ public String getParam(HttpServletRequest request, String name, String defaultVal ) {
+ String p = request.getParameter(name);
+ if (p == null) {
+ return defaultVal;
+ }
+
+ return p;
+ }
+
+ public long getLongParam(HttpServletRequest request, String name) throws ServletException {
+ String p = getParam(request, name);
+ return Long.parseLong(p);
+ }
+
+ public int getIntParam(HttpServletRequest request, String name) throws ServletException {
+ String p = getParam(request, name);
+ return Integer.parseInt(p);
+ }
+
+ public int getIntParam(HttpServletRequest request, String name, int defaultVal) {
+ if (hasParam(request, name)) {
+ try {
+ return getIntParam(request, name);
+ } catch (Exception e) {
+ return defaultVal;
+ }
+ }
+
+ return defaultVal;
+ }
+}
src/java/azkaban/webapp/AzkabanWebServer.java 44(+15 -29)
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 2ae3ee3..fdfbed3 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -57,7 +57,6 @@ import azkaban.executor.ExecutorManager;
import azkaban.executor.JdbcExecutorLoader;
import azkaban.jmx.JmxExecutorManager;
import azkaban.jmx.JmxJettyServer;
-import azkaban.jmx.JmxSLAManager;
import azkaban.jmx.JmxScheduler;
import azkaban.project.JdbcProjectLoader;
import azkaban.project.ProjectManager;
@@ -66,13 +65,11 @@ import azkaban.scheduler.JdbcScheduleLoader;
import azkaban.scheduler.ScheduleLoader;
import azkaban.scheduler.ScheduleManager;
import azkaban.scheduler.TriggerBasedScheduleLoader;
-import azkaban.sla.JdbcSLALoader;
-import azkaban.sla.SLAManager;
-import azkaban.sla.SLAManagerException;
import azkaban.trigger.JdbcTriggerLoader;
import azkaban.trigger.TriggerLoader;
import azkaban.trigger.TriggerManager;
import azkaban.trigger.TriggerAgent;
+import azkaban.trigger.TriggerManagerException;
import azkaban.user.UserManager;
import azkaban.user.XmlUserManager;
import azkaban.utils.FileIOUtils;
@@ -143,8 +140,6 @@ public class AzkabanWebServer extends AzkabanServer {
// private TriggerBasedScheduler scheduler;
private TriggerManager triggerManager;
- private SLAManager slaManager;
-
private final ClassLoader baseClassLoader;
private Props props;
@@ -174,15 +169,14 @@ public class AzkabanWebServer extends AzkabanServer {
sessionCache = new SessionCache(props);
userManager = loadUserManager(props);
executorManager = loadExecutorManager(props);
- slaManager = loadSLAManager(props);
triggerManager = loadTriggerManager(props);
- projectManager = loadProjectManager(props, triggerManager);
+ projectManager = loadProjectManager(props);
// scheduler = loadScheduler(executorManager, projectManager, triggerManager);
- scheduleManager = loadScheduleManager(projectManager, executorManager, slaManager, triggerManager, props);
+ scheduleManager = loadScheduleManager(executorManager, triggerManager, props);
baseClassLoader = getBaseClassloader();
@@ -231,31 +225,34 @@ public class AzkabanWebServer extends AzkabanServer {
return manager;
}
- private ProjectManager loadProjectManager(Props props, TriggerManager triggerManager) {
+ private ProjectManager loadProjectManager(Props props) {
logger.info("Loading JDBC for project management");
JdbcProjectLoader loader = new JdbcProjectLoader(props);
- ProjectManager manager = new ProjectManager(loader, props, triggerManager);
+ ProjectManager manager = new ProjectManager(loader, props);
+ manager.setTriggerManager(triggerManager);
return manager;
}
private ExecutorManager loadExecutorManager(Props props) throws Exception {
JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
- ExecutorManager execManager = new ExecutorManager(props, loader);
+ ExecutorManager execManager = new ExecutorManager(props, loader, true);
return execManager;
}
- private ScheduleManager loadScheduleManager(ProjectManager projectManager, ExecutorManager executorManager, SLAManager slaManager, TriggerManager triggerManager, Props props ) throws Exception {
+ private ScheduleManager loadScheduleManager(ExecutorManager executorManager, TriggerManager triggerManager, Props props ) throws Exception {
ScheduleManager schedManager = null;
String scheduleLoaderType = props.getString("azkaban.scheduler.loader", "TriggerBasedScheduleLoader");
if(scheduleLoaderType.equals("JdbcScheduleLoader")) {
ScheduleLoader loader = new JdbcScheduleLoader(props);
- schedManager = new ScheduleManager(executorManager, projectManager, slaManager, loader, false);
+ schedManager = new ScheduleManager(executorManager, loader, false);
+ schedManager.setProjectManager(projectManager);
+ schedManager.start();
} else if(scheduleLoaderType.equals("TriggerBasedScheduleLoader")) {
logger.info("Loading trigger based scheduler");
- ScheduleLoader loader = new TriggerBasedScheduleLoader(triggerManager, executorManager, projectManager, ScheduleManager.triggerSource);
- schedManager = new ScheduleManager(executorManager, projectManager, slaManager, loader, true);
+ ScheduleLoader loader = new TriggerBasedScheduleLoader(triggerManager, executorManager, null, ScheduleManager.triggerSource);
+ schedManager = new ScheduleManager(executorManager, loader, true);
}
return schedManager;
@@ -266,12 +263,7 @@ public class AzkabanWebServer extends AzkabanServer {
// return new TriggerBasedScheduler(executorManager, projectManager, triggerManager, loader);
// }
- private SLAManager loadSLAManager(Props props) throws SLAManagerException {
- SLAManager slaManager = new SLAManager(executorManager, new JdbcSLALoader(props), props);
- return slaManager;
- }
-
- private TriggerManager loadTriggerManager(Props props) {
+ private TriggerManager loadTriggerManager(Props props) throws TriggerManagerException {
TriggerLoader loader = new JdbcTriggerLoader(props);
return new TriggerManager(props, loader);
}
@@ -317,10 +309,6 @@ public class AzkabanWebServer extends AzkabanServer {
return executorManager;
}
- public SLAManager getSLAManager() {
- return slaManager;
- }
-
public ScheduleManager getScheduleManager() {
return scheduleManager;
}
@@ -663,7 +651,7 @@ public class AzkabanWebServer extends AzkabanServer {
}
TriggerPlugin plugin = (TriggerPlugin) obj;
-// AbstractTriggerServlet avServlet = (AbstractTriggerServlet)obj;
+// AbstractAzkabanServlet avServlet = (AbstractAzkabanServlet) plugin.getServlet();
// root.addServlet(new ServletHolder(avServlet), "/" + pluginWebPath + "/*");
installedTriggerPlugins.put(pluginName, plugin);
}
@@ -908,7 +896,6 @@ public class AzkabanWebServer extends AzkabanServer {
registerMbean("jetty", new JmxJettyServer(server));
registerMbean("scheduler", new JmxScheduler(scheduleManager));
- registerMbean("slaManager", new JmxSLAManager(slaManager));
registerMbean("executorManager", new JmxExecutorManager(executorManager));
}
@@ -922,7 +909,6 @@ public class AzkabanWebServer extends AzkabanServer {
logger.error("Failed to cleanup MBeanServer", e);
}
scheduleManager.shutdown();
- slaManager.shutdown();
executorManager.shutdown();
}
src/java/azkaban/webapp/servlet/ScheduleServlet.java 346(+170 -176)
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 8e71ee4..73fefdb 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -54,11 +54,6 @@ import azkaban.scheduler.Schedule;
import azkaban.scheduler.ScheduleManager;
import azkaban.scheduler.ScheduleManagerException;
import azkaban.scheduler.ScheduleStatisticManager;
-import azkaban.sla.SLA;
-import azkaban.sla.SLA.SlaAction;
-import azkaban.sla.SLA.SlaRule;
-import azkaban.sla.SLA.SlaSetting;
-import azkaban.sla.SlaOptions;
import azkaban.user.Permission;
import azkaban.user.Permission.Type;
import azkaban.user.User;
@@ -100,13 +95,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
HashMap<String, Object> ret = new HashMap<String, Object>();
String ajaxName = getParam(req, "ajax");
- if (ajaxName.equals("slaInfo")) {
- ajaxSlaInfo(req, ret, session.getUser());
- }
- else if(ajaxName.equals("setSla")) {
- ajaxSetSla(req, ret, session.getUser());
- }
- else if(ajaxName.equals("loadFlow")) {
+// if (ajaxName.equals("slaInfo")) {
+// ajaxSlaInfo(req, ret, session.getUser());
+// }
+// else if(ajaxName.equals("setSla")) {
+// ajaxSetSla(req, ret, session.getUser());
+// }
+ if(ajaxName.equals("loadFlow")) {
ajaxLoadFlows(req, ret, session.getUser());
}
else if(ajaxName.equals("loadHistory")) {
@@ -122,98 +117,98 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
}
- private void ajaxSetSla(HttpServletRequest req, HashMap<String, Object> ret, User user) {
- try {
-
- int scheduleId = getIntParam(req, "scheduleId");
-
- Schedule sched = scheduleManager.getSchedule(scheduleId);
-
- Project project = projectManager.getProject(sched.getProjectId());
- if(!hasPermission(project, user, Permission.Type.SCHEDULE)) {
- ret.put("error", "User " + user + " does not have permission to set SLA for this flow.");
- return;
- }
-
-
- SlaOptions slaOptions= new SlaOptions();
-
- String slaEmails = getParam(req, "slaEmails");
- String[] emailSplit = slaEmails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
-
- Map<String, String> settings = getParamGroup(req, "settings");
- List<SlaSetting> slaSettings = new ArrayList<SlaSetting>();
- for(String set : settings.keySet()) {
- SlaSetting s;
- try {
- s = parseSlaSetting(settings.get(set));
- }
- catch (Exception e) {
- throw new ServletException(e);
- }
- if(s != null) {
- slaSettings.add(s);
- }
- }
-
- if(slaSettings.size() > 0) {
- if(slaEmails.equals("")) {
- ret.put("error", "Please put correct email settings for your SLA actions");
- return;
- }
- slaOptions.setSlaEmails(Arrays.asList(emailSplit));
- slaOptions.setSettings(slaSettings);
- }
- else {
- slaOptions = null;
- }
- sched.setSlaOptions(slaOptions);
- scheduleManager.insertSchedule(sched);
-
- if(slaOptions != null) {
- projectManager.postProjectEvent(project, EventType.SLA, user.getUserId(), "SLA for flow " + sched.getFlowName() + " has been added/changed.");
- }
-
- } catch (ServletException e) {
- ret.put("error", e.getMessage());
- } catch (ScheduleManagerException e) {
- ret.put("error", e.getMessage());
- }
-
- }
+// private void ajaxSetSla(HttpServletRequest req, HashMap<String, Object> ret, User user) {
+// try {
+//
+// int scheduleId = getIntParam(req, "scheduleId");
+//
+// Schedule sched = scheduleManager.getSchedule(scheduleId);
+//
+// Project project = projectManager.getProject(sched.getProjectId());
+// if(!hasPermission(project, user, Permission.Type.SCHEDULE)) {
+// ret.put("error", "User " + user + " does not have permission to set SLA for this flow.");
+// return;
+// }
+//
+//
+// SlaOptions slaOptions= new SlaOptions();
+//
+// String slaEmails = getParam(req, "slaEmails");
+// String[] emailSplit = slaEmails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+//
+// Map<String, String> settings = getParamGroup(req, "settings");
+// List<SlaSetting> slaSettings = new ArrayList<SlaSetting>();
+// for(String set : settings.keySet()) {
+// SlaSetting s;
+// try {
+// s = parseSlaSetting(settings.get(set));
+// }
+// catch (Exception e) {
+// throw new ServletException(e);
+// }
+// if(s != null) {
+// slaSettings.add(s);
+// }
+// }
+//
+// if(slaSettings.size() > 0) {
+// if(slaEmails.equals("")) {
+// ret.put("error", "Please put correct email settings for your SLA actions");
+// return;
+// }
+// slaOptions.setSlaEmails(Arrays.asList(emailSplit));
+// slaOptions.setSettings(slaSettings);
+// }
+// else {
+// slaOptions = null;
+// }
+// sched.setSlaOptions(slaOptions);
+// scheduleManager.insertSchedule(sched);
+//
+// if(slaOptions != null) {
+// projectManager.postProjectEvent(project, EventType.SLA, user.getUserId(), "SLA for flow " + sched.getFlowName() + " has been added/changed.");
+// }
+//
+// } catch (ServletException e) {
+// ret.put("error", e.getMessage());
+// } catch (ScheduleManagerException e) {
+// ret.put("error", e.getMessage());
+// }
+//
+// }
- private SlaSetting parseSlaSetting(String set) throws ScheduleManagerException {
- // "" + Duration + EmailAction + KillAction
- String[] parts = set.split(",", -1);
- String id = parts[0];
- String rule = parts[1];
- String duration = parts[2];
- String emailAction = parts[3];
- String killAction = parts[4];
- if(emailAction.equals("true") || killAction.equals("true")) {
- SlaSetting r = new SlaSetting();
- r.setId(id);
- r.setRule(SlaRule.valueOf(rule));
- ReadablePeriod dur;
- try {
- dur = parseDuration(duration);
- }
- catch (Exception e) {
- throw new ScheduleManagerException("Unable to parse duration for a SLA that needs to take actions!", e);
- }
- r.setDuration(dur);
- List<SlaAction> actions = new ArrayList<SLA.SlaAction>();
- if(emailAction.equals("true")) {
- actions.add(SlaAction.EMAIL);
- }
- if(killAction.equals("true")) {
- actions.add(SlaAction.KILL);
- }
- r.setActions(actions);
- return r;
- }
- return null;
- }
+// private SlaSetting parseSlaSetting(String set) throws ScheduleManagerException {
+// // "" + Duration + EmailAction + KillAction
+// String[] parts = set.split(",", -1);
+// String id = parts[0];
+// String rule = parts[1];
+// String duration = parts[2];
+// String emailAction = parts[3];
+// String killAction = parts[4];
+// if(emailAction.equals("true") || killAction.equals("true")) {
+// SlaSetting r = new SlaSetting();
+// r.setId(id);
+// r.setRule(SlaRule.valueOf(rule));
+// ReadablePeriod dur;
+// try {
+// dur = parseDuration(duration);
+// }
+// catch (Exception e) {
+// throw new ScheduleManagerException("Unable to parse duration for a SLA that needs to take actions!", e);
+// }
+// r.setDuration(dur);
+// List<SlaAction> actions = new ArrayList<SLA.SlaAction>();
+// if(emailAction.equals("true")) {
+// actions.add(SlaAction.EMAIL);
+// }
+// if(killAction.equals("true")) {
+// actions.add(SlaAction.KILL);
+// }
+// r.setActions(actions);
+// return r;
+// }
+// return null;
+// }
private ReadablePeriod parseDuration(String duration) {
int hour = Integer.parseInt(duration.split(":")[0]);
@@ -221,77 +216,77 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return Minutes.minutes(min+hour*60).toPeriod();
}
- private void ajaxSlaInfo(HttpServletRequest req, HashMap<String, Object> ret, User user) {
- int scheduleId;
- try {
- scheduleId = getIntParam(req, "scheduleId");
-
- Schedule sched = scheduleManager.getSchedule(scheduleId);
-
- Project project = getProjectAjaxByPermission(ret, sched.getProjectId(), user, Type.READ);
- if (project == null) {
- ret.put("error", "Error loading project. Project " + sched.getProjectId() + " doesn't exist");
- return;
- }
-
- Flow flow = project.getFlow(sched.getFlowName());
- if (flow == null) {
- ret.put("error", "Error loading flow. Flow " + sched.getFlowName() + " doesn't exist in " + sched.getProjectId());
- return;
- }
-
- SlaOptions slaOptions = sched.getSlaOptions();
- ExecutionOptions flowOptions = sched.getExecutionOptions();
-
- if(slaOptions != null) {
- ret.put("slaEmails", slaOptions.getSlaEmails());
- List<SlaSetting> settings = slaOptions.getSettings();
- List<Object> setObj = new ArrayList<Object>();
- for(SlaSetting set: settings) {
- setObj.add(set.toObject());
- }
- ret.put("settings", setObj);
- }
- else if (flowOptions != null) {
- if(flowOptions.getFailureEmails() != null) {
- List<String> emails = flowOptions.getFailureEmails();
- if(emails.size() > 0) {
- ret.put("slaEmails", emails);
- }
- }
- }
- else {
- if(flow.getFailureEmails() != null) {
- List<String> emails = flow.getFailureEmails();
- if(emails.size() > 0) {
- ret.put("slaEmails", emails);
- }
- }
- }
-
- List<String> disabledJobs;
- if(flowOptions != null) {
- disabledJobs = flowOptions.getDisabledJobs() == null ? new ArrayList<String>() : flowOptions.getDisabledJobs();
- }
- else {
- disabledJobs = new ArrayList<String>();
- }
-
- List<String> allJobs = new ArrayList<String>();
- for(Node n : flow.getNodes()) {
- if(!disabledJobs.contains(n.getId())) {
- allJobs.add(n.getId());
- }
- }
- ret.put("allJobNames", allJobs);
- } catch (ServletException e) {
- ret.put("error", e);
- } catch (ScheduleManagerException e) {
- // TODO Auto-generated catch block
- ret.put("error", e);
- }
-
- }
+// private void ajaxSlaInfo(HttpServletRequest req, HashMap<String, Object> ret, User user) {
+// int scheduleId;
+// try {
+// scheduleId = getIntParam(req, "scheduleId");
+//
+// Schedule sched = scheduleManager.getSchedule(scheduleId);
+//
+// Project project = getProjectAjaxByPermission(ret, sched.getProjectId(), user, Type.READ);
+// if (project == null) {
+// ret.put("error", "Error loading project. Project " + sched.getProjectId() + " doesn't exist");
+// return;
+// }
+//
+// Flow flow = project.getFlow(sched.getFlowName());
+// if (flow == null) {
+// ret.put("error", "Error loading flow. Flow " + sched.getFlowName() + " doesn't exist in " + sched.getProjectId());
+// return;
+// }
+//
+// SlaOptions slaOptions = sched.getSlaOptions();
+// ExecutionOptions flowOptions = sched.getExecutionOptions();
+//
+// if(slaOptions != null) {
+// ret.put("slaEmails", slaOptions.getSlaEmails());
+// List<SlaSetting> settings = slaOptions.getSettings();
+// List<Object> setObj = new ArrayList<Object>();
+// for(SlaSetting set: settings) {
+// setObj.add(set.toObject());
+// }
+// ret.put("settings", setObj);
+// }
+// else if (flowOptions != null) {
+// if(flowOptions.getFailureEmails() != null) {
+// List<String> emails = flowOptions.getFailureEmails();
+// if(emails.size() > 0) {
+// ret.put("slaEmails", emails);
+// }
+// }
+// }
+// else {
+// if(flow.getFailureEmails() != null) {
+// List<String> emails = flow.getFailureEmails();
+// if(emails.size() > 0) {
+// ret.put("slaEmails", emails);
+// }
+// }
+// }
+//
+// List<String> disabledJobs;
+// if(flowOptions != null) {
+// disabledJobs = flowOptions.getDisabledJobs() == null ? new ArrayList<String>() : flowOptions.getDisabledJobs();
+// }
+// else {
+// disabledJobs = new ArrayList<String>();
+// }
+//
+// List<String> allJobs = new ArrayList<String>();
+// for(Node n : flow.getNodes()) {
+// if(!disabledJobs.contains(n.getId())) {
+// allJobs.add(n.getId());
+// }
+// }
+// ret.put("allJobNames", allJobs);
+// } catch (ServletException e) {
+// ret.put("error", e);
+// } catch (ScheduleManagerException e) {
+// // TODO Auto-generated catch block
+// ret.put("error", e);
+// }
+//
+// }
protected Project getProjectAjaxByPermission(Map<String, Object> ret, int projectId, User user, Permission.Type type) {
Project project = projectManager.getProject(projectId);
@@ -618,9 +613,8 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
catch (Exception e) {
ret.put("error", e.getMessage());
}
- SlaOptions slaOptions = null;
- Schedule schedule = scheduleManager.scheduleFlow(-1, projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(), thePeriod, DateTime.now().getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);
+ Schedule schedule = scheduleManager.scheduleFlow(-1, projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(), thePeriod, DateTime.now().getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions);
logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName + " (" + projectId +")" + "].");
projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.toString() + " has been added.");
diff --git a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
index 5fc7235..6365fd0 100644
--- a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
+++ b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
@@ -18,10 +18,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import azkaban.executor.ExecutionOptions;
-import azkaban.sla.SLA.SlaAction;
-import azkaban.sla.SLA.SlaRule;
-import azkaban.sla.SLA.SlaSetting;
-import azkaban.sla.SlaOptions;
import azkaban.database.DataSourceUtils;
import azkaban.utils.Props;
@@ -111,28 +107,18 @@ public class JdbcScheduleLoaderTest {
List<String> disabledJobs = new ArrayList<String>();
disabledJobs.add("job1");
disabledJobs.add("job2");
- List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
- SlaSetting set1 = new SlaSetting();
- List<SlaAction> actions = new ArrayList<SlaAction>();
- actions.add(SlaAction.EMAIL);
- set1.setActions(actions);
- set1.setId("");
- set1.setDuration(Schedule.parsePeriodString("1h"));
- set1.setRule(SlaRule.FINISH);
- slaSets.add(set1);
+
ExecutionOptions flowOptions = new ExecutionOptions();
flowOptions.setFailureEmails(emails);
flowOptions.setDisabledJobs(disabledJobs);
- SlaOptions slaOptions = new SlaOptions();
- slaOptions.setSlaEmails(emails);
- slaOptions.setSettings(slaSets);
+
- Schedule s1 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
- Schedule s2 = new Schedule(-1, 1, "proj1", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "ccc", flowOptions, slaOptions);
- Schedule s3 = new Schedule(-1, 2, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
- Schedule s4 = new Schedule(-1, 3, "proj2", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
- Schedule s5 = new Schedule(-1, 3, "proj2", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
- Schedule s6 = new Schedule(-1, 3, "proj2", "flow3", "error", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+ Schedule s1 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
+ Schedule s2 = new Schedule(-1, 1, "proj1", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
+ Schedule s3 = new Schedule(-1, 2, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
+ Schedule s4 = new Schedule(-1, 3, "proj2", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
+ Schedule s5 = new Schedule(-1, 3, "proj2", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
+ Schedule s6 = new Schedule(-1, 3, "proj2", "flow3", "error", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
loader.insertSchedule(s1);
loader.insertSchedule(s2);
@@ -149,11 +135,6 @@ public class JdbcScheduleLoaderTest {
Assert.assertEquals(44444, sched.getSubmitTime());
Assert.assertEquals("1d", Schedule.createPeriodString(sched.getPeriod()));
ExecutionOptions fOpt = sched.getExecutionOptions();
- SlaOptions sOpt = sched.getSlaOptions();
- Assert.assertEquals(SlaAction.EMAIL, sOpt.getSettings().get(0).getActions().get(0));
- Assert.assertEquals("", sOpt.getSettings().get(0).getId());
- Assert.assertEquals(Schedule.parsePeriodString("1h"), sOpt.getSettings().get(0).getDuration());
- Assert.assertEquals(SlaRule.FINISH, sOpt.getSettings().get(0).getRule());
Assert.assertEquals(2, fOpt.getFailureEmails().size());
Assert.assertEquals(null, fOpt.getSuccessEmails());
Assert.assertEquals(2, fOpt.getDisabledJobs().size());
@@ -176,32 +157,19 @@ public class JdbcScheduleLoaderTest {
List<String> disabledJobs = new ArrayList<String>();
disabledJobs.add("job1");
disabledJobs.add("job2");
- List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
- SlaSetting set1 = new SlaSetting();
- List<SlaAction> actions = new ArrayList<SlaAction>();
- actions.add(SlaAction.EMAIL);
- set1.setActions(actions);
- set1.setId("");
- set1.setDuration(Schedule.parsePeriodString("1h"));
- set1.setRule(SlaRule.FINISH);
- slaSets.add(set1);
+
ExecutionOptions flowOptions = new ExecutionOptions();
flowOptions.setFailureEmails(emails);
flowOptions.setDisabledJobs(disabledJobs);
- SlaOptions slaOptions = new SlaOptions();
- slaOptions.setSlaEmails(emails);
- slaOptions.setSettings(slaSets);
System.out.println("the flow options are " + flowOptions);
- System.out.println("the sla options are " + slaOptions);
- Schedule s1 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+ Schedule s1 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
loader.insertSchedule(s1);
emails.add("email3");
- slaOptions.setSlaEmails(emails);
- Schedule s2 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11112, "America/Los_Angeles", "2M", 22223, 33334, 44445, "cyu", flowOptions, slaOptions);
+ Schedule s2 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11112, "America/Los_Angeles", "2M", 22223, 33334, 44445, "azkaban", flowOptions);
loader.updateSchedule(s2);
@@ -212,7 +180,6 @@ public class JdbcScheduleLoaderTest {
Assert.assertEquals(44445, schedules.get(0).getSubmitTime());
Assert.assertEquals("2M", Schedule.createPeriodString(schedules.get(0).getPeriod()));
// System.out.println("the options are " + schedules.get(0).getSchedOptions());
- Assert.assertEquals(3, schedules.get(0).getSlaOptions().getSlaEmails().size());
}
@Test
@@ -237,23 +204,11 @@ public class JdbcScheduleLoaderTest {
List<String> disabledJobs = new ArrayList<String>();
disabledJobs.add("job1");
disabledJobs.add("job2");
- List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
- SlaSetting set1 = new SlaSetting();
- List<SlaAction> actions = new ArrayList<SlaAction>();
- actions.add(SlaAction.EMAIL);
- set1.setActions(actions);
- set1.setId("");
- set1.setDuration(Schedule.parsePeriodString("1h"));
- set1.setRule(SlaRule.FINISH);
- slaSets.add(set1);
ExecutionOptions flowOptions = new ExecutionOptions();
flowOptions.setFailureEmails(emails);
flowOptions.setDisabledJobs(disabledJobs);
- SlaOptions slaOptions = new SlaOptions();
- slaOptions.setSlaEmails(emails);
- slaOptions.setSettings(slaSets);
- Schedule s = new Schedule(-1, i+1, "proj"+(i+1), "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+ Schedule s = new Schedule(-1, i+1, "proj"+(i+1), "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
schedules.add(s);
try {
loader.insertSchedule(s);
diff --git a/unit/java/azkaban/test/trigger/BasicTimeCheckerTest.java b/unit/java/azkaban/test/trigger/BasicTimeCheckerTest.java
index a103f5d..d65ce7b 100644
--- a/unit/java/azkaban/test/trigger/BasicTimeCheckerTest.java
+++ b/unit/java/azkaban/test/trigger/BasicTimeCheckerTest.java
@@ -10,9 +10,9 @@ import org.joda.time.DateTime;
import org.joda.time.ReadablePeriod;
import org.junit.Test;
-import azkaban.scheduler.BasicTimeChecker;
import azkaban.trigger.Condition;
import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.builtin.BasicTimeChecker;
import azkaban.utils.Utils;
public class BasicTimeCheckerTest {
diff --git a/unit/java/azkaban/test/trigger/ConditionTest.java b/unit/java/azkaban/test/trigger/ConditionTest.java
index 34ce3bf..09510cf 100644
--- a/unit/java/azkaban/test/trigger/ConditionTest.java
+++ b/unit/java/azkaban/test/trigger/ConditionTest.java
@@ -11,11 +11,11 @@ import static org.junit.Assert.assertFalse;
import org.joda.time.DateTime;
import org.junit.Test;
-import azkaban.scheduler.BasicTimeChecker;
import azkaban.trigger.CheckerTypeLoader;
import azkaban.trigger.Condition;
import azkaban.trigger.ConditionChecker;
import azkaban.trigger.TriggerException;
+import azkaban.trigger.builtin.BasicTimeChecker;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
import azkaban.utils.Utils;
diff --git a/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java b/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
index a996885..962884c 100644
--- a/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
+++ b/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
@@ -8,11 +8,11 @@ import java.util.List;
import org.junit.Test;
-import azkaban.actions.ExecuteFlowAction;
import azkaban.executor.ExecutionOptions;
import azkaban.trigger.ActionTypeLoader;
import azkaban.trigger.TriggerAction;
import azkaban.trigger.TriggerException;
+import azkaban.trigger.builtin.ExecuteFlowAction;
import azkaban.utils.Props;
diff --git a/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java b/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
index 31a0788..98bd5ae 100644
--- a/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
+++ b/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
@@ -20,11 +20,9 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import azkaban.actions.ExecuteFlowAction;
import azkaban.database.DataSourceUtils;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
-import azkaban.scheduler.BasicTimeChecker;
import azkaban.trigger.ActionTypeLoader;
import azkaban.trigger.CheckerTypeLoader;
import azkaban.trigger.Condition;
@@ -35,6 +33,8 @@ import azkaban.trigger.TriggerAction;
import azkaban.trigger.TriggerException;
import azkaban.trigger.TriggerLoader;
import azkaban.trigger.TriggerManagerException;
+import azkaban.trigger.builtin.BasicTimeChecker;
+import azkaban.trigger.builtin.ExecuteFlowAction;
import azkaban.utils.Props;
import azkaban.utils.Utils;
diff --git a/unit/java/azkaban/test/trigger/TriggerManagerTest.java b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
index 98ec8fe..508436b 100644
--- a/unit/java/azkaban/test/trigger/TriggerManagerTest.java
+++ b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
@@ -53,17 +53,17 @@ public class TriggerManagerTest {
ThresholdChecker.setVal(1);
- triggerManager.insertTrigger(createDummyTrigger("test1", "triggerLoader", 10));
+ triggerManager.insertTrigger(createDummyTrigger("test1", "triggerLoader", 10), "testUser");
List<Trigger> triggers = triggerManager.getTriggers();
assertTrue(triggers.size() == 1);
Trigger t1 = triggers.get(0);
t1.setResetOnTrigger(false);
- triggerManager.updateTrigger(t1);
+ triggerManager.updateTrigger(t1, "testUser");
ThresholdChecker checker1 = (ThresholdChecker) t1.getTriggerCondition().getCheckers().values().toArray()[0];
assertTrue(t1.getSource().equals("triggerLoader"));
Trigger t2 = createDummyTrigger("test2: add new trigger", "addNewTriggerTest", 20);
- triggerManager.insertTrigger(t2);
+ triggerManager.insertTrigger(t2, "testUser");
ThresholdChecker checker2 = (ThresholdChecker) t2.getTriggerCondition().getCheckers().values().toArray()[0];
ThresholdChecker.setVal(15);
@@ -134,6 +134,20 @@ public class TriggerManagerTest {
public List<Trigger> loadTriggers() {
return new ArrayList<Trigger>(triggers.values());
}
+
+ @Override
+ public Trigger loadTrigger(int triggerId)
+ throws TriggerManagerException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public List<Trigger> getUpdatedTriggers(long lastUpdateTime)
+ throws TriggerManagerException {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
diff --git a/unit/java/azkaban/test/trigger/TriggerTest.java b/unit/java/azkaban/test/trigger/TriggerTest.java
index db8e324..8629edd 100644
--- a/unit/java/azkaban/test/trigger/TriggerTest.java
+++ b/unit/java/azkaban/test/trigger/TriggerTest.java
@@ -13,9 +13,7 @@ import org.junit.Test;
import static org.junit.Assert.*;
-import azkaban.actions.ExecuteFlowAction;
import azkaban.executor.ExecutionOptions;
-import azkaban.scheduler.BasicTimeChecker;
import azkaban.trigger.ActionTypeLoader;
import azkaban.trigger.CheckerTypeLoader;
import azkaban.trigger.Condition;
@@ -23,6 +21,8 @@ import azkaban.trigger.ConditionChecker;
import azkaban.trigger.Trigger;
import azkaban.trigger.TriggerAction;
import azkaban.trigger.TriggerException;
+import azkaban.trigger.builtin.BasicTimeChecker;
+import azkaban.trigger.builtin.ExecuteFlowAction;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
import azkaban.utils.Utils;