azkaban-aplcache

Refactoring Azkaban SLA Trigger check logic (#869) Context:

1/9/2017 9:29:20 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
index 27b9ba4..3ec0ba5 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
@@ -32,210 +32,192 @@ import azkaban.sla.SlaOption;
 import azkaban.trigger.ConditionChecker;
 import azkaban.utils.Utils;
 
-public class SlaChecker implements ConditionChecker {
+import static java.util.Objects.requireNonNull;
 
-  private static final Logger logger = Logger.getLogger(SlaChecker.class);
-  public static final String type = "SlaChecker";
 
-  private String id;
-  private SlaOption slaOption;
-  private int execId;
-  private long checkTime = -1;
+public class SlaChecker implements ConditionChecker {
+  public static final String type = "SlaChecker";
+  enum NodeType { FLOW, JOB }
+  enum ConditionType { FINISH, SUCCEED }
 
+  private static final Logger logger = Logger.getLogger(SlaChecker.class);
   private static ExecutorManagerAdapter executorManager;
 
+  private final String id;
+  private final SlaOption slaOption;
+  private final int execId;
+
+  private ExecutableFlow flow = null;
+  private ExecutableNode job = null;
+  private NodeType nodeType;
+  private ConditionType conditionType;
+  private long startTime = -1;
+  private long slaExpireTime = -1;
+
+  /**
+   * TODO remove static linking
+   * @param executorManagerAdapter
+   */
+  public static void setExecutorManager(ExecutorManagerAdapter executorManagerAdapter) {
+    executorManager = executorManagerAdapter;
+  }
+
   public SlaChecker(String id, SlaOption slaOption, int execId) {
     this.id = id;
     this.slaOption = slaOption;
     this.execId = execId;
-  }
 
-  public static void setExecutorManager(ExecutorManagerAdapter em) {
-    executorManager = em;
+    retrieveSlaTypeProperties();
   }
 
-  private Boolean isSlaMissed(ExecutableFlow flow) {
-    String type = slaOption.getType();
-    if (flow.getStartTime() < 0) {
-      return Boolean.FALSE;
+  private void retrieveSlaTypeProperties() {
+    switch (slaOption.getType()) {
+      case SlaOption.TYPE_FLOW_FINISH:
+        nodeType = NodeType.FLOW;
+        conditionType = ConditionType.FINISH;
+        break;
+      case SlaOption.TYPE_FLOW_SUCCEED:
+        nodeType = NodeType.FLOW;
+        conditionType = ConditionType.SUCCEED;
+        break;
+      case SlaOption.TYPE_JOB_FINISH:
+        nodeType = NodeType.JOB;
+        conditionType = ConditionType.FINISH;
+        break;
+      case SlaOption.TYPE_JOB_SUCCEED:
+        nodeType = NodeType.JOB;
+        conditionType = ConditionType.SUCCEED;
+        break;
     }
-    Status status;
-    if (type.equals(SlaOption.TYPE_FLOW_FINISH)) {
-      if (checkTime < flow.getStartTime()) {
-        ReadablePeriod duration =
-            Utils.parsePeriodString((String) slaOption.getInfo().get(
-                SlaOption.INFO_DURATION));
-        DateTime startTime = new DateTime(flow.getStartTime());
-        DateTime nextCheckTime = startTime.plus(duration);
-        this.checkTime = nextCheckTime.getMillis();
-      }
-      status = flow.getStatus();
-      if (checkTime < DateTime.now().getMillis()) {
-        return !isFlowFinished(status);
-      }
-    } else if (type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
-      if (checkTime < flow.getStartTime()) {
-        ReadablePeriod duration =
-            Utils.parsePeriodString((String) slaOption.getInfo().get(
-                SlaOption.INFO_DURATION));
-        DateTime startTime = new DateTime(flow.getStartTime());
-        DateTime nextCheckTime = startTime.plus(duration);
-        this.checkTime = nextCheckTime.getMillis();
-      }
-      status = flow.getStatus();
-      if (checkTime < DateTime.now().getMillis()) {
-        return !isFlowSucceeded(status);
-      } else {
-        return status.equals(Status.FAILED) || status.equals(Status.KILLED);
-      }
-    } else if (type.equals(SlaOption.TYPE_JOB_FINISH)) {
-      String jobName =
-          (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
-      ExecutableNode node = flow.getExecutableNode(jobName);
-      if (node.getStartTime() < 0) {
-        return Boolean.FALSE;
-      }
-      if (checkTime < node.getStartTime()) {
-        ReadablePeriod duration =
-            Utils.parsePeriodString((String) slaOption.getInfo().get(
-                SlaOption.INFO_DURATION));
-        DateTime startTime = new DateTime(node.getStartTime());
-        DateTime nextCheckTime = startTime.plus(duration);
-        this.checkTime = nextCheckTime.getMillis();
-      }
-      status = node.getStatus();
-      if (checkTime < DateTime.now().getMillis()) {
-        return !isJobFinished(status);
-      }
-    } else if (type.equals(SlaOption.TYPE_JOB_SUCCEED)) {
-      String jobName =
-          (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
-      ExecutableNode node = flow.getExecutableNode(jobName);
-      if (node.getStartTime() < 0) {
-        return Boolean.FALSE;
-      }
-      if (checkTime < node.getStartTime()) {
-        ReadablePeriod duration =
-            Utils.parsePeriodString((String) slaOption.getInfo().get(
-                SlaOption.INFO_DURATION));
-        DateTime startTime = new DateTime(node.getStartTime());
-        DateTime nextCheckTime = startTime.plus(duration);
-        this.checkTime = nextCheckTime.getMillis();
-      }
-      status = node.getStatus();
-      if (checkTime < DateTime.now().getMillis()) {
-        return !isJobFinished(status);
-      } else {
-        return status.equals(Status.FAILED) || status.equals(Status.KILLED);
-      }
+  }
+
+  private long getStartTime() {
+    switch (nodeType) {
+      case FLOW: return flow.getStartTime();
+      case JOB: return job.getStartTime();
     }
-    return Boolean.FALSE;
+    return -1;
   }
 
-  private Boolean isSlaGood(ExecutableFlow flow) {
-    String type = slaOption.getType();
-    if (flow.getStartTime() < 0) {
-      return Boolean.FALSE;
+  private Status getStatus() {
+    switch (nodeType) {
+      case FLOW: return flow.getStatus();
+      case JOB: return job.getStatus();
     }
-    Status status;
-    if (type.equals(SlaOption.TYPE_FLOW_FINISH)) {
-      if (checkTime < flow.getStartTime()) {
-        ReadablePeriod duration =
-            Utils.parsePeriodString((String) slaOption.getInfo().get(
-                SlaOption.INFO_DURATION));
-        DateTime startTime = new DateTime(flow.getStartTime());
-        DateTime nextCheckTime = startTime.plus(duration);
-        this.checkTime = nextCheckTime.getMillis();
-      }
-      status = flow.getStatus();
-      return isFlowFinished(status);
-    } else if (type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
-      if (checkTime < flow.getStartTime()) {
-        ReadablePeriod duration =
-            Utils.parsePeriodString((String) slaOption.getInfo().get(
-                SlaOption.INFO_DURATION));
-        DateTime startTime = new DateTime(flow.getStartTime());
-        DateTime nextCheckTime = startTime.plus(duration);
-        this.checkTime = nextCheckTime.getMillis();
-      }
-      status = flow.getStatus();
-      return isFlowSucceeded(status);
-    } else if (type.equals(SlaOption.TYPE_JOB_FINISH)) {
-      String jobName =
-          (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
-      ExecutableNode node = flow.getExecutableNode(jobName);
-      if (node.getStartTime() < 0) {
-        return Boolean.FALSE;
-      }
-      if (checkTime < node.getStartTime()) {
-        ReadablePeriod duration =
-            Utils.parsePeriodString((String) slaOption.getInfo().get(
-                SlaOption.INFO_DURATION));
-        DateTime startTime = new DateTime(node.getStartTime());
-        DateTime nextCheckTime = startTime.plus(duration);
-        this.checkTime = nextCheckTime.getMillis();
-      }
-      status = node.getStatus();
-      return isJobFinished(status);
-    } else if (type.equals(SlaOption.TYPE_JOB_SUCCEED)) {
-      String jobName =
-          (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
-      ExecutableNode node = flow.getExecutableNode(jobName);
-      if (node.getStartTime() < 0) {
-        return Boolean.FALSE;
+    return null;
+  }
+
+  /**
+   * Initialize the instance.
+   *
+   * Initializing the derived members depends on ExecutorManager to be set.
+   * Also, initializing the start time is possible when the job has already started.
+   *
+   * @return false on caught exceptions else true.
+   */
+  private boolean init() {
+    try {
+      requireNonNull(executorManager);
+      if (flow == null) {
+        flow = executorManager.getExecutableFlow(execId);
+        if (NodeType.JOB == nodeType) {
+          String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+          job = flow.getExecutableNode(jobName);
+        }
       }
-      if (checkTime < node.getStartTime()) {
-        ReadablePeriod duration =
-            Utils.parsePeriodString((String) slaOption.getInfo().get(
-                SlaOption.INFO_DURATION));
-        DateTime startTime = new DateTime(node.getStartTime());
-        DateTime nextCheckTime = startTime.plus(duration);
-        this.checkTime = nextCheckTime.getMillis();
+      // Start time needs to be initialized if unset
+      if (startTime < 0) {
+        long t = getStartTime();
+        if (t > 0) {
+          this.startTime = t;
+          ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+          this.slaExpireTime = new DateTime(startTime).plus(duration).getMillis();
+        }
       }
-      status = node.getStatus();
-      return isJobSucceeded(status);
+    } catch (ExecutorManagerException e) {
+      // Log errors but don't crash
+      logger.error("Can't get executable flow.", e);
+      return false;
     }
-    return Boolean.FALSE;
+    return true;
+  }
+
+  private boolean hasSlaExpired() {
+    return slaExpireTime < DateTime.now().getMillis();
   }
 
   // return true to trigger sla action
   @Override
   public Object eval() {
     logger.info("Checking sla for execution " + execId);
-    ExecutableFlow flow;
-    try {
-      flow = executorManager.getExecutableFlow(execId);
-    } catch (ExecutorManagerException e) {
-      logger.error("Can't get executable flow.", e);
-      e.printStackTrace();
-      // something wrong, send out alerts
-      return Boolean.TRUE;
-    }
-    return isSlaMissed(flow);
+    return isSlaFailed();
   }
 
+  /**
+   * TODO Remove convoluted logic. Investigate the need for 2 separate methods for pass and fail test
+   * DO NOT DELETE METHOD. This is used by some weird reflection code using JEXL expression
+   *
+   * @return
+   */
+  @SuppressWarnings("unused")
   public Object isSlaFailed() {
-    ExecutableFlow flow;
-    try {
-      flow = executorManager.getExecutableFlow(execId);
-    } catch (ExecutorManagerException e) {
-      logger.error("Can't get executable flow.", e);
-      // something wrong, send out alerts
-      return Boolean.TRUE;
+    if (!init()) {
+      // Don't trigger SLA actions if unable to init
+      return false;
+    }
+    if (startTime < 0) {
+      return false;
     }
-    return isSlaMissed(flow);
+
+    /*
+     * Fire the trigger (return true) if the desired terminal state isn't reached when
+     *  - the SLA time has already expired, OR
+     *  - the flow / job has already reached a terminal state.
+     */
+    Status status = getStatus();
+    return (isTerminalState(status) || hasSlaExpired()) && !hasReachedDesiredTerminalState();
   }
 
+  /**
+   * TODO Remove convoluted logic. Investigate the need for 2 separate methods for pass and fail test
+   * DO NOT DELETE METHOD. This is used by some weird reflection code using JEXL expression
+   *
+   * @return
+   */
+  @SuppressWarnings("unused")
   public Object isSlaPassed() {
-    ExecutableFlow flow;
-    try {
-      flow = executorManager.getExecutableFlow(execId);
-    } catch (ExecutorManagerException e) {
-      logger.error("Can't get executable flow.", e);
-      // something wrong, send out alerts
-      return Boolean.TRUE;
+    if (!init()) {
+      return true;
+    }
+    if (startTime < 0) {
+      return false;
+    }
+    return hasReachedDesiredTerminalState();
+  }
+
+  private boolean hasReachedDesiredTerminalState() {
+    Status status = getStatus();
+    switch (conditionType) {
+      case FINISH:
+        return isTerminalState(status);
+      case SUCCEED:
+        return hasSucceeded(status);
     }
-    return isSlaGood(flow);
+    return false;
+  }
+
+  /**
+   * Terminal States are final states post which there will not be any further changes in state.
+   *
+   * @param status status of flow / job
+   * @return true if status is a terminal state
+   */
+  private boolean isTerminalState(Status status) {
+    return Status.FAILED.equals(status) || Status.KILLED.equals(status) || hasSucceeded(status);
+  }
+
+  private boolean hasSucceeded(Status status) {
+    return Status.SUCCEEDED.equals(status);
   }
 
   @Override
@@ -244,8 +226,7 @@ public class SlaChecker implements ConditionChecker {
   }
 
   @Override
-  public void reset() {
-  }
+  public void reset() { }
 
   @Override
   public String getId() {
@@ -262,27 +243,23 @@ public class SlaChecker implements ConditionChecker {
     return createFromJson(obj);
   }
 
-  @SuppressWarnings("unchecked")
   public static SlaChecker createFromJson(Object obj) throws Exception {
-    return createFromJson((HashMap<String, Object>) obj);
+    return createFromJson((Map<String, Object>) obj);
   }
 
-  public static SlaChecker createFromJson(HashMap<String, Object> obj)
-      throws Exception {
-    Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
-    if (!jsonObj.get("type").equals(type)) {
-      throw new Exception("Cannot create checker of " + type + " from "
-          + jsonObj.get("type"));
+  public static SlaChecker createFromJson(Map<String, Object> obj) throws Exception {
+    if (!obj.get("type").equals(type)) {
+      throw new Exception("Cannot create checker of " + type + " from " + obj.get("type"));
     }
-    String id = (String) jsonObj.get("id");
-    SlaOption slaOption = SlaOption.fromObject(jsonObj.get("slaOption"));
-    int execId = Integer.valueOf((String) jsonObj.get("execId"));
+    String id = (String) obj.get("id");
+    SlaOption slaOption = SlaOption.fromObject(obj.get("slaOption"));
+    int execId = Integer.valueOf((String) obj.get("execId"));
     return new SlaChecker(id, slaOption, execId);
   }
 
   @Override
   public Object toJson() {
-    Map<String, Object> jsonObj = new HashMap<String, Object>();
+    Map<String, Object> jsonObj = new HashMap<>();
     jsonObj.put("type", type);
     jsonObj.put("id", id);
     jsonObj.put("slaOption", slaOption.toObject());
@@ -292,42 +269,13 @@ public class SlaChecker implements ConditionChecker {
   }
 
   @Override
-  public void stopChecker() {
-
-  }
+  public void stopChecker() { }
 
   @Override
-  public void setContext(Map<String, Object> context) {
-  }
+  public void setContext(Map<String, Object> context) { }
 
   @Override
   public long getNextCheckTime() {
-    return checkTime;
-  }
-
-  private boolean isFlowFinished(Status status) {
-    if (status.equals(Status.FAILED) || status.equals(Status.KILLED)
-        || status.equals(Status.SUCCEEDED)) {
-      return Boolean.TRUE;
-    } else {
-      return Boolean.FALSE;
-    }
-  }
-
-  private boolean isFlowSucceeded(Status status) {
-    return status.equals(Status.SUCCEEDED);
-  }
-
-  private boolean isJobFinished(Status status) {
-    if (status.equals(Status.FAILED) || status.equals(Status.KILLED)
-        || status.equals(Status.SUCCEEDED)) {
-      return Boolean.TRUE;
-    } else {
-      return Boolean.FALSE;
-    }
-  }
-
-  private boolean isJobSucceeded(Status status) {
-    return status.equals(Status.SUCCEEDED);
+    return slaExpireTime;
   }
 }
diff --git a/azkaban-common/src/test/java/azkaban/trigger/builtin/SlaCheckerTest.java b/azkaban-common/src/test/java/azkaban/trigger/builtin/SlaCheckerTest.java
new file mode 100644
index 0000000..a2f2e69
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/trigger/builtin/SlaCheckerTest.java
@@ -0,0 +1,215 @@
+package azkaban.trigger.builtin;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorManagerAdapter;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
+import azkaban.sla.SlaOption;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.joda.time.DateTime;
+import org.junit.Before;
+import org.junit.Test;
+
+import static azkaban.sla.SlaOption.TYPE_FLOW_FINISH;
+import static azkaban.sla.SlaOption.TYPE_FLOW_SUCCEED;
+import static azkaban.sla.SlaOption.TYPE_JOB_SUCCEED;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class SlaCheckerTest {
+  private final int RUNNING_FLOW = 1;
+  private final int SUCCEEDED_FLOW = 2;
+  private final int KILLED_FLOW = 3;
+  private final int FAILED_FLOW = 4;
+
+  private ExecutorManagerAdapter executorManager;
+
+  @Before
+  public void setUp() throws Exception {
+    executorManager = mock(ExecutorManagerAdapter.class);
+    SlaChecker.setExecutorManager(executorManager);
+
+    addMockFlow(RUNNING_FLOW, Status.RUNNING);
+    addMockFlow(SUCCEEDED_FLOW, Status.SUCCEEDED);
+    addMockFlow(KILLED_FLOW, Status.KILLED);
+    addMockFlow(FAILED_FLOW, Status.FAILED);
+  }
+
+  private ExecutableFlow addMockFlow(int execId, Status status) throws ExecutorManagerException {
+    ExecutableFlow flow = mock(ExecutableFlow.class);
+    when(flow.getStartTime()).thenReturn(DateTime.now().getMillis());
+    when(flow.getStatus()).thenReturn(status);
+
+    when(executorManager.getExecutableFlow(execId)).thenReturn(flow);
+
+    return flow;
+  }
+
+  private static SlaOption createSlaOption(String type, String duration) {
+    Map<String, Object> info = new HashMap<>();
+    info.put(SlaOption.INFO_DURATION, duration);
+    return new SlaOption(type, Collections.emptyList(), info);
+  }
+
+  private static SlaChecker createSlaChecker(String type, String duration, int execId) {
+    SlaOption slaOption = createSlaOption(type, duration);
+    return new SlaChecker("MockFlowSlaChecker", slaOption, execId);
+  }
+
+  @Test
+  public void testFlowRunningExpired() throws Exception {
+    SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_FINISH, "0s", RUNNING_FLOW);
+
+    assertFalse((Boolean) slaChecker.isSlaPassed());
+    assertTrue((Boolean) slaChecker.isSlaFailed());
+  }
+
+  @Test
+  public void testFlowRunningNotExpired() throws Exception {
+    SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_FINISH, "1d", RUNNING_FLOW);
+
+    assertFalse((Boolean) slaChecker.isSlaPassed());
+    assertFalse((Boolean) slaChecker.isSlaFailed());
+  }
+
+  @Test
+  public void testFlowRunning() throws Exception {
+    final int execId = 100;
+    ExecutableFlow flow = addMockFlow(execId, Status.RUNNING);
+    SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_FINISH, "2s", execId);
+
+    // Flow hasn't reached terminal state
+    assertFalse((Boolean) slaChecker.isSlaPassed());
+    // But SLA is still valid. returns false => trigger is not activated.
+    assertFalse((Boolean) slaChecker.isSlaFailed());
+
+    Thread.sleep(2000);
+
+    // Flow still hasn't reached terminal state
+    assertFalse((Boolean) slaChecker.isSlaPassed());
+    // SLA has been missed
+    assertTrue((Boolean) slaChecker.isSlaFailed());
+
+    when(flow.getStatus()).thenReturn(Status.SUCCEEDED);
+
+    // Terminal state reached
+    assertTrue((Boolean) slaChecker.isSlaPassed());
+    // SLA has not been violated
+    assertFalse((Boolean) slaChecker.isSlaFailed());
+  }
+
+  @Test
+  public void testFlowSucceeded1() throws Exception {
+    SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_SUCCEED, "0s", SUCCEEDED_FLOW);
+
+    assertTrue((Boolean) slaChecker.isSlaPassed());
+    assertFalse((Boolean) slaChecker.isSlaFailed());
+  }
+
+  @Test
+  public void testFlowSucceeded2() throws Exception {
+    SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_SUCCEED, "1d", SUCCEEDED_FLOW);
+
+    assertTrue((Boolean) slaChecker.isSlaPassed());
+    assertFalse((Boolean) slaChecker.isSlaFailed());
+  }
+
+  @Test
+  public void testFlowKilledBeforeSla() throws Exception {
+    SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_SUCCEED, "1d", KILLED_FLOW);
+
+    assertFalse((Boolean) slaChecker.isSlaPassed());
+    assertTrue((Boolean) slaChecker.isSlaFailed());
+  }
+
+  @Test
+  public void testFlowFailedBeforeSla() throws Exception {
+    SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_SUCCEED, "1d", FAILED_FLOW);
+
+    assertFalse((Boolean) slaChecker.isSlaPassed());
+    assertTrue((Boolean) slaChecker.isSlaFailed());
+  }
+
+  @Test
+  public void testFlowFailedAfterSla() throws Exception {
+    SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_SUCCEED, "0s", FAILED_FLOW);
+
+    assertFalse((Boolean) slaChecker.isSlaPassed());
+    assertTrue((Boolean) slaChecker.isSlaFailed());
+  }
+
+  @Test
+  public void testFlowKilledAfterSla() throws Exception {
+    SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_SUCCEED, "0s", KILLED_FLOW);
+
+    assertFalse((Boolean) slaChecker.isSlaPassed());
+    assertTrue((Boolean) slaChecker.isSlaFailed());
+  }
+
+  @Test
+  public void testFlowKilledBeforeSlaFinish() throws Exception {
+    SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_FINISH, "1d", KILLED_FLOW);
+
+    assertTrue((Boolean) slaChecker.isSlaPassed());
+    assertFalse((Boolean) slaChecker.isSlaFailed());
+  }
+
+  @Test
+  public void testFlowFailedAfterSlaFinish() throws Exception {
+    SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_FINISH, "0s", FAILED_FLOW);
+
+    assertTrue((Boolean) slaChecker.isSlaPassed());
+    assertFalse((Boolean) slaChecker.isSlaFailed());
+  }
+
+  @Test
+  public void testJobSucceedSla() throws Exception {
+    final int execId = 100;
+    final String mockJobName = "mockJob";
+
+    Map<String, Object> info = new HashMap<>();
+    info.put(SlaOption.INFO_DURATION, "2s");
+    info.put(SlaOption.INFO_JOB_NAME, mockJobName);
+    SlaOption slaOption = new SlaOption(TYPE_JOB_SUCCEED, Collections.emptyList(), info);
+
+    ExecutableNode mockJob = mock(ExecutableNode.class);
+    when(mockJob.getStartTime()).thenReturn(DateTime.now().getMillis());
+    when(mockJob.getStatus()).thenReturn(Status.RUNNING);
+
+    ExecutableFlow flow = addMockFlow(execId, Status.RUNNING);
+    when(flow.getExecutableNode(mockJobName)).thenReturn(mockJob);
+
+    SlaChecker slaChecker = new SlaChecker("MockJobSlaChecker", slaOption, execId);
+
+    // Flow hasn't reached terminal state
+    assertFalse((Boolean) slaChecker.isSlaPassed());
+    // But SLA is still valid. returns false => trigger is not activated.
+    assertFalse((Boolean) slaChecker.isSlaFailed());
+
+    Thread.sleep(2000);
+
+    // Flow still hasn't reached terminal state
+    assertFalse((Boolean) slaChecker.isSlaPassed());
+    // SLA has been missed
+    assertTrue((Boolean) slaChecker.isSlaFailed());
+
+    when(mockJob.getStatus()).thenReturn(Status.SUCCEEDED);
+    // Terminal state reached
+    assertTrue((Boolean) slaChecker.isSlaPassed());
+    // SLA has not been violated
+    assertFalse((Boolean) slaChecker.isSlaFailed());
+
+    when(mockJob.getStatus()).thenReturn(Status.KILLED);
+    // Terminal state reached
+    assertFalse((Boolean) slaChecker.isSlaPassed());
+    // SLA has not been violated
+    assertTrue((Boolean) slaChecker.isSlaFailed());
+  }
+
+}
\ No newline at end of file

build.gradle 2(+1 -1)

diff --git a/build.gradle b/build.gradle
index f84422b..e713019 100644
--- a/build.gradle
+++ b/build.gradle
@@ -96,7 +96,7 @@ subprojects {
  * Gradle wrapper task.
  */
 task wrapper(type: Wrapper) {
-  gradleVersion = '2.7'
+  gradleVersion = '2.13'
 }
 
 idea {
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 54e90fc..015050a 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
-#Sat May 28 17:40:49 PDT 2016
+#Thu Jan 05 19:08:47 PST 2017
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-2.7-all.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-2.13-bin.zip