azkaban-aplcache
Changes
build.gradle 2(+1 -1)
Details
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
index 3ec0ba5..27b9ba4 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
@@ -32,192 +32,210 @@ import azkaban.sla.SlaOption;
import azkaban.trigger.ConditionChecker;
import azkaban.utils.Utils;
-import static java.util.Objects.requireNonNull;
-
-
public class SlaChecker implements ConditionChecker {
- public static final String type = "SlaChecker";
- enum NodeType { FLOW, JOB }
- enum ConditionType { FINISH, SUCCEED }
private static final Logger logger = Logger.getLogger(SlaChecker.class);
- private static ExecutorManagerAdapter executorManager;
+ public static final String type = "SlaChecker";
- private final String id;
- private final SlaOption slaOption;
- private final int execId;
-
- private ExecutableFlow flow = null;
- private ExecutableNode job = null;
- private NodeType nodeType;
- private ConditionType conditionType;
- private long startTime = -1;
- private long slaExpireTime = -1;
-
- /**
- * TODO remove static linking
- * @param executorManagerAdapter
- */
- public static void setExecutorManager(ExecutorManagerAdapter executorManagerAdapter) {
- executorManager = executorManagerAdapter;
- }
+ private String id;
+ private SlaOption slaOption;
+ private int execId;
+ private long checkTime = -1;
+
+ private static ExecutorManagerAdapter executorManager;
public SlaChecker(String id, SlaOption slaOption, int execId) {
this.id = id;
this.slaOption = slaOption;
this.execId = execId;
-
- retrieveSlaTypeProperties();
}
- private void retrieveSlaTypeProperties() {
- switch (slaOption.getType()) {
- case SlaOption.TYPE_FLOW_FINISH:
- nodeType = NodeType.FLOW;
- conditionType = ConditionType.FINISH;
- break;
- case SlaOption.TYPE_FLOW_SUCCEED:
- nodeType = NodeType.FLOW;
- conditionType = ConditionType.SUCCEED;
- break;
- case SlaOption.TYPE_JOB_FINISH:
- nodeType = NodeType.JOB;
- conditionType = ConditionType.FINISH;
- break;
- case SlaOption.TYPE_JOB_SUCCEED:
- nodeType = NodeType.JOB;
- conditionType = ConditionType.SUCCEED;
- break;
- }
+ public static void setExecutorManager(ExecutorManagerAdapter em) {
+ executorManager = em;
}
- private long getStartTime() {
- switch (nodeType) {
- case FLOW: return flow.getStartTime();
- case JOB: return job.getStartTime();
+ private Boolean isSlaMissed(ExecutableFlow flow) {
+ String type = slaOption.getType();
+ if (flow.getStartTime() < 0) {
+ return Boolean.FALSE;
}
- return -1;
- }
-
- private Status getStatus() {
- switch (nodeType) {
- case FLOW: return flow.getStatus();
- case JOB: return job.getStatus();
+ Status status;
+ if (type.equals(SlaOption.TYPE_FLOW_FINISH)) {
+ if (checkTime < flow.getStartTime()) {
+ ReadablePeriod duration =
+ Utils.parsePeriodString((String) slaOption.getInfo().get(
+ SlaOption.INFO_DURATION));
+ DateTime startTime = new DateTime(flow.getStartTime());
+ DateTime nextCheckTime = startTime.plus(duration);
+ this.checkTime = nextCheckTime.getMillis();
+ }
+ status = flow.getStatus();
+ if (checkTime < DateTime.now().getMillis()) {
+ return !isFlowFinished(status);
+ }
+ } else if (type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
+ if (checkTime < flow.getStartTime()) {
+ ReadablePeriod duration =
+ Utils.parsePeriodString((String) slaOption.getInfo().get(
+ SlaOption.INFO_DURATION));
+ DateTime startTime = new DateTime(flow.getStartTime());
+ DateTime nextCheckTime = startTime.plus(duration);
+ this.checkTime = nextCheckTime.getMillis();
+ }
+ status = flow.getStatus();
+ if (checkTime < DateTime.now().getMillis()) {
+ return !isFlowSucceeded(status);
+ } else {
+ return status.equals(Status.FAILED) || status.equals(Status.KILLED);
+ }
+ } else if (type.equals(SlaOption.TYPE_JOB_FINISH)) {
+ String jobName =
+ (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+ ExecutableNode node = flow.getExecutableNode(jobName);
+ if (node.getStartTime() < 0) {
+ return Boolean.FALSE;
+ }
+ if (checkTime < node.getStartTime()) {
+ ReadablePeriod duration =
+ Utils.parsePeriodString((String) slaOption.getInfo().get(
+ SlaOption.INFO_DURATION));
+ DateTime startTime = new DateTime(node.getStartTime());
+ DateTime nextCheckTime = startTime.plus(duration);
+ this.checkTime = nextCheckTime.getMillis();
+ }
+ status = node.getStatus();
+ if (checkTime < DateTime.now().getMillis()) {
+ return !isJobFinished(status);
+ }
+ } else if (type.equals(SlaOption.TYPE_JOB_SUCCEED)) {
+ String jobName =
+ (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+ ExecutableNode node = flow.getExecutableNode(jobName);
+ if (node.getStartTime() < 0) {
+ return Boolean.FALSE;
+ }
+ if (checkTime < node.getStartTime()) {
+ ReadablePeriod duration =
+ Utils.parsePeriodString((String) slaOption.getInfo().get(
+ SlaOption.INFO_DURATION));
+ DateTime startTime = new DateTime(node.getStartTime());
+ DateTime nextCheckTime = startTime.plus(duration);
+ this.checkTime = nextCheckTime.getMillis();
+ }
+ status = node.getStatus();
+ if (checkTime < DateTime.now().getMillis()) {
+ return !isJobFinished(status);
+ } else {
+ return status.equals(Status.FAILED) || status.equals(Status.KILLED);
+ }
}
- return null;
+ return Boolean.FALSE;
}
- /**
- * Initialize the instance.
- *
- * Initializing the derived members depends on ExecutorManager to be set.
- * Also, initializing the start time is possible when the job has already started.
- *
- * @return false on caught exceptions else true.
- */
- private boolean init() {
- try {
- requireNonNull(executorManager);
- if (flow == null) {
- flow = executorManager.getExecutableFlow(execId);
- if (NodeType.JOB == nodeType) {
- String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
- job = flow.getExecutableNode(jobName);
- }
+ private Boolean isSlaGood(ExecutableFlow flow) {
+ String type = slaOption.getType();
+ if (flow.getStartTime() < 0) {
+ return Boolean.FALSE;
+ }
+ Status status;
+ if (type.equals(SlaOption.TYPE_FLOW_FINISH)) {
+ if (checkTime < flow.getStartTime()) {
+ ReadablePeriod duration =
+ Utils.parsePeriodString((String) slaOption.getInfo().get(
+ SlaOption.INFO_DURATION));
+ DateTime startTime = new DateTime(flow.getStartTime());
+ DateTime nextCheckTime = startTime.plus(duration);
+ this.checkTime = nextCheckTime.getMillis();
}
- // Start time needs to be initialized if unset
- if (startTime < 0) {
- long t = getStartTime();
- if (t > 0) {
- this.startTime = t;
- ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
- this.slaExpireTime = new DateTime(startTime).plus(duration).getMillis();
- }
+ status = flow.getStatus();
+ return isFlowFinished(status);
+ } else if (type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
+ if (checkTime < flow.getStartTime()) {
+ ReadablePeriod duration =
+ Utils.parsePeriodString((String) slaOption.getInfo().get(
+ SlaOption.INFO_DURATION));
+ DateTime startTime = new DateTime(flow.getStartTime());
+ DateTime nextCheckTime = startTime.plus(duration);
+ this.checkTime = nextCheckTime.getMillis();
}
- } catch (ExecutorManagerException e) {
- // Log errors but don't crash
- logger.error("Can't get executable flow.", e);
- return false;
+ status = flow.getStatus();
+ return isFlowSucceeded(status);
+ } else if (type.equals(SlaOption.TYPE_JOB_FINISH)) {
+ String jobName =
+ (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+ ExecutableNode node = flow.getExecutableNode(jobName);
+ if (node.getStartTime() < 0) {
+ return Boolean.FALSE;
+ }
+ if (checkTime < node.getStartTime()) {
+ ReadablePeriod duration =
+ Utils.parsePeriodString((String) slaOption.getInfo().get(
+ SlaOption.INFO_DURATION));
+ DateTime startTime = new DateTime(node.getStartTime());
+ DateTime nextCheckTime = startTime.plus(duration);
+ this.checkTime = nextCheckTime.getMillis();
+ }
+ status = node.getStatus();
+ return isJobFinished(status);
+ } else if (type.equals(SlaOption.TYPE_JOB_SUCCEED)) {
+ String jobName =
+ (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+ ExecutableNode node = flow.getExecutableNode(jobName);
+ if (node.getStartTime() < 0) {
+ return Boolean.FALSE;
+ }
+ if (checkTime < node.getStartTime()) {
+ ReadablePeriod duration =
+ Utils.parsePeriodString((String) slaOption.getInfo().get(
+ SlaOption.INFO_DURATION));
+ DateTime startTime = new DateTime(node.getStartTime());
+ DateTime nextCheckTime = startTime.plus(duration);
+ this.checkTime = nextCheckTime.getMillis();
+ }
+ status = node.getStatus();
+ return isJobSucceeded(status);
}
- return true;
- }
-
- private boolean hasSlaExpired() {
- return slaExpireTime < DateTime.now().getMillis();
+ return Boolean.FALSE;
}
// return true to trigger sla action
@Override
public Object eval() {
logger.info("Checking sla for execution " + execId);
- return isSlaFailed();
+ ExecutableFlow flow;
+ try {
+ flow = executorManager.getExecutableFlow(execId);
+ } catch (ExecutorManagerException e) {
+ logger.error("Can't get executable flow.", e);
+ e.printStackTrace();
+ // something wrong, send out alerts
+ return Boolean.TRUE;
+ }
+ return isSlaMissed(flow);
}
- /**
- * TODO Remove convoluted logic. Investigate the need for 2 separate methods for pass and fail test
- * DO NOT DELETE METHOD. This is used by some weird reflection code using JEXL expression
- *
- * @return
- */
- @SuppressWarnings("unused")
public Object isSlaFailed() {
- if (!init()) {
- // Don't trigger SLA actions if unable to init
- return false;
- }
- if (startTime < 0) {
- return false;
+ ExecutableFlow flow;
+ try {
+ flow = executorManager.getExecutableFlow(execId);
+ } catch (ExecutorManagerException e) {
+ logger.error("Can't get executable flow.", e);
+ // something wrong, send out alerts
+ return Boolean.TRUE;
}
-
- /*
- * Fire the trigger (return true) if the desired terminal state isn't reached when
- * - the SLA time has already expired, OR
- * - the flow / job has already reached a terminal state.
- */
- Status status = getStatus();
- return (isTerminalState(status) || hasSlaExpired()) && !hasReachedDesiredTerminalState();
+ return isSlaMissed(flow);
}
- /**
- * TODO Remove convoluted logic. Investigate the need for 2 separate methods for pass and fail test
- * DO NOT DELETE METHOD. This is used by some weird reflection code using JEXL expression
- *
- * @return
- */
- @SuppressWarnings("unused")
public Object isSlaPassed() {
- if (!init()) {
- return true;
- }
- if (startTime < 0) {
- return false;
- }
- return hasReachedDesiredTerminalState();
- }
-
- private boolean hasReachedDesiredTerminalState() {
- Status status = getStatus();
- switch (conditionType) {
- case FINISH:
- return isTerminalState(status);
- case SUCCEED:
- return hasSucceeded(status);
+ ExecutableFlow flow;
+ try {
+ flow = executorManager.getExecutableFlow(execId);
+ } catch (ExecutorManagerException e) {
+ logger.error("Can't get executable flow.", e);
+ // something wrong, send out alerts
+ return Boolean.TRUE;
}
- return false;
- }
-
- /**
- * Terminal States are final states post which there will not be any further changes in state.
- *
- * @param status status of flow / job
- * @return true if status is a terminal state
- */
- private boolean isTerminalState(Status status) {
- return Status.FAILED.equals(status) || Status.KILLED.equals(status) || hasSucceeded(status);
- }
-
- private boolean hasSucceeded(Status status) {
- return Status.SUCCEEDED.equals(status);
+ return isSlaGood(flow);
}
@Override
@@ -226,7 +244,8 @@ public class SlaChecker implements ConditionChecker {
}
@Override
- public void reset() { }
+ public void reset() {
+ }
@Override
public String getId() {
@@ -243,23 +262,27 @@ public class SlaChecker implements ConditionChecker {
return createFromJson(obj);
}
+ @SuppressWarnings("unchecked")
public static SlaChecker createFromJson(Object obj) throws Exception {
- return createFromJson((Map<String, Object>) obj);
+ return createFromJson((HashMap<String, Object>) obj);
}
- public static SlaChecker createFromJson(Map<String, Object> obj) throws Exception {
- if (!obj.get("type").equals(type)) {
- throw new Exception("Cannot create checker of " + type + " from " + obj.get("type"));
+ public static SlaChecker createFromJson(HashMap<String, Object> obj)
+ throws Exception {
+ Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+ if (!jsonObj.get("type").equals(type)) {
+ throw new Exception("Cannot create checker of " + type + " from "
+ + jsonObj.get("type"));
}
- String id = (String) obj.get("id");
- SlaOption slaOption = SlaOption.fromObject(obj.get("slaOption"));
- int execId = Integer.valueOf((String) obj.get("execId"));
+ String id = (String) jsonObj.get("id");
+ SlaOption slaOption = SlaOption.fromObject(jsonObj.get("slaOption"));
+ int execId = Integer.valueOf((String) jsonObj.get("execId"));
return new SlaChecker(id, slaOption, execId);
}
@Override
public Object toJson() {
- Map<String, Object> jsonObj = new HashMap<>();
+ Map<String, Object> jsonObj = new HashMap<String, Object>();
jsonObj.put("type", type);
jsonObj.put("id", id);
jsonObj.put("slaOption", slaOption.toObject());
@@ -269,13 +292,42 @@ public class SlaChecker implements ConditionChecker {
}
@Override
- public void stopChecker() { }
+ public void stopChecker() {
+
+ }
@Override
- public void setContext(Map<String, Object> context) { }
+ public void setContext(Map<String, Object> context) {
+ }
@Override
public long getNextCheckTime() {
- return slaExpireTime;
+ return checkTime;
+ }
+
+ private boolean isFlowFinished(Status status) {
+ if (status.equals(Status.FAILED) || status.equals(Status.KILLED)
+ || status.equals(Status.SUCCEEDED)) {
+ return Boolean.TRUE;
+ } else {
+ return Boolean.FALSE;
+ }
+ }
+
+ private boolean isFlowSucceeded(Status status) {
+ return status.equals(Status.SUCCEEDED);
+ }
+
+ private boolean isJobFinished(Status status) {
+ if (status.equals(Status.FAILED) || status.equals(Status.KILLED)
+ || status.equals(Status.SUCCEEDED)) {
+ return Boolean.TRUE;
+ } else {
+ return Boolean.FALSE;
+ }
+ }
+
+ private boolean isJobSucceeded(Status status) {
+ return status.equals(Status.SUCCEEDED);
}
}
build.gradle 2(+1 -1)
diff --git a/build.gradle b/build.gradle
index e713019..f84422b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -96,7 +96,7 @@ subprojects {
* Gradle wrapper task.
*/
task wrapper(type: Wrapper) {
- gradleVersion = '2.13'
+ gradleVersion = '2.7'
}
idea {
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 015050a..54e90fc 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
-#Thu Jan 05 19:08:47 PST 2017
+#Sat May 28 17:40:49 PDT 2016
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-2.13-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-2.7-all.zip