azkaban-aplcache
Changes
build.gradle 2(+1 -1)
Details
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
index 27b9ba4..3ec0ba5 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
@@ -32,210 +32,192 @@ import azkaban.sla.SlaOption;
import azkaban.trigger.ConditionChecker;
import azkaban.utils.Utils;
-public class SlaChecker implements ConditionChecker {
+import static java.util.Objects.requireNonNull;
- private static final Logger logger = Logger.getLogger(SlaChecker.class);
- public static final String type = "SlaChecker";
- private String id;
- private SlaOption slaOption;
- private int execId;
- private long checkTime = -1;
+public class SlaChecker implements ConditionChecker {
+ public static final String type = "SlaChecker";
+ enum NodeType { FLOW, JOB }
+ enum ConditionType { FINISH, SUCCEED }
+ private static final Logger logger = Logger.getLogger(SlaChecker.class);
private static ExecutorManagerAdapter executorManager;
+ private final String id;
+ private final SlaOption slaOption;
+ private final int execId;
+
+ private ExecutableFlow flow = null;
+ private ExecutableNode job = null;
+ private NodeType nodeType;
+ private ConditionType conditionType;
+ private long startTime = -1;
+ private long slaExpireTime = -1;
+
+ /**
+ * TODO remove static linking
+ * @param executorManagerAdapter
+ */
+ public static void setExecutorManager(ExecutorManagerAdapter executorManagerAdapter) {
+ executorManager = executorManagerAdapter;
+ }
+
public SlaChecker(String id, SlaOption slaOption, int execId) {
this.id = id;
this.slaOption = slaOption;
this.execId = execId;
- }
- public static void setExecutorManager(ExecutorManagerAdapter em) {
- executorManager = em;
+ retrieveSlaTypeProperties();
}
- private Boolean isSlaMissed(ExecutableFlow flow) {
- String type = slaOption.getType();
- if (flow.getStartTime() < 0) {
- return Boolean.FALSE;
+ private void retrieveSlaTypeProperties() {
+ switch (slaOption.getType()) {
+ case SlaOption.TYPE_FLOW_FINISH:
+ nodeType = NodeType.FLOW;
+ conditionType = ConditionType.FINISH;
+ break;
+ case SlaOption.TYPE_FLOW_SUCCEED:
+ nodeType = NodeType.FLOW;
+ conditionType = ConditionType.SUCCEED;
+ break;
+ case SlaOption.TYPE_JOB_FINISH:
+ nodeType = NodeType.JOB;
+ conditionType = ConditionType.FINISH;
+ break;
+ case SlaOption.TYPE_JOB_SUCCEED:
+ nodeType = NodeType.JOB;
+ conditionType = ConditionType.SUCCEED;
+ break;
}
- Status status;
- if (type.equals(SlaOption.TYPE_FLOW_FINISH)) {
- if (checkTime < flow.getStartTime()) {
- ReadablePeriod duration =
- Utils.parsePeriodString((String) slaOption.getInfo().get(
- SlaOption.INFO_DURATION));
- DateTime startTime = new DateTime(flow.getStartTime());
- DateTime nextCheckTime = startTime.plus(duration);
- this.checkTime = nextCheckTime.getMillis();
- }
- status = flow.getStatus();
- if (checkTime < DateTime.now().getMillis()) {
- return !isFlowFinished(status);
- }
- } else if (type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
- if (checkTime < flow.getStartTime()) {
- ReadablePeriod duration =
- Utils.parsePeriodString((String) slaOption.getInfo().get(
- SlaOption.INFO_DURATION));
- DateTime startTime = new DateTime(flow.getStartTime());
- DateTime nextCheckTime = startTime.plus(duration);
- this.checkTime = nextCheckTime.getMillis();
- }
- status = flow.getStatus();
- if (checkTime < DateTime.now().getMillis()) {
- return !isFlowSucceeded(status);
- } else {
- return status.equals(Status.FAILED) || status.equals(Status.KILLED);
- }
- } else if (type.equals(SlaOption.TYPE_JOB_FINISH)) {
- String jobName =
- (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
- ExecutableNode node = flow.getExecutableNode(jobName);
- if (node.getStartTime() < 0) {
- return Boolean.FALSE;
- }
- if (checkTime < node.getStartTime()) {
- ReadablePeriod duration =
- Utils.parsePeriodString((String) slaOption.getInfo().get(
- SlaOption.INFO_DURATION));
- DateTime startTime = new DateTime(node.getStartTime());
- DateTime nextCheckTime = startTime.plus(duration);
- this.checkTime = nextCheckTime.getMillis();
- }
- status = node.getStatus();
- if (checkTime < DateTime.now().getMillis()) {
- return !isJobFinished(status);
- }
- } else if (type.equals(SlaOption.TYPE_JOB_SUCCEED)) {
- String jobName =
- (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
- ExecutableNode node = flow.getExecutableNode(jobName);
- if (node.getStartTime() < 0) {
- return Boolean.FALSE;
- }
- if (checkTime < node.getStartTime()) {
- ReadablePeriod duration =
- Utils.parsePeriodString((String) slaOption.getInfo().get(
- SlaOption.INFO_DURATION));
- DateTime startTime = new DateTime(node.getStartTime());
- DateTime nextCheckTime = startTime.plus(duration);
- this.checkTime = nextCheckTime.getMillis();
- }
- status = node.getStatus();
- if (checkTime < DateTime.now().getMillis()) {
- return !isJobFinished(status);
- } else {
- return status.equals(Status.FAILED) || status.equals(Status.KILLED);
- }
+ }
+
+ private long getStartTime() {
+ switch (nodeType) {
+ case FLOW: return flow.getStartTime();
+ case JOB: return job.getStartTime();
}
- return Boolean.FALSE;
+ return -1;
}
- private Boolean isSlaGood(ExecutableFlow flow) {
- String type = slaOption.getType();
- if (flow.getStartTime() < 0) {
- return Boolean.FALSE;
+ private Status getStatus() {
+ switch (nodeType) {
+ case FLOW: return flow.getStatus();
+ case JOB: return job.getStatus();
}
- Status status;
- if (type.equals(SlaOption.TYPE_FLOW_FINISH)) {
- if (checkTime < flow.getStartTime()) {
- ReadablePeriod duration =
- Utils.parsePeriodString((String) slaOption.getInfo().get(
- SlaOption.INFO_DURATION));
- DateTime startTime = new DateTime(flow.getStartTime());
- DateTime nextCheckTime = startTime.plus(duration);
- this.checkTime = nextCheckTime.getMillis();
- }
- status = flow.getStatus();
- return isFlowFinished(status);
- } else if (type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
- if (checkTime < flow.getStartTime()) {
- ReadablePeriod duration =
- Utils.parsePeriodString((String) slaOption.getInfo().get(
- SlaOption.INFO_DURATION));
- DateTime startTime = new DateTime(flow.getStartTime());
- DateTime nextCheckTime = startTime.plus(duration);
- this.checkTime = nextCheckTime.getMillis();
- }
- status = flow.getStatus();
- return isFlowSucceeded(status);
- } else if (type.equals(SlaOption.TYPE_JOB_FINISH)) {
- String jobName =
- (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
- ExecutableNode node = flow.getExecutableNode(jobName);
- if (node.getStartTime() < 0) {
- return Boolean.FALSE;
- }
- if (checkTime < node.getStartTime()) {
- ReadablePeriod duration =
- Utils.parsePeriodString((String) slaOption.getInfo().get(
- SlaOption.INFO_DURATION));
- DateTime startTime = new DateTime(node.getStartTime());
- DateTime nextCheckTime = startTime.plus(duration);
- this.checkTime = nextCheckTime.getMillis();
- }
- status = node.getStatus();
- return isJobFinished(status);
- } else if (type.equals(SlaOption.TYPE_JOB_SUCCEED)) {
- String jobName =
- (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
- ExecutableNode node = flow.getExecutableNode(jobName);
- if (node.getStartTime() < 0) {
- return Boolean.FALSE;
+ return null;
+ }
+
+ /**
+ * Initialize the instance.
+ *
+ * Initializing the derived members depends on ExecutorManager to be set.
+ * Also, initializing the start time is possible when the job has already started.
+ *
+ * @return false on caught exceptions else true.
+ */
+ private boolean init() {
+ try {
+ requireNonNull(executorManager);
+ if (flow == null) {
+ flow = executorManager.getExecutableFlow(execId);
+ if (NodeType.JOB == nodeType) {
+ String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+ job = flow.getExecutableNode(jobName);
+ }
}
- if (checkTime < node.getStartTime()) {
- ReadablePeriod duration =
- Utils.parsePeriodString((String) slaOption.getInfo().get(
- SlaOption.INFO_DURATION));
- DateTime startTime = new DateTime(node.getStartTime());
- DateTime nextCheckTime = startTime.plus(duration);
- this.checkTime = nextCheckTime.getMillis();
+ // Start time needs to be initialized if unset
+ if (startTime < 0) {
+ long t = getStartTime();
+ if (t > 0) {
+ this.startTime = t;
+ ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+ this.slaExpireTime = new DateTime(startTime).plus(duration).getMillis();
+ }
}
- status = node.getStatus();
- return isJobSucceeded(status);
+ } catch (ExecutorManagerException e) {
+ // Log errors but don't crash
+ logger.error("Can't get executable flow.", e);
+ return false;
}
- return Boolean.FALSE;
+ return true;
+ }
+
+ private boolean hasSlaExpired() {
+ return slaExpireTime < DateTime.now().getMillis();
}
// return true to trigger sla action
@Override
public Object eval() {
logger.info("Checking sla for execution " + execId);
- ExecutableFlow flow;
- try {
- flow = executorManager.getExecutableFlow(execId);
- } catch (ExecutorManagerException e) {
- logger.error("Can't get executable flow.", e);
- e.printStackTrace();
- // something wrong, send out alerts
- return Boolean.TRUE;
- }
- return isSlaMissed(flow);
+ return isSlaFailed();
}
+ /**
+ * TODO Remove convoluted logic. Investigate the need for 2 separate methods for pass and fail test
+ * DO NOT DELETE METHOD. This is used by some weird reflection code using JEXL expression
+ *
+ * @return
+ */
+ @SuppressWarnings("unused")
public Object isSlaFailed() {
- ExecutableFlow flow;
- try {
- flow = executorManager.getExecutableFlow(execId);
- } catch (ExecutorManagerException e) {
- logger.error("Can't get executable flow.", e);
- // something wrong, send out alerts
- return Boolean.TRUE;
+ if (!init()) {
+ // Don't trigger SLA actions if unable to init
+ return false;
+ }
+ if (startTime < 0) {
+ return false;
}
- return isSlaMissed(flow);
+
+ /*
+ * Fire the trigger (return true) if the desired terminal state isn't reached when
+ * - the SLA time has already expired, OR
+ * - the flow / job has already reached a terminal state.
+ */
+ Status status = getStatus();
+ return (isTerminalState(status) || hasSlaExpired()) && !hasReachedDesiredTerminalState();
}
+ /**
+ * TODO Remove convoluted logic. Investigate the need for 2 separate methods for pass and fail test
+ * DO NOT DELETE METHOD. This is used by some weird reflection code using JEXL expression
+ *
+ * @return
+ */
+ @SuppressWarnings("unused")
public Object isSlaPassed() {
- ExecutableFlow flow;
- try {
- flow = executorManager.getExecutableFlow(execId);
- } catch (ExecutorManagerException e) {
- logger.error("Can't get executable flow.", e);
- // something wrong, send out alerts
- return Boolean.TRUE;
+ if (!init()) {
+ return true;
+ }
+ if (startTime < 0) {
+ return false;
+ }
+ return hasReachedDesiredTerminalState();
+ }
+
+ private boolean hasReachedDesiredTerminalState() {
+ Status status = getStatus();
+ switch (conditionType) {
+ case FINISH:
+ return isTerminalState(status);
+ case SUCCEED:
+ return hasSucceeded(status);
}
- return isSlaGood(flow);
+ return false;
+ }
+
+ /**
+ * Terminal States are final states post which there will not be any further changes in state.
+ *
+ * @param status status of flow / job
+ * @return true if status is a terminal state
+ */
+ private boolean isTerminalState(Status status) {
+ return Status.FAILED.equals(status) || Status.KILLED.equals(status) || hasSucceeded(status);
+ }
+
+ private boolean hasSucceeded(Status status) {
+ return Status.SUCCEEDED.equals(status);
}
@Override
@@ -244,8 +226,7 @@ public class SlaChecker implements ConditionChecker {
}
@Override
- public void reset() {
- }
+ public void reset() { }
@Override
public String getId() {
@@ -262,27 +243,23 @@ public class SlaChecker implements ConditionChecker {
return createFromJson(obj);
}
- @SuppressWarnings("unchecked")
public static SlaChecker createFromJson(Object obj) throws Exception {
- return createFromJson((HashMap<String, Object>) obj);
+ return createFromJson((Map<String, Object>) obj);
}
- public static SlaChecker createFromJson(HashMap<String, Object> obj)
- throws Exception {
- Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
- if (!jsonObj.get("type").equals(type)) {
- throw new Exception("Cannot create checker of " + type + " from "
- + jsonObj.get("type"));
+ public static SlaChecker createFromJson(Map<String, Object> obj) throws Exception {
+ if (!obj.get("type").equals(type)) {
+ throw new Exception("Cannot create checker of " + type + " from " + obj.get("type"));
}
- String id = (String) jsonObj.get("id");
- SlaOption slaOption = SlaOption.fromObject(jsonObj.get("slaOption"));
- int execId = Integer.valueOf((String) jsonObj.get("execId"));
+ String id = (String) obj.get("id");
+ SlaOption slaOption = SlaOption.fromObject(obj.get("slaOption"));
+ int execId = Integer.valueOf((String) obj.get("execId"));
return new SlaChecker(id, slaOption, execId);
}
@Override
public Object toJson() {
- Map<String, Object> jsonObj = new HashMap<String, Object>();
+ Map<String, Object> jsonObj = new HashMap<>();
jsonObj.put("type", type);
jsonObj.put("id", id);
jsonObj.put("slaOption", slaOption.toObject());
@@ -292,42 +269,13 @@ public class SlaChecker implements ConditionChecker {
}
@Override
- public void stopChecker() {
-
- }
+ public void stopChecker() { }
@Override
- public void setContext(Map<String, Object> context) {
- }
+ public void setContext(Map<String, Object> context) { }
@Override
public long getNextCheckTime() {
- return checkTime;
- }
-
- private boolean isFlowFinished(Status status) {
- if (status.equals(Status.FAILED) || status.equals(Status.KILLED)
- || status.equals(Status.SUCCEEDED)) {
- return Boolean.TRUE;
- } else {
- return Boolean.FALSE;
- }
- }
-
- private boolean isFlowSucceeded(Status status) {
- return status.equals(Status.SUCCEEDED);
- }
-
- private boolean isJobFinished(Status status) {
- if (status.equals(Status.FAILED) || status.equals(Status.KILLED)
- || status.equals(Status.SUCCEEDED)) {
- return Boolean.TRUE;
- } else {
- return Boolean.FALSE;
- }
- }
-
- private boolean isJobSucceeded(Status status) {
- return status.equals(Status.SUCCEEDED);
+ return slaExpireTime;
}
}
diff --git a/azkaban-common/src/test/java/azkaban/trigger/builtin/SlaCheckerTest.java b/azkaban-common/src/test/java/azkaban/trigger/builtin/SlaCheckerTest.java
new file mode 100644
index 0000000..a2f2e69
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/trigger/builtin/SlaCheckerTest.java
@@ -0,0 +1,215 @@
+package azkaban.trigger.builtin;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorManagerAdapter;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
+import azkaban.sla.SlaOption;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.joda.time.DateTime;
+import org.junit.Before;
+import org.junit.Test;
+
+import static azkaban.sla.SlaOption.TYPE_FLOW_FINISH;
+import static azkaban.sla.SlaOption.TYPE_FLOW_SUCCEED;
+import static azkaban.sla.SlaOption.TYPE_JOB_SUCCEED;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class SlaCheckerTest {
+ private final int RUNNING_FLOW = 1;
+ private final int SUCCEEDED_FLOW = 2;
+ private final int KILLED_FLOW = 3;
+ private final int FAILED_FLOW = 4;
+
+ private ExecutorManagerAdapter executorManager;
+
+ @Before
+ public void setUp() throws Exception {
+ executorManager = mock(ExecutorManagerAdapter.class);
+ SlaChecker.setExecutorManager(executorManager);
+
+ addMockFlow(RUNNING_FLOW, Status.RUNNING);
+ addMockFlow(SUCCEEDED_FLOW, Status.SUCCEEDED);
+ addMockFlow(KILLED_FLOW, Status.KILLED);
+ addMockFlow(FAILED_FLOW, Status.FAILED);
+ }
+
+ private ExecutableFlow addMockFlow(int execId, Status status) throws ExecutorManagerException {
+ ExecutableFlow flow = mock(ExecutableFlow.class);
+ when(flow.getStartTime()).thenReturn(DateTime.now().getMillis());
+ when(flow.getStatus()).thenReturn(status);
+
+ when(executorManager.getExecutableFlow(execId)).thenReturn(flow);
+
+ return flow;
+ }
+
+ private static SlaOption createSlaOption(String type, String duration) {
+ Map<String, Object> info = new HashMap<>();
+ info.put(SlaOption.INFO_DURATION, duration);
+ return new SlaOption(type, Collections.emptyList(), info);
+ }
+
+ private static SlaChecker createSlaChecker(String type, String duration, int execId) {
+ SlaOption slaOption = createSlaOption(type, duration);
+ return new SlaChecker("MockFlowSlaChecker", slaOption, execId);
+ }
+
+ @Test
+ public void testFlowRunningExpired() throws Exception {
+ SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_FINISH, "0s", RUNNING_FLOW);
+
+ assertFalse((Boolean) slaChecker.isSlaPassed());
+ assertTrue((Boolean) slaChecker.isSlaFailed());
+ }
+
+ @Test
+ public void testFlowRunningNotExpired() throws Exception {
+ SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_FINISH, "1d", RUNNING_FLOW);
+
+ assertFalse((Boolean) slaChecker.isSlaPassed());
+ assertFalse((Boolean) slaChecker.isSlaFailed());
+ }
+
+ @Test
+ public void testFlowRunning() throws Exception {
+ final int execId = 100;
+ ExecutableFlow flow = addMockFlow(execId, Status.RUNNING);
+ SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_FINISH, "2s", execId);
+
+ // Flow hasn't reached terminal state
+ assertFalse((Boolean) slaChecker.isSlaPassed());
+ // But SLA is still valid. returns false => trigger is not activated.
+ assertFalse((Boolean) slaChecker.isSlaFailed());
+
+ Thread.sleep(2000);
+
+ // Flow still hasn't reached terminal state
+ assertFalse((Boolean) slaChecker.isSlaPassed());
+ // SLA has been missed
+ assertTrue((Boolean) slaChecker.isSlaFailed());
+
+ when(flow.getStatus()).thenReturn(Status.SUCCEEDED);
+
+ // Terminal state reached
+ assertTrue((Boolean) slaChecker.isSlaPassed());
+ // SLA has not been violated
+ assertFalse((Boolean) slaChecker.isSlaFailed());
+ }
+
+ @Test
+ public void testFlowSucceeded1() throws Exception {
+ SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_SUCCEED, "0s", SUCCEEDED_FLOW);
+
+ assertTrue((Boolean) slaChecker.isSlaPassed());
+ assertFalse((Boolean) slaChecker.isSlaFailed());
+ }
+
+ @Test
+ public void testFlowSucceeded2() throws Exception {
+ SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_SUCCEED, "1d", SUCCEEDED_FLOW);
+
+ assertTrue((Boolean) slaChecker.isSlaPassed());
+ assertFalse((Boolean) slaChecker.isSlaFailed());
+ }
+
+ @Test
+ public void testFlowKilledBeforeSla() throws Exception {
+ SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_SUCCEED, "1d", KILLED_FLOW);
+
+ assertFalse((Boolean) slaChecker.isSlaPassed());
+ assertTrue((Boolean) slaChecker.isSlaFailed());
+ }
+
+ @Test
+ public void testFlowFailedBeforeSla() throws Exception {
+ SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_SUCCEED, "1d", FAILED_FLOW);
+
+ assertFalse((Boolean) slaChecker.isSlaPassed());
+ assertTrue((Boolean) slaChecker.isSlaFailed());
+ }
+
+ @Test
+ public void testFlowFailedAfterSla() throws Exception {
+ SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_SUCCEED, "0s", FAILED_FLOW);
+
+ assertFalse((Boolean) slaChecker.isSlaPassed());
+ assertTrue((Boolean) slaChecker.isSlaFailed());
+ }
+
+ @Test
+ public void testFlowKilledAfterSla() throws Exception {
+ SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_SUCCEED, "0s", KILLED_FLOW);
+
+ assertFalse((Boolean) slaChecker.isSlaPassed());
+ assertTrue((Boolean) slaChecker.isSlaFailed());
+ }
+
+ @Test
+ public void testFlowKilledBeforeSlaFinish() throws Exception {
+ SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_FINISH, "1d", KILLED_FLOW);
+
+ assertTrue((Boolean) slaChecker.isSlaPassed());
+ assertFalse((Boolean) slaChecker.isSlaFailed());
+ }
+
+ @Test
+ public void testFlowFailedAfterSlaFinish() throws Exception {
+ SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_FINISH, "0s", FAILED_FLOW);
+
+ assertTrue((Boolean) slaChecker.isSlaPassed());
+ assertFalse((Boolean) slaChecker.isSlaFailed());
+ }
+
+ @Test
+ public void testJobSucceedSla() throws Exception {
+ final int execId = 100;
+ final String mockJobName = "mockJob";
+
+ Map<String, Object> info = new HashMap<>();
+ info.put(SlaOption.INFO_DURATION, "2s");
+ info.put(SlaOption.INFO_JOB_NAME, mockJobName);
+ SlaOption slaOption = new SlaOption(TYPE_JOB_SUCCEED, Collections.emptyList(), info);
+
+ ExecutableNode mockJob = mock(ExecutableNode.class);
+ when(mockJob.getStartTime()).thenReturn(DateTime.now().getMillis());
+ when(mockJob.getStatus()).thenReturn(Status.RUNNING);
+
+ ExecutableFlow flow = addMockFlow(execId, Status.RUNNING);
+ when(flow.getExecutableNode(mockJobName)).thenReturn(mockJob);
+
+ SlaChecker slaChecker = new SlaChecker("MockJobSlaChecker", slaOption, execId);
+
+ // Flow hasn't reached terminal state
+ assertFalse((Boolean) slaChecker.isSlaPassed());
+ // But SLA is still valid. returns false => trigger is not activated.
+ assertFalse((Boolean) slaChecker.isSlaFailed());
+
+ Thread.sleep(2000);
+
+ // Flow still hasn't reached terminal state
+ assertFalse((Boolean) slaChecker.isSlaPassed());
+ // SLA has been missed
+ assertTrue((Boolean) slaChecker.isSlaFailed());
+
+ when(mockJob.getStatus()).thenReturn(Status.SUCCEEDED);
+ // Terminal state reached
+ assertTrue((Boolean) slaChecker.isSlaPassed());
+ // SLA has not been violated
+ assertFalse((Boolean) slaChecker.isSlaFailed());
+
+ when(mockJob.getStatus()).thenReturn(Status.KILLED);
+ // Terminal state reached
+ assertFalse((Boolean) slaChecker.isSlaPassed());
+ // SLA has not been violated
+ assertTrue((Boolean) slaChecker.isSlaFailed());
+ }
+
+}
\ No newline at end of file
build.gradle 2(+1 -1)
diff --git a/build.gradle b/build.gradle
index f84422b..e713019 100644
--- a/build.gradle
+++ b/build.gradle
@@ -96,7 +96,7 @@ subprojects {
* Gradle wrapper task.
*/
task wrapper(type: Wrapper) {
- gradleVersion = '2.7'
+ gradleVersion = '2.13'
}
idea {
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 54e90fc..015050a 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
-#Sat May 28 17:40:49 PDT 2016
+#Thu Jan 05 19:08:47 PST 2017
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-2.7-all.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-2.13-bin.zip