azkaban-aplcache

Revert "Refactoring Azkaban SLA Trigger check logic" (#897) Reverts

2/3/2017 9:08:56 PM

Changes

azkaban-common/src/test/java/azkaban/trigger/builtin/SlaCheckerTest.java 215(+0 -215)

build.gradle 2(+1 -1)

Details

diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
index 3ec0ba5..27b9ba4 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
@@ -32,192 +32,210 @@ import azkaban.sla.SlaOption;
 import azkaban.trigger.ConditionChecker;
 import azkaban.utils.Utils;
 
-import static java.util.Objects.requireNonNull;
-
-
 public class SlaChecker implements ConditionChecker {
-  public static final String type = "SlaChecker";
-  enum NodeType { FLOW, JOB }
-  enum ConditionType { FINISH, SUCCEED }
 
   private static final Logger logger = Logger.getLogger(SlaChecker.class);
-  private static ExecutorManagerAdapter executorManager;
+  public static final String type = "SlaChecker";
 
-  private final String id;
-  private final SlaOption slaOption;
-  private final int execId;
-
-  private ExecutableFlow flow = null;
-  private ExecutableNode job = null;
-  private NodeType nodeType;
-  private ConditionType conditionType;
-  private long startTime = -1;
-  private long slaExpireTime = -1;
-
-  /**
-   * TODO remove static linking
-   * @param executorManagerAdapter
-   */
-  public static void setExecutorManager(ExecutorManagerAdapter executorManagerAdapter) {
-    executorManager = executorManagerAdapter;
-  }
+  private String id;
+  private SlaOption slaOption;
+  private int execId;
+  private long checkTime = -1;
+
+  private static ExecutorManagerAdapter executorManager;
 
   public SlaChecker(String id, SlaOption slaOption, int execId) {
     this.id = id;
     this.slaOption = slaOption;
     this.execId = execId;
-
-    retrieveSlaTypeProperties();
   }
 
-  private void retrieveSlaTypeProperties() {
-    switch (slaOption.getType()) {
-      case SlaOption.TYPE_FLOW_FINISH:
-        nodeType = NodeType.FLOW;
-        conditionType = ConditionType.FINISH;
-        break;
-      case SlaOption.TYPE_FLOW_SUCCEED:
-        nodeType = NodeType.FLOW;
-        conditionType = ConditionType.SUCCEED;
-        break;
-      case SlaOption.TYPE_JOB_FINISH:
-        nodeType = NodeType.JOB;
-        conditionType = ConditionType.FINISH;
-        break;
-      case SlaOption.TYPE_JOB_SUCCEED:
-        nodeType = NodeType.JOB;
-        conditionType = ConditionType.SUCCEED;
-        break;
-    }
+  public static void setExecutorManager(ExecutorManagerAdapter em) {
+    executorManager = em;
   }
 
-  private long getStartTime() {
-    switch (nodeType) {
-      case FLOW: return flow.getStartTime();
-      case JOB: return job.getStartTime();
+  private Boolean isSlaMissed(ExecutableFlow flow) {
+    String type = slaOption.getType();
+    if (flow.getStartTime() < 0) {
+      return Boolean.FALSE;
     }
-    return -1;
-  }
-
-  private Status getStatus() {
-    switch (nodeType) {
-      case FLOW: return flow.getStatus();
-      case JOB: return job.getStatus();
+    Status status;
+    if (type.equals(SlaOption.TYPE_FLOW_FINISH)) {
+      if (checkTime < flow.getStartTime()) {
+        ReadablePeriod duration =
+            Utils.parsePeriodString((String) slaOption.getInfo().get(
+                SlaOption.INFO_DURATION));
+        DateTime startTime = new DateTime(flow.getStartTime());
+        DateTime nextCheckTime = startTime.plus(duration);
+        this.checkTime = nextCheckTime.getMillis();
+      }
+      status = flow.getStatus();
+      if (checkTime < DateTime.now().getMillis()) {
+        return !isFlowFinished(status);
+      }
+    } else if (type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
+      if (checkTime < flow.getStartTime()) {
+        ReadablePeriod duration =
+            Utils.parsePeriodString((String) slaOption.getInfo().get(
+                SlaOption.INFO_DURATION));
+        DateTime startTime = new DateTime(flow.getStartTime());
+        DateTime nextCheckTime = startTime.plus(duration);
+        this.checkTime = nextCheckTime.getMillis();
+      }
+      status = flow.getStatus();
+      if (checkTime < DateTime.now().getMillis()) {
+        return !isFlowSucceeded(status);
+      } else {
+        return status.equals(Status.FAILED) || status.equals(Status.KILLED);
+      }
+    } else if (type.equals(SlaOption.TYPE_JOB_FINISH)) {
+      String jobName =
+          (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+      ExecutableNode node = flow.getExecutableNode(jobName);
+      if (node.getStartTime() < 0) {
+        return Boolean.FALSE;
+      }
+      if (checkTime < node.getStartTime()) {
+        ReadablePeriod duration =
+            Utils.parsePeriodString((String) slaOption.getInfo().get(
+                SlaOption.INFO_DURATION));
+        DateTime startTime = new DateTime(node.getStartTime());
+        DateTime nextCheckTime = startTime.plus(duration);
+        this.checkTime = nextCheckTime.getMillis();
+      }
+      status = node.getStatus();
+      if (checkTime < DateTime.now().getMillis()) {
+        return !isJobFinished(status);
+      }
+    } else if (type.equals(SlaOption.TYPE_JOB_SUCCEED)) {
+      String jobName =
+          (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+      ExecutableNode node = flow.getExecutableNode(jobName);
+      if (node.getStartTime() < 0) {
+        return Boolean.FALSE;
+      }
+      if (checkTime < node.getStartTime()) {
+        ReadablePeriod duration =
+            Utils.parsePeriodString((String) slaOption.getInfo().get(
+                SlaOption.INFO_DURATION));
+        DateTime startTime = new DateTime(node.getStartTime());
+        DateTime nextCheckTime = startTime.plus(duration);
+        this.checkTime = nextCheckTime.getMillis();
+      }
+      status = node.getStatus();
+      if (checkTime < DateTime.now().getMillis()) {
+        return !isJobFinished(status);
+      } else {
+        return status.equals(Status.FAILED) || status.equals(Status.KILLED);
+      }
     }
-    return null;
+    return Boolean.FALSE;
   }
 
-  /**
-   * Initialize the instance.
-   *
-   * Initializing the derived members depends on ExecutorManager to be set.
-   * Also, initializing the start time is possible when the job has already started.
-   *
-   * @return false on caught exceptions else true.
-   */
-  private boolean init() {
-    try {
-      requireNonNull(executorManager);
-      if (flow == null) {
-        flow = executorManager.getExecutableFlow(execId);
-        if (NodeType.JOB == nodeType) {
-          String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
-          job = flow.getExecutableNode(jobName);
-        }
+  private Boolean isSlaGood(ExecutableFlow flow) {
+    String type = slaOption.getType();
+    if (flow.getStartTime() < 0) {
+      return Boolean.FALSE;
+    }
+    Status status;
+    if (type.equals(SlaOption.TYPE_FLOW_FINISH)) {
+      if (checkTime < flow.getStartTime()) {
+        ReadablePeriod duration =
+            Utils.parsePeriodString((String) slaOption.getInfo().get(
+                SlaOption.INFO_DURATION));
+        DateTime startTime = new DateTime(flow.getStartTime());
+        DateTime nextCheckTime = startTime.plus(duration);
+        this.checkTime = nextCheckTime.getMillis();
       }
-      // Start time needs to be initialized if unset
-      if (startTime < 0) {
-        long t = getStartTime();
-        if (t > 0) {
-          this.startTime = t;
-          ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
-          this.slaExpireTime = new DateTime(startTime).plus(duration).getMillis();
-        }
+      status = flow.getStatus();
+      return isFlowFinished(status);
+    } else if (type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
+      if (checkTime < flow.getStartTime()) {
+        ReadablePeriod duration =
+            Utils.parsePeriodString((String) slaOption.getInfo().get(
+                SlaOption.INFO_DURATION));
+        DateTime startTime = new DateTime(flow.getStartTime());
+        DateTime nextCheckTime = startTime.plus(duration);
+        this.checkTime = nextCheckTime.getMillis();
       }
-    } catch (ExecutorManagerException e) {
-      // Log errors but don't crash
-      logger.error("Can't get executable flow.", e);
-      return false;
+      status = flow.getStatus();
+      return isFlowSucceeded(status);
+    } else if (type.equals(SlaOption.TYPE_JOB_FINISH)) {
+      String jobName =
+          (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+      ExecutableNode node = flow.getExecutableNode(jobName);
+      if (node.getStartTime() < 0) {
+        return Boolean.FALSE;
+      }
+      if (checkTime < node.getStartTime()) {
+        ReadablePeriod duration =
+            Utils.parsePeriodString((String) slaOption.getInfo().get(
+                SlaOption.INFO_DURATION));
+        DateTime startTime = new DateTime(node.getStartTime());
+        DateTime nextCheckTime = startTime.plus(duration);
+        this.checkTime = nextCheckTime.getMillis();
+      }
+      status = node.getStatus();
+      return isJobFinished(status);
+    } else if (type.equals(SlaOption.TYPE_JOB_SUCCEED)) {
+      String jobName =
+          (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+      ExecutableNode node = flow.getExecutableNode(jobName);
+      if (node.getStartTime() < 0) {
+        return Boolean.FALSE;
+      }
+      if (checkTime < node.getStartTime()) {
+        ReadablePeriod duration =
+            Utils.parsePeriodString((String) slaOption.getInfo().get(
+                SlaOption.INFO_DURATION));
+        DateTime startTime = new DateTime(node.getStartTime());
+        DateTime nextCheckTime = startTime.plus(duration);
+        this.checkTime = nextCheckTime.getMillis();
+      }
+      status = node.getStatus();
+      return isJobSucceeded(status);
     }
-    return true;
-  }
-
-  private boolean hasSlaExpired() {
-    return slaExpireTime < DateTime.now().getMillis();
+    return Boolean.FALSE;
   }
 
   // return true to trigger sla action
   @Override
   public Object eval() {
     logger.info("Checking sla for execution " + execId);
-    return isSlaFailed();
+    ExecutableFlow flow;
+    try {
+      flow = executorManager.getExecutableFlow(execId);
+    } catch (ExecutorManagerException e) {
+      logger.error("Can't get executable flow.", e);
+      e.printStackTrace();
+      // something wrong, send out alerts
+      return Boolean.TRUE;
+    }
+    return isSlaMissed(flow);
   }
 
-  /**
-   * TODO Remove convoluted logic. Investigate the need for 2 separate methods for pass and fail test
-   * DO NOT DELETE METHOD. This is used by some weird reflection code using JEXL expression
-   *
-   * @return
-   */
-  @SuppressWarnings("unused")
   public Object isSlaFailed() {
-    if (!init()) {
-      // Don't trigger SLA actions if unable to init
-      return false;
-    }
-    if (startTime < 0) {
-      return false;
+    ExecutableFlow flow;
+    try {
+      flow = executorManager.getExecutableFlow(execId);
+    } catch (ExecutorManagerException e) {
+      logger.error("Can't get executable flow.", e);
+      // something wrong, send out alerts
+      return Boolean.TRUE;
     }
-
-    /*
-     * Fire the trigger (return true) if the desired terminal state isn't reached when
-     *  - the SLA time has already expired, OR
-     *  - the flow / job has already reached a terminal state.
-     */
-    Status status = getStatus();
-    return (isTerminalState(status) || hasSlaExpired()) && !hasReachedDesiredTerminalState();
+    return isSlaMissed(flow);
   }
 
-  /**
-   * TODO Remove convoluted logic. Investigate the need for 2 separate methods for pass and fail test
-   * DO NOT DELETE METHOD. This is used by some weird reflection code using JEXL expression
-   *
-   * @return
-   */
-  @SuppressWarnings("unused")
   public Object isSlaPassed() {
-    if (!init()) {
-      return true;
-    }
-    if (startTime < 0) {
-      return false;
-    }
-    return hasReachedDesiredTerminalState();
-  }
-
-  private boolean hasReachedDesiredTerminalState() {
-    Status status = getStatus();
-    switch (conditionType) {
-      case FINISH:
-        return isTerminalState(status);
-      case SUCCEED:
-        return hasSucceeded(status);
+    ExecutableFlow flow;
+    try {
+      flow = executorManager.getExecutableFlow(execId);
+    } catch (ExecutorManagerException e) {
+      logger.error("Can't get executable flow.", e);
+      // something wrong, send out alerts
+      return Boolean.TRUE;
     }
-    return false;
-  }
-
-  /**
-   * Terminal States are final states post which there will not be any further changes in state.
-   *
-   * @param status status of flow / job
-   * @return true if status is a terminal state
-   */
-  private boolean isTerminalState(Status status) {
-    return Status.FAILED.equals(status) || Status.KILLED.equals(status) || hasSucceeded(status);
-  }
-
-  private boolean hasSucceeded(Status status) {
-    return Status.SUCCEEDED.equals(status);
+    return isSlaGood(flow);
   }
 
   @Override
@@ -226,7 +244,8 @@ public class SlaChecker implements ConditionChecker {
   }
 
   @Override
-  public void reset() { }
+  public void reset() {
+  }
 
   @Override
   public String getId() {
@@ -243,23 +262,27 @@ public class SlaChecker implements ConditionChecker {
     return createFromJson(obj);
   }
 
+  @SuppressWarnings("unchecked")
   public static SlaChecker createFromJson(Object obj) throws Exception {
-    return createFromJson((Map<String, Object>) obj);
+    return createFromJson((HashMap<String, Object>) obj);
   }
 
-  public static SlaChecker createFromJson(Map<String, Object> obj) throws Exception {
-    if (!obj.get("type").equals(type)) {
-      throw new Exception("Cannot create checker of " + type + " from " + obj.get("type"));
+  public static SlaChecker createFromJson(HashMap<String, Object> obj)
+      throws Exception {
+    Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+    if (!jsonObj.get("type").equals(type)) {
+      throw new Exception("Cannot create checker of " + type + " from "
+          + jsonObj.get("type"));
     }
-    String id = (String) obj.get("id");
-    SlaOption slaOption = SlaOption.fromObject(obj.get("slaOption"));
-    int execId = Integer.valueOf((String) obj.get("execId"));
+    String id = (String) jsonObj.get("id");
+    SlaOption slaOption = SlaOption.fromObject(jsonObj.get("slaOption"));
+    int execId = Integer.valueOf((String) jsonObj.get("execId"));
     return new SlaChecker(id, slaOption, execId);
   }
 
   @Override
   public Object toJson() {
-    Map<String, Object> jsonObj = new HashMap<>();
+    Map<String, Object> jsonObj = new HashMap<String, Object>();
     jsonObj.put("type", type);
     jsonObj.put("id", id);
     jsonObj.put("slaOption", slaOption.toObject());
@@ -269,13 +292,42 @@ public class SlaChecker implements ConditionChecker {
   }
 
   @Override
-  public void stopChecker() { }
+  public void stopChecker() {
+
+  }
 
   @Override
-  public void setContext(Map<String, Object> context) { }
+  public void setContext(Map<String, Object> context) {
+  }
 
   @Override
   public long getNextCheckTime() {
-    return slaExpireTime;
+    return checkTime;
+  }
+
+  private boolean isFlowFinished(Status status) {
+    if (status.equals(Status.FAILED) || status.equals(Status.KILLED)
+        || status.equals(Status.SUCCEEDED)) {
+      return Boolean.TRUE;
+    } else {
+      return Boolean.FALSE;
+    }
+  }
+
+  private boolean isFlowSucceeded(Status status) {
+    return status.equals(Status.SUCCEEDED);
+  }
+
+  private boolean isJobFinished(Status status) {
+    if (status.equals(Status.FAILED) || status.equals(Status.KILLED)
+        || status.equals(Status.SUCCEEDED)) {
+      return Boolean.TRUE;
+    } else {
+      return Boolean.FALSE;
+    }
+  }
+
+  private boolean isJobSucceeded(Status status) {
+    return status.equals(Status.SUCCEEDED);
   }
 }

build.gradle 2(+1 -1)

diff --git a/build.gradle b/build.gradle
index e713019..f84422b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -96,7 +96,7 @@ subprojects {
  * Gradle wrapper task.
  */
 task wrapper(type: Wrapper) {
-  gradleVersion = '2.13'
+  gradleVersion = '2.7'
 }
 
 idea {
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 015050a..54e90fc 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
-#Thu Jan 05 19:08:47 PST 2017
+#Sat May 28 17:40:49 PDT 2016
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-2.13-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-2.7-all.zip