azkaban-aplcache

new Trigger and TriggerManager in executor (#1083) keep

5/21/2017 8:51:13 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/Constants.java b/azkaban-common/src/main/java/azkaban/Constants.java
index aea1499..4abad36 100644
--- a/azkaban-common/src/main/java/azkaban/Constants.java
+++ b/azkaban-common/src/main/java/azkaban/Constants.java
@@ -37,6 +37,8 @@ public class Constants {
 
   public static final String AZKABAN_SERVLET_CONTEXT_KEY = "azkaban_app";
 
+  // Internal username used to perform SLA action
+  public static final String AZKABAN_SLA_CHECKER_USERNAME = "azkaban_sla";
 
   // Memory check retry interval when OOM in ms
   public static final long MEMORY_CHECK_INTERVAL_MS = 1000 * 60 * 1;
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java
index 806106c..ea72c80 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java
@@ -15,6 +15,7 @@
  */
 package azkaban.executor;
 
+import azkaban.sla.SlaOption;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -22,11 +23,13 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import azkaban.flow.Flow;
 import azkaban.project.Project;
 import azkaban.utils.TypedMapWrapper;
 
+
 public class ExecutableFlow extends ExecutableFlowBase {
   public static final String EXECUTIONID_PARAM = "executionId";
   public static final String EXECUTIONPATH_PARAM = "executionPath";
@@ -40,6 +43,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
   public static final String PROJECTNAME_PARAM = "projectName";
   public static final String LASTMODIFIEDTIME_PARAM = "lastModfiedTime";
   public static final String LASTMODIFIEDUSER_PARAM = "lastModifiedUser";
+  public static final String SLAOPTIONS_PARAM = "slaOptions";
 
 
   private int executionId = -1;
@@ -55,6 +59,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
 
   private HashSet<String> proxyUsers = new HashSet<String>();
   private ExecutionOptions executionOptions;
+  private List<SlaOption> slaOptions = new ArrayList<>();
 
   public ExecutableFlow(Project project, Flow flow) {
     this.projectId = project.getId();
@@ -95,6 +100,11 @@ public class ExecutableFlow extends ExecutableFlowBase {
     return executionOptions;
   }
 
+  public List<SlaOption> getSlaOptions() {
+    return slaOptions;
+  }
+
+
   @Override
   protected void setFlow(Project project, Flow flow) {
     super.setFlow(project, flow);
@@ -191,6 +201,10 @@ public class ExecutableFlow extends ExecutableFlowBase {
     this.submitTime = submitTime;
   }
 
+  public void setSlaOptions(List<SlaOption> slaOptions) {
+    this.slaOptions = slaOptions;
+  }
+
   @Override
   public Map<String, Object> toObject() {
     HashMap<String, Object> flowObj = new HashMap<String, Object>();
@@ -218,11 +232,16 @@ public class ExecutableFlow extends ExecutableFlowBase {
 
     flowObj.put(SUBMITTIME_PARAM, submitTime);
 
+    List<Map<String, Object>> slaOptions = new ArrayList<>();
+    this.getSlaOptions().stream().forEach((slaOption) -> slaOptions.add(slaOption.toObject()));
+
+    flowObj.put(SLAOPTIONS_PARAM, slaOptions);
+
     return flowObj;
   }
 
   @SuppressWarnings("unchecked")
-  public static ExecutableFlow createExecutableFlowFromObject(Object obj) {
+    public static ExecutableFlow createExecutableFlowFromObject(Object obj) {
     ExecutableFlow exFlow = new ExecutableFlow();
     HashMap<String, Object> flowObj = (HashMap<String, Object>) obj;
     exFlow.fillExecutableFromMapObject(flowObj);
@@ -259,7 +278,12 @@ public class ExecutableFlow extends ExecutableFlowBase {
     if (flowObj.containsKey(PROXYUSERS_PARAM)) {
       List<String> proxyUserList = flowObj.<String> getList(PROXYUSERS_PARAM);
       this.addAllProxyUsers(proxyUserList);
+    }
 
+    if (flowObj.containsKey(SLAOPTIONS_PARAM)) {
+      List<SlaOption> slaOptions =
+          flowObj.getList(SLAOPTIONS_PARAM).stream().map(SlaOption::fromObject).collect(Collectors.toList());
+      this.setSlaOptions(slaOptions);
     }
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/sla/SlaOption.java b/azkaban-common/src/main/java/azkaban/sla/SlaOption.java
index 767b752..4683b51 100644
--- a/azkaban-common/src/main/java/azkaban/sla/SlaOption.java
+++ b/azkaban-common/src/main/java/azkaban/sla/SlaOption.java
@@ -16,11 +16,16 @@
 
 package azkaban.sla;
 
+import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.joda.time.DateTime;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
@@ -31,16 +36,13 @@ public class SlaOption {
 
   public static final String TYPE_FLOW_FINISH = "FlowFinish";
   public static final String TYPE_FLOW_SUCCEED = "FlowSucceed";
-  public static final String TYPE_FLOW_PROGRESS = "FlowProgress";
 
   public static final String TYPE_JOB_FINISH = "JobFinish";
   public static final String TYPE_JOB_SUCCEED = "JobSucceed";
-  public static final String TYPE_JOB_PROGRESS = "JobProgress";
 
   public static final String INFO_DURATION = "Duration";
   public static final String INFO_FLOW_NAME = "FlowName";
   public static final String INFO_JOB_NAME = "JobName";
-  public static final String INFO_PROGRESS_PERCENT = "ProgressPercent";
   public static final String INFO_EMAIL_LIST = "EmailList";
 
   // always alert
@@ -61,6 +63,18 @@ public class SlaOption {
     this.actions = actions;
   }
 
+  public static List<SlaOption> getJobLevelSLAOptions(ExecutableFlow flow) {
+    Set<String> jobLevelSLAs = new HashSet<>(Arrays.asList(SlaOption.TYPE_JOB_FINISH, SlaOption.TYPE_JOB_SUCCEED));
+    return flow.getSlaOptions().stream().filter(slaOption -> jobLevelSLAs.contains(slaOption.getType()))
+        .collect(Collectors.toList());
+  }
+
+  public static List<SlaOption> getFlowLevelSLAOptions(ExecutableFlow flow) {
+    Set<String> flowLevelSLAs = new HashSet<>(Arrays.asList(SlaOption.TYPE_FLOW_FINISH, SlaOption.TYPE_FLOW_SUCCEED));
+    return flow.getSlaOptions().stream().filter(slaOption -> flowLevelSLAs.contains(slaOption.getType()))
+        .collect(Collectors.toList());
+  }
+
   public String getType() {
     return type;
   }
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
index db2dc53..dfbdbe0 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
@@ -239,6 +239,10 @@ public class ExecuteFlowAction implements TriggerAction {
     }
     exflow.setExecutionOptions(executionOptions);
 
+    if (slaOptions != null && slaOptions.size() > 0) {
+      exflow.setSlaOptions(slaOptions);
+    }
+
     try {
       logger.info("Invoking flow " + project.getName() + "." + flowName);
       executorManager.submitExecutableFlow(exflow, submitUser);
@@ -246,59 +250,6 @@ public class ExecuteFlowAction implements TriggerAction {
     } catch (ExecutorManagerException e) {
       throw new RuntimeException(e);
     }
-
-    // deal with sla
-    if (slaOptions != null && slaOptions.size() > 0) {
-      int execId = exflow.getExecutionId();
-      for (SlaOption sla : slaOptions) {
-        logger.info("Adding sla trigger " + sla.toString() + " to execution "
-            + execId);
-        SlaChecker slaFailChecker =
-            new SlaChecker("slaFailChecker", sla, execId);
-        Map<String, ConditionChecker> slaCheckers =
-            new HashMap<String, ConditionChecker>();
-        slaCheckers.put(slaFailChecker.getId(), slaFailChecker);
-        Condition triggerCond =
-            new Condition(slaCheckers, slaFailChecker.getId()
-                + ".isSlaFailed()");
-        // if whole flow finish before violate sla, just expire
-        SlaChecker slaPassChecker =
-            new SlaChecker("slaPassChecker", sla, execId);
-        Map<String, ConditionChecker> expireCheckers =
-            new HashMap<String, ConditionChecker>();
-        expireCheckers.put(slaPassChecker.getId(), slaPassChecker);
-        Condition expireCond =
-            new Condition(expireCheckers, slaPassChecker.getId()
-                + ".isSlaPassed()");
-        List<TriggerAction> actions = new ArrayList<TriggerAction>();
-        List<String> slaActions = sla.getActions();
-        for (String act : slaActions) {
-          if (act.equals(SlaOption.ACTION_ALERT)) {
-            SlaAlertAction slaAlert =
-                new SlaAlertAction("slaAlert", sla, execId);
-            actions.add(slaAlert);
-          } else if (act.equals(SlaOption.ACTION_CANCEL_FLOW)) {
-            KillExecutionAction killAct =
-                new KillExecutionAction("killExecution", execId);
-            actions.add(killAct);
-          }
-        }
-        Trigger slaTrigger = new Trigger.TriggerBuilder("azkaban_sla",
-                                                        "azkaban",
-                                                        triggerCond,
-                                                        expireCond,
-                                                        actions).build();
-
-        slaTrigger.getInfo().put("monitored.finished.execution",
-            String.valueOf(execId));
-        slaTrigger.setResetOnTrigger(false);
-        slaTrigger.setResetOnExpire(false);
-        logger.info("Ready to put in the sla trigger");
-        triggerManager.insertTrigger(slaTrigger);
-        logger.info("Sla inserted.");
-      }
-    }
-
   }
 
   @Override
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/KillExecutionAction.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/KillExecutionAction.java
index e4a1dbf..01d9537 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/KillExecutionAction.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/KillExecutionAction.java
@@ -16,6 +16,7 @@
 
 package azkaban.trigger.builtin;
 
+import azkaban.Constants;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -26,6 +27,14 @@ import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.Status;
 import azkaban.trigger.TriggerAction;
 
+/**
+ * @deprecated Create a new KillExecutionAction using FlowRunnerManager
+ * instead of ExecutorManager to kill flow. Still keep the old one here
+ * for being compatible with existing SLA trigger in the database.
+ * Will remove the old one when all existing triggers expire.
+ */
+
+@Deprecated
 public class KillExecutionAction implements TriggerAction {
 
   public static final String type = "KillExecutionAction";
@@ -37,6 +46,7 @@ public class KillExecutionAction implements TriggerAction {
   private int execId;
   private static ExecutorManagerAdapter executorManager;
 
+  //todo chengren311: delete this class to executor module when all existing triggers in db are expired
   public KillExecutionAction(String actionId, int execId) {
     this.execId = execId;
     this.actionId = actionId;
@@ -94,7 +104,7 @@ public class KillExecutionAction implements TriggerAction {
     logger.info("ready to kill execution " + execId);
     if (!Status.isStatusFinished(exFlow.getStatus())) {
       logger.info("Killing execution " + execId);
-      executorManager.cancelFlow(exFlow, "azkaban_sla");
+      executorManager.cancelFlow(exFlow, Constants.AZKABAN_SLA_CHECKER_USERNAME);
     }
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaAlertAction.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaAlertAction.java
index 2da27f8..b300e18 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaAlertAction.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaAlertAction.java
@@ -16,8 +16,6 @@
 
 package azkaban.trigger.builtin;
 
-import azkaban.ServiceProvider;
-import azkaban.executor.AlerterHolder;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -25,9 +23,11 @@ import org.apache.log4j.Logger;
 
 import azkaban.alert.Alerter;
 import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.sla.SlaOption;
 import azkaban.trigger.TriggerAction;
+import azkaban.ServiceProvider;
+import azkaban.executor.AlerterHolder;
+import azkaban.executor.ExecutorLoader;
 
 public class SlaAlertAction implements TriggerAction {
 
@@ -35,21 +35,19 @@ public class SlaAlertAction implements TriggerAction {
 
   private static final Logger logger = Logger.getLogger(SlaAlertAction.class);
 
-  private String actionId;
-  private SlaOption slaOption;
-  private int execId;
-  private AlerterHolder alerters;
-  private static ExecutorManagerAdapter executorManager;
+  private final String actionId;
+  private final SlaOption slaOption;
+  private final int execId;
+  private final AlerterHolder alerters;
+  private final ExecutorLoader executorLoader;
 
+  //todo chengren311: move this class to executor module when all existing triggers in db are expired
   public SlaAlertAction(String id, SlaOption slaOption, int execId) {
     this.actionId = id;
     this.slaOption = slaOption;
     this.execId = execId;
     this.alerters = ServiceProvider.SERVICE_PROVIDER.getInstance(AlerterHolder.class);
-  }
-
-  public static void setExecutorManager(ExecutorManagerAdapter em) {
-    executorManager = em;
+    this.executorLoader = ServiceProvider.SERVICE_PROVIDER.getInstance(ExecutorLoader.class);
   }
 
   @Override
@@ -105,9 +103,8 @@ public class SlaAlertAction implements TriggerAction {
       Alerter alerter = alerters.get(alertType);
       if (alerter != null) {
         try {
-          ExecutableFlow flow = executorManager.getExecutableFlow(execId);
-          alerter.alertOnSla(slaOption,
-              SlaOption.createSlaMessage(slaOption, flow));
+          ExecutableFlow flow = executorLoader.fetchExecutableFlow(execId);
+          alerter.alertOnSla(slaOption, SlaOption.createSlaMessage(slaOption, flow));
         } catch (Exception e) {
           e.printStackTrace();
           logger.error("Failed to alert by " + alertType);
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
index 27b9ba4..d65b273 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
@@ -16,6 +16,8 @@
 
 package azkaban.trigger.builtin;
 
+import azkaban.ServiceProvider;
+import azkaban.executor.ExecutorLoader;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -25,7 +27,6 @@ import org.joda.time.ReadablePeriod;
 
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.Status;
 import azkaban.sla.SlaOption;
@@ -37,21 +38,18 @@ public class SlaChecker implements ConditionChecker {
   private static final Logger logger = Logger.getLogger(SlaChecker.class);
   public static final String type = "SlaChecker";
 
-  private String id;
-  private SlaOption slaOption;
-  private int execId;
+  private final String id;
+  private final SlaOption slaOption;
+  private final int execId;
   private long checkTime = -1;
+  private final ExecutorLoader executorLoader;
 
-  private static ExecutorManagerAdapter executorManager;
-
+  //todo chengren311: move this class to executor module when all existing triggers in db are expired
   public SlaChecker(String id, SlaOption slaOption, int execId) {
     this.id = id;
     this.slaOption = slaOption;
     this.execId = execId;
-  }
-
-  public static void setExecutorManager(ExecutorManagerAdapter em) {
-    executorManager = em;
+    this.executorLoader = ServiceProvider.SERVICE_PROVIDER.getInstance(ExecutorLoader.class);
   }
 
   private Boolean isSlaMissed(ExecutableFlow flow) {
@@ -204,7 +202,7 @@ public class SlaChecker implements ConditionChecker {
     logger.info("Checking sla for execution " + execId);
     ExecutableFlow flow;
     try {
-      flow = executorManager.getExecutableFlow(execId);
+      flow = executorLoader.fetchExecutableFlow(execId);
     } catch (ExecutorManagerException e) {
       logger.error("Can't get executable flow.", e);
       e.printStackTrace();
@@ -217,7 +215,7 @@ public class SlaChecker implements ConditionChecker {
   public Object isSlaFailed() {
     ExecutableFlow flow;
     try {
-      flow = executorManager.getExecutableFlow(execId);
+      flow = executorLoader.fetchExecutableFlow(execId);
     } catch (ExecutorManagerException e) {
       logger.error("Can't get executable flow.", e);
       // something wrong, send out alerts
@@ -229,7 +227,7 @@ public class SlaChecker implements ConditionChecker {
   public Object isSlaPassed() {
     ExecutableFlow flow;
     try {
-      flow = executorManager.getExecutableFlow(execId);
+      flow = executorLoader.fetchExecutableFlow(execId);
     } catch (ExecutorManagerException e) {
       logger.error("Can't get executable flow.", e);
       // something wrong, send out alerts
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/action/KillExecutionAction.java b/azkaban-exec-server/src/main/java/azkaban/execapp/action/KillExecutionAction.java
new file mode 100644
index 0000000..0d0f4e6
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/action/KillExecutionAction.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.action;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import azkaban.Constants;
+import azkaban.ServiceProvider;
+import azkaban.execapp.FlowRunnerManager;
+import azkaban.trigger.TriggerAction;
+
+import org.apache.log4j.Logger;
+
+
+public class KillExecutionAction implements TriggerAction {
+
+  public static final String type = "KillExecutionAction";
+
+  private static final Logger logger = Logger
+      .getLogger(KillExecutionAction.class);
+
+  private String actionId;
+  private int execId;
+
+  public KillExecutionAction(String actionId, int execId) {
+    this.execId = execId;
+    this.actionId = actionId;
+  }
+
+  @Override
+  public String getId() {
+    return actionId;
+  }
+
+  @Override
+  public String getType() {
+    return type;
+  }
+
+  @SuppressWarnings("unchecked")
+  public static KillExecutionAction createFromJson(Object obj) {
+    return createFromJson((HashMap<String, Object>) obj);
+  }
+
+  public static KillExecutionAction createFromJson(HashMap<String, Object> obj) {
+    Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+    String objType = (String) jsonObj.get("type");
+    if (!objType.equals(type)) {
+      throw new RuntimeException("Cannot create action of " + type + " from "
+          + objType);
+    }
+    String actionId = (String) jsonObj.get("actionId");
+    int execId = Integer.valueOf((String) jsonObj.get("execId"));
+    return new KillExecutionAction(actionId, execId);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public KillExecutionAction fromJson(Object obj) throws Exception {
+    return createFromJson((HashMap<String, Object>) obj);
+  }
+
+  @Override
+  public Object toJson() {
+    Map<String, Object> jsonObj = new HashMap<String, Object>();
+    jsonObj.put("actionId", actionId);
+    jsonObj.put("type", type);
+    jsonObj.put("execId", String.valueOf(execId));
+    return jsonObj;
+  }
+
+  @Override
+  public void doAction() throws Exception {
+    logger.info("ready to kill execution " + execId);
+    ServiceProvider.SERVICE_PROVIDER.getInstance(FlowRunnerManager.class).cancelFlow(execId, Constants.AZKABAN_SLA_CHECKER_USERNAME);
+  }
+
+  @Override
+  public void setContext(Map<String, Object> context) {
+  }
+
+  @Override
+  public String getDescription() {
+    return type + " for " + execId;
+  }
+
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
index 3f9772b..3654919 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
@@ -33,5 +33,8 @@ public class AzkabanExecServerModule extends AbstractModule {
   protected void configure() {
     bind(ExecutorLoader.class).to(JdbcExecutorLoader.class).in(Scopes.SINGLETON);
     bind(AzkabanExecutorServer.class).in(Scopes.SINGLETON);
+    bind(TriggerManager.class).in(Scopes.SINGLETON);
+    bind(FlowRunnerManager.class).in(Scopes.SINGLETON);
+
   }
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index f5bd936..b86cfbb 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -16,6 +16,8 @@
 
 package azkaban.execapp;
 
+import azkaban.ServiceProvider;
+import azkaban.sla.SlaOption;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -1061,7 +1063,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 
       if (event.getType() == Type.JOB_STATUS_CHANGED) {
         updateFlow();
-      } else if (event.getType() == Type.JOB_FINISHED) {
+      }
+      else if (event.getType() == Type.JOB_FINISHED) {
         ExecutableNode node = runner.getNode();
         EventData eventData = event.getData();
         long seconds = (node.getEndTime() - node.getStartTime()) / 1000;
@@ -1083,6 +1086,11 @@ public class FlowRunner extends EventHandler implements Runnable {
           fireEventListeners(event);
         }
       }
+      else if (event.getType() == Type.JOB_STARTED) {
+        // add job level checker
+        TriggerManager triggerManager = ServiceProvider.SERVICE_PROVIDER.getInstance(TriggerManager.class);
+        triggerManager.addTrigger(flow.getExecutionId(), SlaOption.getJobLevelSLAOptions(flow));
+      }
     }
   }
 
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 22ff364..fadffbd 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -18,6 +18,7 @@ package azkaban.execapp;
 
 import azkaban.Constants;
 import azkaban.executor.Status;
+import azkaban.sla.SlaOption;
 import azkaban.storage.StorageManager;
 import com.google.inject.Inject;
 import java.io.File;
@@ -38,6 +39,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
 
@@ -118,6 +120,8 @@ public class FlowRunnerManager implements EventListener,
   private final ProjectLoader projectLoader;
   private final JobTypeManager jobtypeManager;
   private final FlowPreparer flowPreparer;
+  private final TriggerManager triggerManager;
+
 
   private final Props azkabanProps;
   private final File executionDirectory;
@@ -151,7 +155,8 @@ public class FlowRunnerManager implements EventListener,
   public FlowRunnerManager(Props props,
       ExecutorLoader executorLoader,
       ProjectLoader projectLoader,
-      StorageManager storageManager) throws IOException {
+      StorageManager storageManager,
+      TriggerManager triggerManager) throws IOException {
     azkabanProps = props;
 
     executionDirRetention = props.getLong("execution.dir.retention", executionDirRetention);
@@ -178,6 +183,7 @@ public class FlowRunnerManager implements EventListener,
 
     this.executorLoader = executorLoader;
     this.projectLoader = projectLoader;
+    this.triggerManager = triggerManager;
 
     this.jobLogChunkSize = azkabanProps.getString("job.log.chunk.size", "5MB");
     this.jobLogNumFiles = azkabanProps.getInt("job.log.backup.index", 4);
@@ -629,16 +635,19 @@ public class FlowRunnerManager implements EventListener,
 
   @Override
   public void handleEvent(Event event) {
-    if (event.getType() == Event.Type.FLOW_FINISHED) {
-
-      FlowRunner flowRunner = (FlowRunner) event.getRunner();
-      ExecutableFlow flow = flowRunner.getExecutableFlow();
+    FlowRunner flowRunner = (FlowRunner) event.getRunner();
+    ExecutableFlow flow = flowRunner.getExecutableFlow();
 
+    if (event.getType() == Event.Type.FLOW_FINISHED) {
       recentlyFinishedFlows.put(flow.getExecutionId(), flow);
       logger.info("Flow " + flow.getExecutionId()
           + " is finished. Adding it to recently finished flows list.");
       runningFlows.remove(flow.getExecutionId());
     }
+    else if (event.getType() == Event.Type.FLOW_STARTED) {
+      // add flow level checker
+      triggerManager.addTrigger(flow.getExecutionId(), SlaOption.getFlowLevelSLAOptions(flow));
+    }
   }
 
   public LogData readFlowLogs(int execId, int startByte, int length)
@@ -889,6 +898,7 @@ public class FlowRunnerManager implements EventListener,
   public void shutdownNow() {
     logger.warn("Shutting down FlowRunnerManager now...");
     executorService.shutdownNow();
+    triggerManager.shutdown();
   }
 
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/Trigger.java b/azkaban-exec-server/src/main/java/azkaban/execapp/Trigger.java
new file mode 100644
index 0000000..e1dfc73
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/Trigger.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+
+package azkaban.execapp;
+
+import java.util.List;
+
+import azkaban.trigger.Condition;
+import azkaban.trigger.TriggerAction;
+import org.apache.log4j.Logger;
+
+
+public class Trigger implements Runnable {
+  private static Logger logger = Logger.getLogger(azkaban.execapp.Trigger.class);
+  private final int execId;
+
+  // condition to trigger actions(ex. flow running longer than X mins)
+  private final Condition triggerCondition;
+  // condition to expire this trigger(ex. flow finishes before violating SLA)
+  private final Condition expireCondition;
+  private final List<TriggerAction> actions;
+
+  public Trigger(int execId,
+                 Condition triggerCondition,
+                 Condition expireCondition,
+                 List<TriggerAction> actions)
+  {
+    this.execId = execId;
+    this.triggerCondition = triggerCondition;
+    this.expireCondition = expireCondition;
+    this.actions = actions;
+  }
+
+
+  /**
+   * Perform the action if trigger condition is met
+   */
+  @Override
+  public void run() {
+    if(isTriggerExpired()) {
+      logger.info(this + " expired");
+      return ;
+    }
+
+    boolean isTriggerConditionMet = triggerCondition.isMet();
+
+    if (isTriggerConditionMet) {
+      logger.info("Condition " + triggerCondition.getExpression() + " met");
+      for (TriggerAction action : actions) {
+        try {
+          action.doAction();
+        } catch (Exception e) {
+          logger.error("Failed to do action " + action.getDescription()
+              + " for execution " + azkaban.execapp.Trigger.this.execId, e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Check if the trigger is expired and reset isExpired
+   * @return true if trigger is expired
+   */
+  public boolean isTriggerExpired() {
+    return expireCondition.isMet();
+  }
+
+  public String toString() {
+    StringBuilder actionsString = new StringBuilder();
+    for (TriggerAction act : actions) {
+      actionsString.append(", ");
+      actionsString.append(act.getDescription());
+    }
+
+    return "Trigger for execution " + execId + " with trigger condition of "
+        + triggerCondition.getExpression() + " and expire condition of "
+        + expireCondition.getExpression() + actionsString;
+  }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/TriggerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/TriggerManager.java
new file mode 100644
index 0000000..1a7e394
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/TriggerManager.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp;
+
+import azkaban.sla.SlaOption;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.TriggerAction;
+import azkaban.trigger.builtin.SlaAlertAction;
+import azkaban.trigger.builtin.SlaChecker;
+import azkaban.utils.Utils;
+import azkaban.execapp.action.KillExecutionAction;
+import com.google.inject.Inject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.log4j.Logger;
+import org.joda.time.ReadablePeriod;
+
+
+public class TriggerManager {
+  private static Logger logger = Logger.getLogger(TriggerManager.class);
+  private static final int SCHEDULED_THREAD_POOL_SIZE = 4;
+  private final ScheduledExecutorService scheduledService;
+
+  @Inject
+  public TriggerManager() {
+    this.scheduledService = Executors.newScheduledThreadPool(SCHEDULED_THREAD_POOL_SIZE);
+  }
+
+  private Condition createCondition(SlaOption sla, int execId, String checkerName, String checkerMethod) {
+    SlaChecker slaFailChecker = new SlaChecker(checkerName, sla, execId);
+    Map<String, ConditionChecker> slaCheckers = new HashMap<>();
+    slaCheckers.put(slaFailChecker.getId(), slaFailChecker);
+    return new Condition(slaCheckers, slaFailChecker.getId() + "." + checkerMethod);
+  }
+
+  private List<TriggerAction> createActions(SlaOption sla, int execId) {
+    List<TriggerAction> actions = new ArrayList<>();
+    List<String> slaActions = sla.getActions();
+    for (String act : slaActions) {
+      TriggerAction action = null;
+      switch (act) {
+        case SlaOption.ACTION_ALERT:
+          action = new SlaAlertAction("slaAlert", sla, execId);
+          break;
+        case SlaOption.ACTION_CANCEL_FLOW:
+          action = new KillExecutionAction("killExecution", execId);
+          break;
+        default:
+          logger.info("Unknown action type " + act);
+          break;
+      }
+      if (action != null) {
+        actions.add(action);
+      }
+    }
+    return actions;
+  }
+
+  public void addTrigger(int execId, List<SlaOption> slaOptions) {
+    for (SlaOption sla : slaOptions) {
+      Condition triggerCond = createCondition(sla, execId, "slaFailChecker", "isSlaFailed()");
+
+      // if whole flow finish before violating sla, just expire the checker
+      Condition expireCond = createCondition(sla, execId, "slaPassChecker", "isSlaPassed()");
+
+      List<TriggerAction> actions = createActions(sla, execId);
+      Trigger trigger = new Trigger(execId, triggerCond, expireCond, actions);
+
+      ReadablePeriod duration = Utils.parsePeriodString((String) sla.getInfo().get(SlaOption.INFO_DURATION));
+      long durationInMillis = duration.toPeriod().toStandardDuration().getMillis();
+
+      logger.info("Adding sla trigger " + sla.toString() + " to execution " + execId + ", scheduled to trigger in " + durationInMillis/1000 + " seconds");
+      scheduledService.schedule(trigger, durationInMillis, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  public void shutdown() {
+    scheduledService.shutdownNow();
+  }
+}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index e0a57cf..407f00f 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -286,13 +286,10 @@ public class AzkabanWebServer extends AzkabanServer {
 
   private void loadBuiltinCheckersAndActions() {
     logger.info("Loading built-in checker and action types");
-    SlaChecker.setExecutorManager(executorManager);
     ExecuteFlowAction.setExecutorManager(executorManager);
     ExecuteFlowAction.setProjectManager(projectManager);
     ExecuteFlowAction.setTriggerManager(triggerManager);
     KillExecutionAction.setExecutorManager(executorManager);
-    SlaAlertAction.setExecutorManager(executorManager);
-    SlaAlertAction.setExecutorManager(executorManager);
     CreateTriggerAction.setTriggerManager(triggerManager);
     ExecutionChecker.setExecutorManager(executorManager);