Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
index a66d9b7..ad63a36 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
@@ -53,6 +53,8 @@ public class ExecutableNode {
private long startTime = -1;
private long endTime = -1;
private long updateTime = -1;
+ private volatile boolean killedBySLA = false;
+
// Path to Job File
private String jobSource;
// Path to top level props file
@@ -155,6 +157,14 @@ public class ExecutableNode {
this.updateTime = updateTime;
}
+ /**
+ * Records whether this node was killed by an SLA action.
+ *
+ * @param killedBySLA true if the kill was issued by an SLA action
+ */
+ public void setKilledBySLA(boolean killedBySLA) {
+ this.killedBySLA = killedBySLA;
+ }
+
+ /**
+ * @return the value last stored through setKilledBySLA (false initially)
+ */
+ public boolean isKilledBySLA() {
+ return this.killedBySLA;
+ }
+
public void addOutNode(final String exNode) {
this.outNodes.add(exNode);
}
@@ -239,6 +249,7 @@ public class ExecutableNode {
this.setEndTime(-1);
this.setUpdateTime(System.currentTimeMillis());
this.setStatus(Status.READY);
+ this.setKilledBySLA(false);
}
public List<Object> getAttemptObjects() {
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index ead5147..317cd77 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -256,7 +256,6 @@ public class ProcessJob extends AbstractProcessJob {
boolean success = false;
this.process = builder.build();
-
try {
if (!this.killed) {
this.process.run();
diff --git a/azkaban-common/src/main/java/azkaban/sla/SlaOption.java b/azkaban-common/src/main/java/azkaban/sla/SlaOption.java
index 526b90f..3e3872b 100644
--- a/azkaban-common/src/main/java/azkaban/sla/SlaOption.java
+++ b/azkaban-common/src/main/java/azkaban/sla/SlaOption.java
@@ -46,8 +46,10 @@ public class SlaOption {
public static final String ALERT_TYPE = "SlaAlertType";
public static final String ACTION_CANCEL_FLOW = "SlaCancelFlow";
public static final String ACTION_ALERT = "SlaAlert";
+ public static final String ACTION_KILL_JOB = "SlaKillJob";
private static final DateTimeFormatter fmt = DateTimeFormat
.forPattern("MM/dd, YYYY HH:mm");
+
private String type;
private Map<String, Object> info;
private List<String> actions;
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/action/KillJobAction.java b/azkaban-exec-server/src/main/java/azkaban/execapp/action/KillJobAction.java
new file mode 100644
index 0000000..c33e8c0
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/action/KillJobAction.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.action;
+
+import azkaban.ServiceProvider;
+import azkaban.execapp.FlowRunnerManager;
+import azkaban.trigger.TriggerAction;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Trigger action that asks the {@link FlowRunnerManager} to cancel one job of an execution
+ * through {@code cancelJobBySLA}.
+ */
+public class KillJobAction implements TriggerAction {
+
+  public static final String type = "KillJobAction";
+
+  private static final Logger logger = Logger
+      .getLogger(KillJobAction.class);
+
+  private final String actionId;
+  private final int execId;
+  private final String jobId;
+
+  /**
+   * @param actionId identifier returned by {@link #getId()}
+   * @param execId id of the execution that contains the job
+   * @param jobId id of the job to kill
+   */
+  public KillJobAction(final String actionId, final int execId, final String jobId) {
+    this.execId = execId;
+    this.actionId = actionId;
+    this.jobId = jobId;
+  }
+
+  /**
+   * Rebuilds an action from a map shaped like the one produced by {@link #toJson()}.
+   *
+   * @param obj map holding the "type", "actionId", "execId" and "jobId" entries
+   * @return the deserialized action
+   * @throws RuntimeException if the "type" entry is missing or differs from {@link #type}
+   * @throws NumberFormatException if the "execId" entry is missing or not an integer string
+   */
+  public static KillJobAction createFromJson(final HashMap<String, Object> obj) {
+    final String objType = (String) obj.get("type");
+    // Constant-first comparison: a missing "type" entry is reported by the exception below
+    // instead of surfacing as a NullPointerException.
+    if (!type.equals(objType)) {
+      throw new RuntimeException("Cannot create action of " + type + " from "
+          + objType);
+    }
+    final String actionId = (String) obj.get("actionId");
+    // parseInt yields the primitive directly; valueOf would box and immediately unbox.
+    final int execId = Integer.parseInt((String) obj.get("execId"));
+    final String jobId = (String) obj.get("jobId");
+    return new KillJobAction(actionId, execId, jobId);
+  }
+
+  @Override
+  public String getId() {
+    return this.actionId;
+  }
+
+  @Override
+  public String getType() {
+    return type;
+  }
+
+  /**
+   * Instance-level entry point for deserialization; delegates to
+   * {@link #createFromJson(HashMap)}.
+   *
+   * @param obj expected to be a {@code HashMap<String, Object>}
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public KillJobAction fromJson(final Object obj) throws Exception {
+    return createFromJson((HashMap<String, Object>) obj);
+  }
+
+  /**
+   * Serializes this action into a map that {@link #createFromJson(HashMap)} can read back.
+   * The execution id is stored as a string because that is what createFromJson parses.
+   */
+  @Override
+  public Object toJson() {
+    final Map<String, Object> jsonObj = new HashMap<>();
+    jsonObj.put("actionId", this.actionId);
+    jsonObj.put("type", type);
+    jsonObj.put("execId", String.valueOf(this.execId));
+    // jobId is already a String; String.valueOf would turn a null id into the text "null".
+    jsonObj.put("jobId", this.jobId);
+    return jsonObj;
+  }
+
+  /**
+   * Looks up the {@link FlowRunnerManager} through the service provider and asks it to cancel
+   * the job.
+   */
+  @Override
+  public void doAction() throws Exception {
+    logger.info("ready to do action " + getDescription());
+    final FlowRunnerManager flowRunnerManager = ServiceProvider.SERVICE_PROVIDER
+        .getInstance(FlowRunnerManager.class);
+    flowRunnerManager.cancelJobBySLA(this.execId, this.jobId);
+  }
+
+  @Override
+  public void setContext(final Map<String, Object> context) {
+    // Intentionally empty: this action does not use the trigger context.
+  }
+
+  @Override
+  public String getDescription() {
+    return type + " for execution " + this.execId + " jobId " + this.jobId;
+  }
+
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
index 56e0b35..1688705 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
@@ -36,6 +36,5 @@ public class AzkabanExecServerModule extends AbstractModule {
bind(AzkabanExecutorServer.class).in(Scopes.SINGLETON);
bind(TriggerManager.class).in(Scopes.SINGLETON);
bind(FlowRunnerManager.class).in(Scopes.SINGLETON);
-
}
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 0083980..12b17ee 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -45,6 +45,7 @@ import azkaban.sla.SlaOption;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import azkaban.utils.SwapQueue;
+import com.google.common.collect.ImmutableSet;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -65,6 +66,7 @@ import org.apache.log4j.Layout;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
+
/**
* Class that handles the running of a ExecutableFlow DAG
*/
@@ -418,9 +420,9 @@ public class FlowRunner extends EventHandler implements Runnable {
Set<String> outNodeIds = node.getOutNodes();
ExecutableFlowBase parentFlow = node.getParentFlow();
- // If a job is seen as failed, then we set the parent flow to
+ // If a job is seen as failed or killed due to failing SLA, then we set the parent flow to
// FAILED_FINISHING
- if (node.getStatus() == Status.FAILED) {
+ if (node.getStatus() == Status.FAILED || (node.getStatus() == Status.KILLED && node.isKilledBySLA())) {
// The job cannot be retried or has run out of retry attempts. We will
// fail the job and its flow now.
if (!retryJobIfPossible(node)) {
@@ -1144,4 +1146,8 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
}
+
+  /**
+   * Returns an immutable copy of the active job runners, taken at the time of the call.
+   */
+  public Set<JobRunner> getActiveJobRunners() {
+    final Set<JobRunner> snapshot = ImmutableSet.copyOf(this.activeJobRunners);
+    return snapshot;
+  }
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index bad88bb..cf10856 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -390,6 +390,25 @@ public class FlowRunnerManager implements EventListener,
}
+
+  /**
+   * Kills one active job of a running execution on behalf of an SLA action.
+   *
+   * @param execId id of the execution that contains the job
+   * @param jobId id of the job to kill
+   * @throws ExecutorManagerException if no running flow is registered for {@code execId}
+   */
+  public void cancelJobBySLA(final int execId, final String jobId)
+      throws ExecutorManagerException {
+    final FlowRunner flowRunner = this.runningFlows.get(execId);
+
+    if (flowRunner == null) {
+      throw new ExecutorManagerException("Execution " + execId
+          + " is not running.");
+    }
+
+    for (final JobRunner jobRunner : flowRunner.getActiveJobRunners()) {
+      if (jobRunner.getJobId().equals(jobId)) {
+        logger.info("Killing job " + jobId + " in execution " + execId + " by SLA");
+        jobRunner.killBySLA();
+        return;
+      }
+    }
+
+    // No active runner matched. Keep this a no-op as before, but leave a trace in the log
+    // so a kill request that had no effect can be diagnosed.
+    logger.info("Job " + jobId + " is not active in execution " + execId
+        + "; nothing to kill by SLA");
+  }
+
public void cancelFlow(final int execId, final String user)
throws ExecutorManagerException {
final FlowRunner runner = this.runningFlows.get(execId);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index c9239b1..3191b7d 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -246,6 +246,10 @@ public class JobRunner extends EventHandler implements Runnable {
return this.node;
}
+  /**
+   * Returns the id of this runner's node.
+   */
+  public String getJobId() {
+    return this.node.getId();
+  }
+
public String getLogFilePath() {
return this.logFile == null ? null : this.logFile.getPath();
}
@@ -790,6 +794,11 @@ public class JobRunner extends EventHandler implements Runnable {
this.fireEventListeners(event);
}
+  /**
+   * Kills this job because of an SLA action and marks its node accordingly.
+   *
+   * The killed-by-SLA flag is set BEFORE kill() is invoked. FlowRunner reads the flag once it
+   * sees the node in KILLED status; setting it only after kill() returned left a window in
+   * which the status was already KILLED while the flag was still false.
+   */
+  public void killBySLA() {
+    // Same monitor and same finished-status guard that kill() uses; the monitor is reentrant,
+    // so the nested kill() call does not block. A node that has already finished is left
+    // untouched, flag included.
+    synchronized (this.syncObject) {
+      if (Status.isStatusFinished(this.node.getStatus())) {
+        return;
+      }
+      this.node.setKilledBySLA(true);
+      kill();
+    }
+  }
+
public void kill() {
synchronized (this.syncObject) {
if (Status.isStatusFinished(this.node.getStatus())) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/Trigger.java b/azkaban-exec-server/src/main/java/azkaban/execapp/Trigger.java
index 140f117..1f0e238 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/Trigger.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/Trigger.java
@@ -56,7 +56,6 @@ public class Trigger implements Runnable {
}
final boolean isTriggerConditionMet = this.triggerCondition.isMet();
-
if (isTriggerConditionMet) {
logger.info("Condition " + this.triggerCondition.getExpression() + " met");
for (final TriggerAction action : this.actions) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/TriggerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/TriggerManager.java
index 6ade640..14a5911 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/TriggerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/TriggerManager.java
@@ -16,6 +16,7 @@
package azkaban.execapp;
+import azkaban.execapp.action.KillJobAction;
import azkaban.execapp.action.KillExecutionAction;
import azkaban.sla.SlaOption;
import azkaban.trigger.Condition;
@@ -62,10 +63,14 @@ public class TriggerManager {
TriggerAction action = null;
switch (act) {
case SlaOption.ACTION_ALERT:
- action = new SlaAlertAction("slaAlert", sla, execId);
+ action = new SlaAlertAction(SlaOption.ACTION_ALERT, sla, execId);
break;
case SlaOption.ACTION_CANCEL_FLOW:
- action = new KillExecutionAction("killExecution", execId);
+ action = new KillExecutionAction(SlaOption.ACTION_CANCEL_FLOW, execId);
+ break;
+ case SlaOption.ACTION_KILL_JOB:
+ String jobId = (String)sla.getInfo().get(SlaOption.INFO_JOB_NAME);
+ action = new KillJobAction(SlaOption.ACTION_KILL_JOB, execId, jobId);
break;
default:
logger.info("Unknown action type " + act);
@@ -88,7 +93,6 @@ public class TriggerManager {
final List<TriggerAction> actions = createActions(sla, execId);
final Trigger trigger = new Trigger(execId, triggerCond, expireCond, actions);
-
final ReadablePeriod duration = Utils
.parsePeriodString((String) sla.getInfo().get(SlaOption.INFO_DURATION));
final long durationInMillis = duration.toPeriod().toStandardDuration().getMillis();
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
index 529e253..67e9ace 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -240,7 +240,8 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
slaInfo.put(SlaOption.ALERT_TYPE, "email");
}
if (killAction.equals("true")) {
- slaActions.add(SlaOption.ACTION_CANCEL_FLOW);
+ String killActionType = id.equals("") ? SlaOption.ACTION_CANCEL_FLOW : SlaOption.ACTION_KILL_JOB;
+ slaActions.add(killActionType);
}
if (id.equals("")) {
if (rule.equals("SUCCESS")) {