azkaban-aplcache

New SLA action type for killing a job (#1092) New SLA action

7/25/2017 12:31:14 AM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
index a66d9b7..ad63a36 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
@@ -53,6 +53,8 @@ public class ExecutableNode {
   private long startTime = -1;
   private long endTime = -1;
   private long updateTime = -1;
+  private volatile boolean killedBySLA = false;
+
   // Path to Job File
   private String jobSource;
   // Path to top level props file
@@ -155,6 +157,14 @@ public class ExecutableNode {
     this.updateTime = updateTime;
   }
 
+  public void setKilledBySLA(boolean killedBySLA) {
+    this.killedBySLA = killedBySLA;
+  }
+
+  public boolean isKilledBySLA() {
+    return this.killedBySLA;
+  }
+
   public void addOutNode(final String exNode) {
     this.outNodes.add(exNode);
   }
@@ -239,6 +249,7 @@ public class ExecutableNode {
     this.setEndTime(-1);
     this.setUpdateTime(System.currentTimeMillis());
     this.setStatus(Status.READY);
+    this.setKilledBySLA(false);
   }
 
   public List<Object> getAttemptObjects() {
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index ead5147..317cd77 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -256,7 +256,6 @@ public class ProcessJob extends AbstractProcessJob {
 
       boolean success = false;
       this.process = builder.build();
-
       try {
         if (!this.killed) {
           this.process.run();
diff --git a/azkaban-common/src/main/java/azkaban/sla/SlaOption.java b/azkaban-common/src/main/java/azkaban/sla/SlaOption.java
index 526b90f..3e3872b 100644
--- a/azkaban-common/src/main/java/azkaban/sla/SlaOption.java
+++ b/azkaban-common/src/main/java/azkaban/sla/SlaOption.java
@@ -46,8 +46,10 @@ public class SlaOption {
   public static final String ALERT_TYPE = "SlaAlertType";
   public static final String ACTION_CANCEL_FLOW = "SlaCancelFlow";
   public static final String ACTION_ALERT = "SlaAlert";
+  public static final String ACTION_KILL_JOB = "SlaKillJob";
   private static final DateTimeFormatter fmt = DateTimeFormat
       .forPattern("MM/dd, YYYY HH:mm");
+  
   private String type;
   private Map<String, Object> info;
   private List<String> actions;
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/action/KillJobAction.java b/azkaban-exec-server/src/main/java/azkaban/execapp/action/KillJobAction.java
new file mode 100644
index 0000000..c33e8c0
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/action/KillJobAction.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.action;
+
+import azkaban.ServiceProvider;
+import azkaban.execapp.FlowRunnerManager;
+import azkaban.trigger.TriggerAction;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.log4j.Logger;
+
+
+public class KillJobAction implements TriggerAction {
+
+  public static final String type = "KillJobAction";
+
+  private static final Logger logger = Logger
+      .getLogger(KillJobAction.class);
+
+  private final String actionId;
+  private final int execId;
+  private final String jobId;
+
+  public KillJobAction(final String actionId, final int execId, final String jobId) {
+    this.execId = execId;
+    this.actionId = actionId;
+    this.jobId = jobId;
+  }
+
+  /** Rebuilds a KillJobAction from the map produced by {@link #toJson()}; rejects other types. */
+  public static KillJobAction createFromJson(final HashMap<String, Object> obj) {
+    final Map<String, Object> jsonObj = obj; // HashMap is already a Map; no cast needed
+    final String objType = (String) jsonObj.get("type");
+    if (!type.equals(objType)) { // constant first: a missing "type" is rejected, not an NPE
+      throw new RuntimeException("Cannot create action of " + type + " from " + objType);
+    }
+    final String actionId = (String) jsonObj.get("actionId");
+    final int execId = Integer.parseInt((String) jsonObj.get("execId")); // avoids boxing
+    final String jobId = (String) jsonObj.get("jobId");
+    return new KillJobAction(actionId, execId, jobId);
+  }
+
+  @Override
+  public String getId() {
+    return this.actionId;
+  }
+
+  @Override
+  public String getType() {
+    return type;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public KillJobAction fromJson(final Object obj) throws Exception {
+    return createFromJson((HashMap<String, Object>) obj);
+  }
+
+  @Override
+  public Object toJson() {
+    final Map<String, Object> jsonObj = new HashMap<>();
+    jsonObj.put("actionId", this.actionId);
+    jsonObj.put("type", type);
+    jsonObj.put("execId", String.valueOf(this.execId)); // stored as text; parsed on read
+    jsonObj.put("jobId", this.jobId); // already a String; valueOf would turn null into "null"
+    return jsonObj;
+  }
+
+  @Override
+  public void doAction() throws Exception {
+    logger.info("ready to do action " + getDescription());
+    final FlowRunnerManager flowRunnerManager = ServiceProvider.SERVICE_PROVIDER
+        .getInstance(FlowRunnerManager.class);
+    flowRunnerManager.cancelJobBySLA(this.execId, this.jobId);
+  }
+
+  @Override
+  public void setContext(final Map<String, Object> context) {
+  }
+
+  @Override
+  public String getDescription() {
+    return type + " for execution " + this.execId + " jobId " + this.jobId;
+  }
+
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
index 56e0b35..1688705 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
@@ -36,6 +36,5 @@ public class AzkabanExecServerModule extends AbstractModule {
     bind(AzkabanExecutorServer.class).in(Scopes.SINGLETON);
     bind(TriggerManager.class).in(Scopes.SINGLETON);
     bind(FlowRunnerManager.class).in(Scopes.SINGLETON);
-
   }
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 0083980..12b17ee 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -45,6 +45,7 @@ import azkaban.sla.SlaOption;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
 import azkaban.utils.SwapQueue;
+import com.google.common.collect.ImmutableSet;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -65,6 +66,7 @@ import org.apache.log4j.Layout;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 
+
 /**
  * Class that handles the running of a ExecutableFlow DAG
  */
@@ -418,9 +420,9 @@ public class FlowRunner extends EventHandler implements Runnable {
       Set<String> outNodeIds = node.getOutNodes();
       ExecutableFlowBase parentFlow = node.getParentFlow();
 
-      // If a job is seen as failed, then we set the parent flow to
+      // If a job is seen as failed or killed due to failing SLA, then we set the parent flow to
       // FAILED_FINISHING
-      if (node.getStatus() == Status.FAILED) {
+      if (node.getStatus() == Status.FAILED || (node.getStatus() == Status.KILLED && node.isKilledBySLA())) {
         // The job cannot be retried or has run out of retry attempts. We will
         // fail the job and its flow now.
         if (!retryJobIfPossible(node)) {
@@ -1144,4 +1146,8 @@ public class FlowRunner extends EventHandler implements Runnable {
       }
     }
   }
+
+  public Set<JobRunner> getActiveJobRunners() {
+    return ImmutableSet.copyOf(this.activeJobRunners);
+  }
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index bad88bb..cf10856 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -390,6 +390,25 @@ public class FlowRunnerManager implements EventListener,
 
   }
 
+
+  /** Kills active job {@code jobId} of running execution {@code execId} on an SLA miss. */
+  public void cancelJobBySLA(final int execId, final String jobId)
+      throws ExecutorManagerException {
+    final FlowRunner flowRunner = this.runningFlows.get(execId);
+    if (flowRunner == null) {
+      throw new ExecutorManagerException("Execution " + execId + " is not running.");
+    }
+
+    for (final JobRunner jobRunner : flowRunner.getActiveJobRunners()) {
+      if (jobRunner.getJobId().equals(jobId)) {
+        logger.info("Killing job " + jobId + " in execution " + execId + " by SLA");
+        jobRunner.killBySLA();
+        return; // job ids are matched once; nothing more to do
+      }
+    }
+    logger.warn("No active job " + jobId + " in execution " + execId + " to kill by SLA");
+  }
+
   public void cancelFlow(final int execId, final String user)
       throws ExecutorManagerException {
     final FlowRunner runner = this.runningFlows.get(execId);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index c9239b1..3191b7d 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -246,6 +246,10 @@ public class JobRunner extends EventHandler implements Runnable {
     return this.node;
   }
 
+  public String getJobId() {
+    return node.getId();
+  }
+
   public String getLogFilePath() {
     return this.logFile == null ? null : this.logFile.getPath();
   }
@@ -790,6 +794,11 @@ public class JobRunner extends EventHandler implements Runnable {
     this.fireEventListeners(event);
   }
 
+  public void killBySLA() {
+    this.getNode().setKilledBySLA(true); // set before kill() so a KILLED node is already marked
+    kill();
+  }
+
   public void kill() {
     synchronized (this.syncObject) {
       if (Status.isStatusFinished(this.node.getStatus())) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/Trigger.java b/azkaban-exec-server/src/main/java/azkaban/execapp/Trigger.java
index 140f117..1f0e238 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/Trigger.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/Trigger.java
@@ -56,7 +56,6 @@ public class Trigger implements Runnable {
     }
 
     final boolean isTriggerConditionMet = this.triggerCondition.isMet();
-
     if (isTriggerConditionMet) {
       logger.info("Condition " + this.triggerCondition.getExpression() + " met");
       for (final TriggerAction action : this.actions) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/TriggerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/TriggerManager.java
index 6ade640..14a5911 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/TriggerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/TriggerManager.java
@@ -16,6 +16,7 @@
 
 package azkaban.execapp;
 
+import azkaban.execapp.action.KillJobAction;
 import azkaban.execapp.action.KillExecutionAction;
 import azkaban.sla.SlaOption;
 import azkaban.trigger.Condition;
@@ -62,10 +63,14 @@ public class TriggerManager {
       TriggerAction action = null;
       switch (act) {
         case SlaOption.ACTION_ALERT:
-          action = new SlaAlertAction("slaAlert", sla, execId);
+          action = new SlaAlertAction(SlaOption.ACTION_ALERT, sla, execId);
           break;
         case SlaOption.ACTION_CANCEL_FLOW:
-          action = new KillExecutionAction("killExecution", execId);
+          action = new KillExecutionAction(SlaOption.ACTION_CANCEL_FLOW, execId);
+          break;
+        case SlaOption.ACTION_KILL_JOB:
+          String jobId = (String)sla.getInfo().get(SlaOption.INFO_JOB_NAME);
+          action = new KillJobAction(SlaOption.ACTION_KILL_JOB, execId, jobId);
           break;
         default:
           logger.info("Unknown action type " + act);
@@ -88,7 +93,6 @@ public class TriggerManager {
 
       final List<TriggerAction> actions = createActions(sla, execId);
       final Trigger trigger = new Trigger(execId, triggerCond, expireCond, actions);
-
       final ReadablePeriod duration = Utils
           .parsePeriodString((String) sla.getInfo().get(SlaOption.INFO_DURATION));
       final long durationInMillis = duration.toPeriod().toStandardDuration().getMillis();
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
index 529e253..67e9ace 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -240,7 +240,8 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
         slaInfo.put(SlaOption.ALERT_TYPE, "email");
       }
       if (killAction.equals("true")) {
-        slaActions.add(SlaOption.ACTION_CANCEL_FLOW);
+        String killActionType = id.equals("") ? SlaOption.ACTION_CANCEL_FLOW : SlaOption.ACTION_KILL_JOB;
+        slaActions.add(killActionType);
       }
       if (id.equals("")) {
         if (rule.equals("SUCCESS")) {