azkaban-uncached
Changes
src/java/azkaban/trigger/builtin/SlaChecker.java 222(+155 -67)
src/java/azkaban/trigger/TriggerManager.java 59(+55 -4)
src/web/js/azkaban.exflow.view.js 2(+2 -0)
Details
diff --git a/scheduleTriggerMigration/file2Trigger/file2Trigger b/scheduleTriggerMigration/file2Trigger/file2Trigger
new file mode 100644
index 0000000..7645b61
--- /dev/null
+++ b/scheduleTriggerMigration/file2Trigger/file2Trigger
@@ -0,0 +1,5 @@
+CONF_FILE=
+
+java -cp "lib/*:extlib/*" azkaban.file2Trigger.File2ScheduleTrigger $CONF_FILE ../schedule2File/schedules
+
+
diff --git a/scheduleTriggerMigration/file2Trigger/lib/azkaban-2.2.jar b/scheduleTriggerMigration/file2Trigger/lib/azkaban-2.2.jar
new file mode 100644
index 0000000..97a835e
Binary files /dev/null and b/scheduleTriggerMigration/file2Trigger/lib/azkaban-2.2.jar differ
diff --git a/scheduleTriggerMigration/file2Trigger/lib/commons-dbcp-1.4.jar b/scheduleTriggerMigration/file2Trigger/lib/commons-dbcp-1.4.jar
new file mode 100644
index 0000000..c4c1c4f
Binary files /dev/null and b/scheduleTriggerMigration/file2Trigger/lib/commons-dbcp-1.4.jar differ
diff --git a/scheduleTriggerMigration/file2Trigger/lib/commons-dbutils-1.5.jar b/scheduleTriggerMigration/file2Trigger/lib/commons-dbutils-1.5.jar
new file mode 100644
index 0000000..b0c0e12
Binary files /dev/null and b/scheduleTriggerMigration/file2Trigger/lib/commons-dbutils-1.5.jar differ
diff --git a/scheduleTriggerMigration/file2Trigger/lib/commons-jexl-2.1.1.jar b/scheduleTriggerMigration/file2Trigger/lib/commons-jexl-2.1.1.jar
new file mode 100644
index 0000000..ab288a8
Binary files /dev/null and b/scheduleTriggerMigration/file2Trigger/lib/commons-jexl-2.1.1.jar differ
diff --git a/scheduleTriggerMigration/file2Trigger/lib/commons-logging-1.1.1.jar b/scheduleTriggerMigration/file2Trigger/lib/commons-logging-1.1.1.jar
new file mode 100644
index 0000000..1deef14
Binary files /dev/null and b/scheduleTriggerMigration/file2Trigger/lib/commons-logging-1.1.1.jar differ
diff --git a/scheduleTriggerMigration/file2Trigger/lib/commons-pool-1.6.jar b/scheduleTriggerMigration/file2Trigger/lib/commons-pool-1.6.jar
new file mode 100644
index 0000000..72ca75a
Binary files /dev/null and b/scheduleTriggerMigration/file2Trigger/lib/commons-pool-1.6.jar differ
diff --git a/scheduleTriggerMigration/file2Trigger/lib/file2trigger.jar b/scheduleTriggerMigration/file2Trigger/lib/file2trigger.jar
new file mode 100644
index 0000000..3d27e94
Binary files /dev/null and b/scheduleTriggerMigration/file2Trigger/lib/file2trigger.jar differ
diff --git a/scheduleTriggerMigration/file2Trigger/lib/jackson-core-asl-1.9.5.jar b/scheduleTriggerMigration/file2Trigger/lib/jackson-core-asl-1.9.5.jar
new file mode 100644
index 0000000..6862bdd
Binary files /dev/null and b/scheduleTriggerMigration/file2Trigger/lib/jackson-core-asl-1.9.5.jar differ
diff --git a/scheduleTriggerMigration/file2Trigger/lib/jackson-mapper-asl-1.9.5.jar b/scheduleTriggerMigration/file2Trigger/lib/jackson-mapper-asl-1.9.5.jar
new file mode 100644
index 0000000..147ab38
Binary files /dev/null and b/scheduleTriggerMigration/file2Trigger/lib/jackson-mapper-asl-1.9.5.jar differ
diff --git a/scheduleTriggerMigration/file2Trigger/lib/joda-time-2.0.jar b/scheduleTriggerMigration/file2Trigger/lib/joda-time-2.0.jar
new file mode 100644
index 0000000..169a7a4
Binary files /dev/null and b/scheduleTriggerMigration/file2Trigger/lib/joda-time-2.0.jar differ
diff --git a/scheduleTriggerMigration/file2Trigger/lib/log4j-1.2.16.jar b/scheduleTriggerMigration/file2Trigger/lib/log4j-1.2.16.jar
new file mode 100644
index 0000000..3f9d847
Binary files /dev/null and b/scheduleTriggerMigration/file2Trigger/lib/log4j-1.2.16.jar differ
diff --git a/scheduleTriggerMigration/schedule2File/lib/azkaban-2.1.jar b/scheduleTriggerMigration/schedule2File/lib/azkaban-2.1.jar
new file mode 100644
index 0000000..63053f6
Binary files /dev/null and b/scheduleTriggerMigration/schedule2File/lib/azkaban-2.1.jar differ
diff --git a/scheduleTriggerMigration/schedule2File/lib/commons-dbcp-1.4.jar b/scheduleTriggerMigration/schedule2File/lib/commons-dbcp-1.4.jar
new file mode 100644
index 0000000..c4c1c4f
Binary files /dev/null and b/scheduleTriggerMigration/schedule2File/lib/commons-dbcp-1.4.jar differ
diff --git a/scheduleTriggerMigration/schedule2File/lib/commons-dbutils-1.5.jar b/scheduleTriggerMigration/schedule2File/lib/commons-dbutils-1.5.jar
new file mode 100644
index 0000000..b0c0e12
Binary files /dev/null and b/scheduleTriggerMigration/schedule2File/lib/commons-dbutils-1.5.jar differ
diff --git a/scheduleTriggerMigration/schedule2File/lib/commons-io-2.4.jar b/scheduleTriggerMigration/schedule2File/lib/commons-io-2.4.jar
new file mode 100644
index 0000000..90035a4
Binary files /dev/null and b/scheduleTriggerMigration/schedule2File/lib/commons-io-2.4.jar differ
diff --git a/scheduleTriggerMigration/schedule2File/lib/commons-pool-1.6.jar b/scheduleTriggerMigration/schedule2File/lib/commons-pool-1.6.jar
new file mode 100644
index 0000000..72ca75a
Binary files /dev/null and b/scheduleTriggerMigration/schedule2File/lib/commons-pool-1.6.jar differ
diff --git a/scheduleTriggerMigration/schedule2File/lib/jackson-core-asl-1.9.5.jar b/scheduleTriggerMigration/schedule2File/lib/jackson-core-asl-1.9.5.jar
new file mode 100644
index 0000000..6862bdd
Binary files /dev/null and b/scheduleTriggerMigration/schedule2File/lib/jackson-core-asl-1.9.5.jar differ
diff --git a/scheduleTriggerMigration/schedule2File/lib/jackson-mapper-asl-1.9.5.jar b/scheduleTriggerMigration/schedule2File/lib/jackson-mapper-asl-1.9.5.jar
new file mode 100644
index 0000000..147ab38
Binary files /dev/null and b/scheduleTriggerMigration/schedule2File/lib/jackson-mapper-asl-1.9.5.jar differ
diff --git a/scheduleTriggerMigration/schedule2File/lib/joda-time-2.0.jar b/scheduleTriggerMigration/schedule2File/lib/joda-time-2.0.jar
new file mode 100644
index 0000000..169a7a4
Binary files /dev/null and b/scheduleTriggerMigration/schedule2File/lib/joda-time-2.0.jar differ
diff --git a/scheduleTriggerMigration/schedule2File/lib/log4j-1.2.16.jar b/scheduleTriggerMigration/schedule2File/lib/log4j-1.2.16.jar
new file mode 100644
index 0000000..3f9d847
Binary files /dev/null and b/scheduleTriggerMigration/schedule2File/lib/log4j-1.2.16.jar differ
diff --git a/scheduleTriggerMigration/schedule2File/lib/schedule2file.jar b/scheduleTriggerMigration/schedule2File/lib/schedule2file.jar
new file mode 100644
index 0000000..d457612
Binary files /dev/null and b/scheduleTriggerMigration/schedule2File/lib/schedule2file.jar differ
diff --git a/scheduleTriggerMigration/schedule2File/schedule2file b/scheduleTriggerMigration/schedule2File/schedule2file
new file mode 100644
index 0000000..191b4f0
--- /dev/null
+++ b/scheduleTriggerMigration/schedule2File/schedule2file
@@ -0,0 +1,7 @@
+
+CONF_FILE=
+
+java -cp "lib/*:extlib/*" azkaban.schedule2File.Schedule2File $CONF_FILE schedules
+
+
+
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index fd8a16c..bf87720 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -41,6 +41,9 @@ import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import azkaban.alert.Alerter;
+import azkaban.execapp.event.Event;
+import azkaban.execapp.event.Event.Type;
+import azkaban.execapp.event.EventHandler;
import azkaban.project.Project;
import azkaban.scheduler.ScheduleStatisticManager;
import azkaban.utils.FileIOUtils.JobMetaData;
@@ -53,7 +56,7 @@ import azkaban.utils.Props;
* Executor manager used to manage the client side job.
*
*/
-public class ExecutorManager implements ExecutorManagerAdapter {
+public class ExecutorManager extends EventHandler implements ExecutorManagerAdapter {
private static Logger logger = Logger.getLogger(ExecutorManager.class);
private ExecutorLoader executorLoader;
private String executorHost;
@@ -723,6 +726,7 @@ public class ExecutorManager implements ExecutorManagerAdapter {
if(flow.getScheduleId() >= 0 && flow.getStatus() == Status.SUCCEEDED){
ScheduleStatisticManager.invalidateCache(flow.getScheduleId(), cacheDir);
}
+ fireEventListeners(Event.create(flow, Type.FLOW_FINISHED));
recentlyFinished.put(flow.getExecutionId(), flow);
}
@@ -789,6 +793,7 @@ public class ExecutorManager implements ExecutorManagerAdapter {
updaterStage = "finalizing flow " + execId + " cleaning from memory";
runningFlows.remove(execId);
+ fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED));
recentlyFinished.put(execId, dsFlow);
} catch (ExecutorManagerException e) {
diff --git a/src/java/azkaban/executor/ExecutorManagerAdapter.java b/src/java/azkaban/executor/ExecutorManagerAdapter.java
index 85b55df..cc81589 100644
--- a/src/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/src/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -10,7 +10,7 @@ import azkaban.project.Project;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
-public interface ExecutorManagerAdapter {
+public interface ExecutorManagerAdapter{
public static final String LOCAL_MODE = "local";
public static final String REMOTE_MODE = "remote";
diff --git a/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java b/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
index 1cbbd61..2f3a5f4 100644
--- a/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
+++ b/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
@@ -247,15 +247,15 @@ public class ExecuteFlowAction implements TriggerAction {
int execId = exflow.getExecutionId();
for(SlaOption sla : slaOptions) {
logger.info("Adding sla trigger " + sla.toString() + " to execution " + execId);
- SlaChecker slaChecker = new SlaChecker("slaChecker", sla, execId);
+ SlaChecker slaFailChecker = new SlaChecker("slaFailChecker", sla, execId);
Map<String, ConditionChecker> slaCheckers = new HashMap<String, ConditionChecker>();
- slaCheckers.put(slaChecker.getId(), slaChecker);
- Condition triggerCond = new Condition(slaCheckers, slaChecker.getId() + ".eval()");
- // if whole flow finish before violate sla, just abort
- ExecutionChecker execChecker = new ExecutionChecker("execChecker", execId, null, Status.SUCCEEDED);
+ slaCheckers.put(slaFailChecker.getId(), slaFailChecker);
+ Condition triggerCond = new Condition(slaCheckers, slaFailChecker.getId() + ".isSlaFailed()");
+ // if whole flow finish before violate sla, just expire
+ SlaChecker slaPassChecker = new SlaChecker("slaPassChecker", sla, execId);
Map<String, ConditionChecker> expireCheckers = new HashMap<String, ConditionChecker>();
- expireCheckers.put(execChecker.getId(), execChecker);
- Condition expireCond = new Condition(expireCheckers, execChecker.getId() + ".eval()");
+ expireCheckers.put(slaPassChecker.getId(), slaPassChecker);
+ Condition expireCond = new Condition(expireCheckers, slaPassChecker.getId() + ".isSlaPassed()");
List<TriggerAction> actions = new ArrayList<TriggerAction>();
List<String> slaActions = sla.getActions();
for(String act : slaActions) {
@@ -268,6 +268,7 @@ public class ExecuteFlowAction implements TriggerAction {
}
}
Trigger slaTrigger = new Trigger("azkaban_sla", "azkaban", triggerCond, expireCond, actions);
+ slaTrigger.getInfo().put("monitored.finished.execution", String.valueOf(execId));
slaTrigger.setResetOnTrigger(false);
slaTrigger.setResetOnExpire(false);
logger.info("Ready to put in the sla trigger");
src/java/azkaban/trigger/builtin/SlaChecker.java 222(+155 -67)
diff --git a/src/java/azkaban/trigger/builtin/SlaChecker.java b/src/java/azkaban/trigger/builtin/SlaChecker.java
index eccb5b6..f8897a5 100644
--- a/src/java/azkaban/trigger/builtin/SlaChecker.java
+++ b/src/java/azkaban/trigger/builtin/SlaChecker.java
@@ -54,99 +54,136 @@ public class SlaChecker implements ConditionChecker{
executorManager = em;
}
- private Boolean violateSla(ExecutableFlow flow) {
+ private Boolean isSlaMissed(ExecutableFlow flow) {
String type = slaOption.getType();
- logger.info("Checking for " + flow.getExecutionId() + " with sla " + type);
logger.info("flow is " + flow.getStatus());
if(flow.getStartTime() < 0) {
return Boolean.FALSE;
}
+ Status status;
if(type.equals(SlaOption.TYPE_FLOW_FINISH)) {
- ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
- DateTime startTime = new DateTime(flow.getStartTime());
- DateTime checkTime = startTime.plus(duration);
- this.checkTime = checkTime.getMillis();
- if(checkTime.isBeforeNow()) {
- Status status = flow.getStatus();
- if(status.equals(Status.FAILED) || status.equals(Status.KILLED) || status.equals(Status.SUCCEEDED)) {
- return Boolean.FALSE;
- } else {
- return Boolean.TRUE;
- }
+ if(checkTime < flow.getStartTime()) {
+ ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+ DateTime startTime = new DateTime(flow.getStartTime());
+ DateTime nextCheckTime = startTime.plus(duration);
+ this.checkTime = nextCheckTime.getMillis();
+ }
+ status = flow.getStatus();
+ if(checkTime < DateTime.now().getMillis()) {
+ return !isFlowFinished(status);
}
} else if(type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
- ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
- DateTime startTime = new DateTime(flow.getStartTime());
- DateTime checkTime = startTime.plus(duration);
- this.checkTime = checkTime.getMillis();
- if(checkTime.isBeforeNow()) {
- Status status = flow.getStatus();
- if(status.equals(Status.SUCCEEDED)) {
- return Boolean.FALSE;
- } else {
- return Boolean.TRUE;
- }
+ if(checkTime < flow.getStartTime()) {
+ ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+ DateTime startTime = new DateTime(flow.getStartTime());
+ DateTime nextCheckTime = startTime.plus(duration);
+ this.checkTime = nextCheckTime.getMillis();
+ }
+ status = flow.getStatus();
+ if(checkTime < DateTime.now().getMillis()) {
+ return !isFlowSucceeded(status);
+ } else {
+ return status.equals(Status.FAILED) || status.equals(Status.KILLED);
}
} else if(type.equals(SlaOption.TYPE_JOB_FINISH)) {
String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
ExecutableNode node = flow.getExecutableNode(jobName);
- if(node.getStartTime() > 0) {
+ if(node.getStartTime() < 0) {
+ return Boolean.FALSE;
+ }
+ if(checkTime < node.getStartTime()) {
ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
DateTime startTime = new DateTime(node.getStartTime());
- DateTime checkTime = startTime.plus(duration);
- this.checkTime = checkTime.getMillis();
- if(checkTime.isBeforeNow()) {
- Status status = node.getStatus();
- if(status.equals(Status.FAILED) || status.equals(Status.KILLED) || status.equals(Status.SUCCEEDED)) {
- return Boolean.FALSE;
- } else {
- return Boolean.TRUE;
- }
- }
+ DateTime nextCheckTime = startTime.plus(duration);
+ this.checkTime = nextCheckTime.getMillis();
+ }
+ status = node.getStatus();
+ if(checkTime < DateTime.now().getMillis()) {
+ return !isJobFinished(status);
}
} else if(type.equals(SlaOption.TYPE_JOB_SUCCEED)) {
String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
ExecutableNode node = flow.getExecutableNode(jobName);
- if(node.getStartTime() > 0) {
+ if(node.getStartTime() < 0) {
+ return Boolean.FALSE;
+ }
+ if(checkTime < node.getStartTime()) {
+ ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+ DateTime startTime = new DateTime(node.getStartTime());
+ DateTime nextCheckTime = startTime.plus(duration);
+ this.checkTime = nextCheckTime.getMillis();
+ }
+ status = node.getStatus();
+ if(checkTime < DateTime.now().getMillis()) {
+ return !isJobFinished(status);
+ } else {
+ return status.equals(Status.FAILED) || status.equals(Status.KILLED);
+ }
+ }
+ return Boolean.FALSE;
+ }
+
+ private Boolean isSlaGood(ExecutableFlow flow) {
+ String type = slaOption.getType();
+ logger.info("flow is " + flow.getStatus());
+ if(flow.getStartTime() < 0) {
+ return Boolean.FALSE;
+ }
+ Status status;
+ if(type.equals(SlaOption.TYPE_FLOW_FINISH)) {
+ if(checkTime < flow.getStartTime()) {
+ ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+ DateTime startTime = new DateTime(flow.getStartTime());
+ DateTime nextCheckTime = startTime.plus(duration);
+ this.checkTime = nextCheckTime.getMillis();
+ }
+ status = flow.getStatus();
+ return isFlowFinished(status);
+ } else if(type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
+ if(checkTime < flow.getStartTime()) {
+ ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+ DateTime startTime = new DateTime(flow.getStartTime());
+ DateTime nextCheckTime = startTime.plus(duration);
+ this.checkTime = nextCheckTime.getMillis();
+ }
+ status = flow.getStatus();
+ return isFlowSucceeded(status);
+ } else if(type.equals(SlaOption.TYPE_JOB_FINISH)) {
+ String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+ ExecutableNode node = flow.getExecutableNode(jobName);
+ if(node.getStartTime() < 0) {
+ return Boolean.FALSE;
+ }
+ if(checkTime < node.getStartTime()) {
+ ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+ DateTime startTime = new DateTime(node.getStartTime());
+ DateTime nextCheckTime = startTime.plus(duration);
+ this.checkTime = nextCheckTime.getMillis();
+ }
+ status = node.getStatus();
+ return isJobFinished(status);
+ } else if(type.equals(SlaOption.TYPE_JOB_SUCCEED)) {
+ String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+ ExecutableNode node = flow.getExecutableNode(jobName);
+ if(node.getStartTime() < 0) {
+ return Boolean.FALSE;
+ }
+ if(checkTime < node.getStartTime()) {
ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
DateTime startTime = new DateTime(node.getStartTime());
- DateTime checkTime = startTime.plus(duration);
- this.checkTime = checkTime.getMillis();
- if(checkTime.isBeforeNow()) {
- Status status = node.getStatus();
- if(status.equals(Status.SUCCEEDED)) {
- return Boolean.FALSE;
- } else {
- return Boolean.TRUE;
- }
- }
+ DateTime nextCheckTime = startTime.plus(duration);
+ this.checkTime = nextCheckTime.getMillis();
}
+ status = node.getStatus();
+ return isJobSucceeded(status);
}
-// else if(type.equals(SlaOption.TYPE_JOB_PROGRESS)) {
-// String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
-// float targetProgress = Float.valueOf((String) slaOption.getInfo().get(SlaOption.INFO_PROGRESS_PERCENT));
-// ExecutableNode node = flow.getExecutableNode(jobName);
-// if(node.getStartTime() > 0) {
-// ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
-// DateTime startTime = new DateTime(node.getStartTime());
-// DateTime checkTime = startTime.plus(duration);
-// if(checkTime.isBeforeNow()) {
-// if(node.getProgress() > targetProgress) {
-// return Boolean.FALSE;
-// } else {
-// return Boolean.TRUE;
-// }
-// }
-// } else {
-// return Boolean.FALSE;
-// }
-// }
return Boolean.FALSE;
}
// return true to trigger sla action
@Override
public Object eval() {
+ logger.info("Checking sla for execution " + execId);
ExecutableFlow flow;
try {
flow = executorManager.getExecutableFlow(execId);
@@ -156,7 +193,35 @@ public class SlaChecker implements ConditionChecker{
// something wrong, send out alerts
return Boolean.TRUE;
}
- return violateSla(flow);
+ return isSlaMissed(flow);
+ }
+
+ public Object isSlaFailed() {
+ logger.info("Testing if sla failed for execution " + execId);
+ ExecutableFlow flow;
+ try {
+ flow = executorManager.getExecutableFlow(execId);
+ } catch (ExecutorManagerException e) {
+ logger.error("Can't get executable flow.", e);
+ e.printStackTrace();
+ // something wrong, send out alerts
+ return Boolean.TRUE;
+ }
+ return isSlaMissed(flow);
+ }
+
+ public Object isSlaPassed() {
+ logger.info("Testing if sla is good for execution " + execId);
+ ExecutableFlow flow;
+ try {
+ flow = executorManager.getExecutableFlow(execId);
+ } catch (ExecutorManagerException e) {
+ logger.error("Can't get executable flow.", e);
+ e.printStackTrace();
+ // something wrong, send out alerts
+ return Boolean.TRUE;
+ }
+ return isSlaGood(flow);
}
@Override
@@ -223,5 +288,28 @@ public class SlaChecker implements ConditionChecker{
public long getNextCheckTime() {
return checkTime;
}
-
+
+ private boolean isFlowFinished(Status status) {
+ if(status.equals(Status.FAILED) || status.equals(Status.KILLED) || status.equals(Status.SUCCEEDED)) {
+ return Boolean.TRUE;
+ } else {
+ return Boolean.FALSE;
+ }
+ }
+
+ private boolean isFlowSucceeded(Status status) {
+ return status.equals(Status.SUCCEEDED);
+ }
+
+ private boolean isJobFinished(Status status) {
+ if(status.equals(Status.FAILED) || status.equals(Status.KILLED) || status.equals(Status.SUCCEEDED)) {
+ return Boolean.TRUE;
+ } else {
+ return Boolean.FALSE;
+ }
+ }
+
+ private boolean isJobSucceeded(Status status) {
+ return status.equals(Status.SUCCEEDED);
+ }
}
diff --git a/src/java/azkaban/trigger/Condition.java b/src/java/azkaban/trigger/Condition.java
index 6df5503..1bead36 100644
--- a/src/java/azkaban/trigger/Condition.java
+++ b/src/java/azkaban/trigger/Condition.java
@@ -119,7 +119,7 @@ public class Condition {
}
public boolean isMet() {
- logger.info("Testing ondition " + expression);
+ logger.info("Testing condition " + expression);
return expression.evaluate(context).equals(Boolean.TRUE);
}
src/java/azkaban/trigger/TriggerManager.java 59(+55 -4)
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index e6e5b9d..496a9b3 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -28,9 +28,20 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.log4j.Logger;
+import azkaban.execapp.JobRunner;
+import azkaban.execapp.event.Event;
+import azkaban.execapp.event.EventHandler;
+import azkaban.execapp.event.EventListener;
+import azkaban.execapp.event.Event.Type;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.Status;
+import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.utils.Props;
-public class TriggerManager implements TriggerManagerAdapter{
+public class TriggerManager extends EventHandler implements TriggerManagerAdapter{
private static Logger logger = Logger.getLogger(TriggerManager.class);
public static final long DEFAULT_SCANNER_INTERVAL_MS = 60000;
@@ -45,9 +56,11 @@ public class TriggerManager implements TriggerManagerAdapter{
private long runnerThreadIdleTime = -1;
private LocalTriggerJMX jmxStats = new LocalTriggerJMX();
+ private ExecutorManagerEventListener listener = new ExecutorManagerEventListener();
+
private String scannerStage = "";
- public TriggerManager(Props props, TriggerLoader triggerLoader) throws TriggerManagerException {
+ public TriggerManager(Props props, TriggerLoader triggerLoader, ExecutorManager executorManager) throws TriggerManagerException {
this.triggerLoader = triggerLoader;
@@ -67,6 +80,8 @@ public class TriggerManager implements TriggerManagerAdapter{
Condition.setCheckerLoader(checkerTypeLoader);
Trigger.setActionTypeLoader(actionTypeLoader);
+ executorManager.addListener(listener);
+
logger.info("TriggerManager loaded.");
}
@@ -154,12 +169,14 @@ public class TriggerManager implements TriggerManagerAdapter{
private class TriggerScannerThread extends Thread {
private BlockingQueue<Trigger> triggers;
+ private Map<Integer, ExecutableFlow> justFinishedFlows;
private boolean shutdown = false;
//private AtomicBoolean stillAlive = new AtomicBoolean(true);
private final long scannerInterval;
public TriggerScannerThread(long scannerInterval) {
triggers = new PriorityBlockingQueue<Trigger>(1, new TriggerComparator());
+ justFinishedFlows = new ConcurrentHashMap<Integer, ExecutableFlow>();
this.setName("TriggerRunnerManager-Trigger-Scanner-Thread");
this.scannerInterval = scannerInterval;;
}
@@ -171,6 +188,10 @@ public class TriggerManager implements TriggerManagerAdapter{
this.interrupt();
}
+ public synchronized void addJustFinishedFlow(ExecutableFlow flow) {
+ justFinishedFlows.put(flow.getExecutionId(), flow);
+ }
+
public synchronized void addTrigger(Trigger t) {
t.updateNextCheckTime();
triggers.add(t);
@@ -191,6 +212,7 @@ public class TriggerManager implements TriggerManagerAdapter{
try{
checkAllTriggers();
+ justFinishedFlows.clear();
} catch(Exception e) {
e.printStackTrace();
logger.error(e.getMessage());
@@ -217,12 +239,27 @@ public class TriggerManager implements TriggerManagerAdapter{
private void checkAllTriggers() throws TriggerManagerException {
long now = System.currentTimeMillis();
+
+ // sweep through the rest of them
for(Trigger t : triggers) {
scannerStage = "Checking for trigger " + t.getTriggerId();
- if(t.getNextCheckTime() > now) {
+
+ boolean shouldSkip = true;
+ if(shouldSkip && t.getInfo() != null && t.getInfo().containsKey("monitored.finished.execution")) {
+ int execId = Integer.valueOf((String) t.getInfo().get("monitored.finished.execution"));
+ if(justFinishedFlows.containsKey(execId)) {
+ logger.info("Monitored execution has finished. Checking trigger earlier " + t.getTriggerId());
+ shouldSkip = false;
+ }
+ }
+ if(shouldSkip && t.getNextCheckTime() > now) {
+ shouldSkip = false;
+ }
+
+ if(shouldSkip) {
logger.info("Skipping trigger" + t.getTriggerId() + " until " + t.getNextCheckTime());
- continue;
}
+
logger.info("Checking trigger " + t.getTriggerId());
if(t.getStatus().equals(TriggerStatus.READY)) {
if(t.triggerConditionMet()) {
@@ -473,5 +510,19 @@ public class TriggerManager implements TriggerManagerAdapter{
actionTypeLoader.registerActionType(name, action);
}
+ private class ExecutorManagerEventListener implements EventListener {
+ public ExecutorManagerEventListener() {
+ }
+
+ @Override
+ public synchronized void handleEvent(Event event) {
+
+ ExecutableFlow flow = (ExecutableFlow) event.getRunner();
+ if (event.getType() == Type.FLOW_FINISHED) {
+ logger.info("Flow finish event received. " + flow.getExecutionId() );
+ runnerThread.addJustFinishedFlow(flow);
+ }
+ }
+ }
}
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 7bb0282..df2c02a 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -54,7 +54,6 @@ import org.mortbay.thread.QueuedThreadPool;
import azkaban.alert.Alerter;
import azkaban.database.AzkabanDatabaseSetup;
import azkaban.executor.ExecutorManager;
-import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.JdbcExecutorLoader;
import azkaban.jmx.JmxExecutorManager;
import azkaban.jmx.JmxJettyServer;
@@ -143,7 +142,8 @@ public class AzkabanWebServer extends AzkabanServer {
private final Server server;
private UserManager userManager;
private ProjectManager projectManager;
- private ExecutorManagerAdapter executorManager;
+// private ExecutorManagerAdapter executorManager;
+ private ExecutorManager executorManager;
private ScheduleManager scheduleManager;
private TriggerManager triggerManager;
private Map<String, Alerter> alerters;
@@ -185,7 +185,7 @@ public class AzkabanWebServer extends AzkabanServer {
triggerManager = loadTriggerManager(props);
loadBuiltinCheckersAndActions();
- // load all triggger agents here
+ // load all trigger agents here
scheduleManager = loadScheduleManager(triggerManager, props);
String triggerPluginDir = props.getString("trigger.plugin.dir", "plugins/triggers");
@@ -269,7 +269,7 @@ public class AzkabanWebServer extends AzkabanServer {
private TriggerManager loadTriggerManager(Props props) throws TriggerManagerException {
TriggerLoader loader = new JdbcTriggerLoader(props);
- return new TriggerManager(props, loader);
+ return new TriggerManager(props, loader, executorManager);
}
private void loadBuiltinCheckersAndActions() {
@@ -590,7 +590,7 @@ public class AzkabanWebServer extends AzkabanServer {
/**
*
*/
- public ExecutorManagerAdapter getExecutorManager() {
+ public ExecutorManager getExecutorManager() {
return executorManager;
}
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 8d87d68..6affa18 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -1256,7 +1256,10 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
String type = null;
final String contentType = item.getContentType();
- if(contentType != null && (contentType.startsWith("application/zip") || contentType.startsWith("application/x-zip-compressed"))) {
+ if (contentType != null
+ && (contentType.startsWith("application/zip")
+ || contentType.startsWith("application/x-zip-compressed")
+ || contentType.startsWith("application/octet-stream"))) {
type = "zip";
}
else {
src/web/js/azkaban.exflow.view.js 2(+2 -0)
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index 377703b..3ce1e25 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -646,6 +646,7 @@ var logUpdaterFunction = function() {
var exNodeClickCallback = function(event) {
console.log("Node clicked callback");
var jobId = event.currentTarget.jobid;
+ var attempt = event.currentTarget.attempt;
var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowId + "&job=" + jobId;
var visualizerURL = contextURL + "/pigvisualizer?execid=" + execId + "&jobid=" + jobId;
@@ -661,6 +662,7 @@ var exNodeClickCallback = function(event) {
var exJobClickCallback = function(event) {
console.log("Node clicked callback");
var jobId = event.currentTarget.jobid;
+ var attempt = event.currentTarget.attempt;
var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowId + "&job=" + jobId;
var visualizerURL = contextURL + "/pigvisualizer?execid=" + execId + "&jobid=" + jobId;
diff --git a/unit/java/azkaban/test/trigger/TriggerManagerTest.java b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
index fbd5da7..ebf1f38 100644
--- a/unit/java/azkaban/test/trigger/TriggerManagerTest.java
+++ b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
@@ -12,6 +12,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import azkaban.executor.ExecutorManager;
import azkaban.trigger.Condition;
import azkaban.trigger.ConditionChecker;
import azkaban.trigger.CheckerTypeLoader;
@@ -33,7 +34,6 @@ public class TriggerManagerTest {
public void setup() throws TriggerException, TriggerManagerException {
triggerLoader = new MockTriggerLoader();
-
}
@After
@@ -47,7 +47,7 @@ public class TriggerManagerTest {
Props props = new Props();
props.put("trigger.scan.interval", 4000);
- TriggerManager triggerManager = new TriggerManager(props, triggerLoader);
+ TriggerManager triggerManager = new TriggerManager(props, triggerLoader, null);
triggerManager.registerCheckerType(ThresholdChecker.type, ThresholdChecker.class);
triggerManager.registerActionType(DummyTriggerAction.type, DummyTriggerAction.class);