Details
diff --git a/azkaban-common/src/main/java/azkaban/Constants.java b/azkaban-common/src/main/java/azkaban/Constants.java
index aea1499..4abad36 100644
--- a/azkaban-common/src/main/java/azkaban/Constants.java
+++ b/azkaban-common/src/main/java/azkaban/Constants.java
@@ -37,6 +37,8 @@ public class Constants {
public static final String AZKABAN_SERVLET_CONTEXT_KEY = "azkaban_app";
+ // Internal username used to perform SLA action
+ public static final String AZKABAN_SLA_CHECKER_USERNAME = "azkaban_sla";
// Memory check retry interval when OOM in ms
public static final long MEMORY_CHECK_INTERVAL_MS = 1000 * 60 * 1;
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java
index 806106c..ea72c80 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java
@@ -15,6 +15,7 @@
*/
package azkaban.executor;
+import azkaban.sla.SlaOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -22,11 +23,13 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import azkaban.flow.Flow;
import azkaban.project.Project;
import azkaban.utils.TypedMapWrapper;
+
public class ExecutableFlow extends ExecutableFlowBase {
public static final String EXECUTIONID_PARAM = "executionId";
public static final String EXECUTIONPATH_PARAM = "executionPath";
@@ -40,6 +43,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
public static final String PROJECTNAME_PARAM = "projectName";
public static final String LASTMODIFIEDTIME_PARAM = "lastModfiedTime";
public static final String LASTMODIFIEDUSER_PARAM = "lastModifiedUser";
+ public static final String SLAOPTIONS_PARAM = "slaOptions";
private int executionId = -1;
@@ -55,6 +59,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
private HashSet<String> proxyUsers = new HashSet<String>();
private ExecutionOptions executionOptions;
+ private List<SlaOption> slaOptions = new ArrayList<>();
public ExecutableFlow(Project project, Flow flow) {
this.projectId = project.getId();
@@ -95,6 +100,11 @@ public class ExecutableFlow extends ExecutableFlowBase {
return executionOptions;
}
+ public List<SlaOption> getSlaOptions() {
+ return slaOptions;
+ }
+
+
@Override
protected void setFlow(Project project, Flow flow) {
super.setFlow(project, flow);
@@ -191,6 +201,10 @@ public class ExecutableFlow extends ExecutableFlowBase {
this.submitTime = submitTime;
}
+ public void setSlaOptions(List<SlaOption> slaOptions) {
+ this.slaOptions = slaOptions;
+ }
+
@Override
public Map<String, Object> toObject() {
HashMap<String, Object> flowObj = new HashMap<String, Object>();
@@ -218,11 +232,16 @@ public class ExecutableFlow extends ExecutableFlowBase {
flowObj.put(SUBMITTIME_PARAM, submitTime);
+ List<Map<String, Object>> slaOptions = new ArrayList<>();
+ this.getSlaOptions().stream().forEach((slaOption) -> slaOptions.add(slaOption.toObject()));
+
+ flowObj.put(SLAOPTIONS_PARAM, slaOptions);
+
return flowObj;
}
@SuppressWarnings("unchecked")
- public static ExecutableFlow createExecutableFlowFromObject(Object obj) {
+ public static ExecutableFlow createExecutableFlowFromObject(Object obj) {
ExecutableFlow exFlow = new ExecutableFlow();
HashMap<String, Object> flowObj = (HashMap<String, Object>) obj;
exFlow.fillExecutableFromMapObject(flowObj);
@@ -259,7 +278,12 @@ public class ExecutableFlow extends ExecutableFlowBase {
if (flowObj.containsKey(PROXYUSERS_PARAM)) {
List<String> proxyUserList = flowObj.<String> getList(PROXYUSERS_PARAM);
this.addAllProxyUsers(proxyUserList);
+ }
+ if (flowObj.containsKey(SLAOPTIONS_PARAM)) {
+ List<SlaOption> slaOptions =
+ flowObj.getList(SLAOPTIONS_PARAM).stream().map(SlaOption::fromObject).collect(Collectors.toList());
+ this.setSlaOptions(slaOptions);
}
}
diff --git a/azkaban-common/src/main/java/azkaban/sla/SlaOption.java b/azkaban-common/src/main/java/azkaban/sla/SlaOption.java
index 767b752..4683b51 100644
--- a/azkaban-common/src/main/java/azkaban/sla/SlaOption.java
+++ b/azkaban-common/src/main/java/azkaban/sla/SlaOption.java
@@ -16,11 +16,16 @@
package azkaban.sla;
+import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
@@ -31,16 +36,13 @@ public class SlaOption {
public static final String TYPE_FLOW_FINISH = "FlowFinish";
public static final String TYPE_FLOW_SUCCEED = "FlowSucceed";
- public static final String TYPE_FLOW_PROGRESS = "FlowProgress";
public static final String TYPE_JOB_FINISH = "JobFinish";
public static final String TYPE_JOB_SUCCEED = "JobSucceed";
- public static final String TYPE_JOB_PROGRESS = "JobProgress";
public static final String INFO_DURATION = "Duration";
public static final String INFO_FLOW_NAME = "FlowName";
public static final String INFO_JOB_NAME = "JobName";
- public static final String INFO_PROGRESS_PERCENT = "ProgressPercent";
public static final String INFO_EMAIL_LIST = "EmailList";
// always alert
@@ -61,6 +63,18 @@ public class SlaOption {
this.actions = actions;
}
+ public static List<SlaOption> getJobLevelSLAOptions(ExecutableFlow flow) {
+ Set<String> jobLevelSLAs = new HashSet<>(Arrays.asList(SlaOption.TYPE_JOB_FINISH, SlaOption.TYPE_JOB_SUCCEED));
+ return flow.getSlaOptions().stream().filter(slaOption -> jobLevelSLAs.contains(slaOption.getType()))
+ .collect(Collectors.toList());
+ }
+
+ public static List<SlaOption> getFlowLevelSLAOptions(ExecutableFlow flow) {
+ Set<String> flowLevelSLAs = new HashSet<>(Arrays.asList(SlaOption.TYPE_FLOW_FINISH, SlaOption.TYPE_FLOW_SUCCEED));
+ return flow.getSlaOptions().stream().filter(slaOption -> flowLevelSLAs.contains(slaOption.getType()))
+ .collect(Collectors.toList());
+ }
+
public String getType() {
return type;
}
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
index db2dc53..dfbdbe0 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
@@ -239,6 +239,10 @@ public class ExecuteFlowAction implements TriggerAction {
}
exflow.setExecutionOptions(executionOptions);
+ if (slaOptions != null && slaOptions.size() > 0) {
+ exflow.setSlaOptions(slaOptions);
+ }
+
try {
logger.info("Invoking flow " + project.getName() + "." + flowName);
executorManager.submitExecutableFlow(exflow, submitUser);
@@ -246,59 +250,6 @@ public class ExecuteFlowAction implements TriggerAction {
} catch (ExecutorManagerException e) {
throw new RuntimeException(e);
}
-
- // deal with sla
- if (slaOptions != null && slaOptions.size() > 0) {
- int execId = exflow.getExecutionId();
- for (SlaOption sla : slaOptions) {
- logger.info("Adding sla trigger " + sla.toString() + " to execution "
- + execId);
- SlaChecker slaFailChecker =
- new SlaChecker("slaFailChecker", sla, execId);
- Map<String, ConditionChecker> slaCheckers =
- new HashMap<String, ConditionChecker>();
- slaCheckers.put(slaFailChecker.getId(), slaFailChecker);
- Condition triggerCond =
- new Condition(slaCheckers, slaFailChecker.getId()
- + ".isSlaFailed()");
- // if whole flow finish before violate sla, just expire
- SlaChecker slaPassChecker =
- new SlaChecker("slaPassChecker", sla, execId);
- Map<String, ConditionChecker> expireCheckers =
- new HashMap<String, ConditionChecker>();
- expireCheckers.put(slaPassChecker.getId(), slaPassChecker);
- Condition expireCond =
- new Condition(expireCheckers, slaPassChecker.getId()
- + ".isSlaPassed()");
- List<TriggerAction> actions = new ArrayList<TriggerAction>();
- List<String> slaActions = sla.getActions();
- for (String act : slaActions) {
- if (act.equals(SlaOption.ACTION_ALERT)) {
- SlaAlertAction slaAlert =
- new SlaAlertAction("slaAlert", sla, execId);
- actions.add(slaAlert);
- } else if (act.equals(SlaOption.ACTION_CANCEL_FLOW)) {
- KillExecutionAction killAct =
- new KillExecutionAction("killExecution", execId);
- actions.add(killAct);
- }
- }
- Trigger slaTrigger = new Trigger.TriggerBuilder("azkaban_sla",
- "azkaban",
- triggerCond,
- expireCond,
- actions).build();
-
- slaTrigger.getInfo().put("monitored.finished.execution",
- String.valueOf(execId));
- slaTrigger.setResetOnTrigger(false);
- slaTrigger.setResetOnExpire(false);
- logger.info("Ready to put in the sla trigger");
- triggerManager.insertTrigger(slaTrigger);
- logger.info("Sla inserted.");
- }
- }
-
}
@Override
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/KillExecutionAction.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/KillExecutionAction.java
index e4a1dbf..01d9537 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/KillExecutionAction.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/KillExecutionAction.java
@@ -16,6 +16,7 @@
package azkaban.trigger.builtin;
+import azkaban.Constants;
import java.util.HashMap;
import java.util.Map;
@@ -26,6 +27,14 @@ import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.Status;
import azkaban.trigger.TriggerAction;
+/**
+ * @deprecated Create a new KillExecutionAction using FlowRunnerManager
+ * instead of ExecutorManager to kill flow. Still keep the old one here
+ * for being compatible with existing SLA trigger in the database.
+ * Will remove the old one when all existing triggers expire.
+ */
+
+@Deprecated
public class KillExecutionAction implements TriggerAction {
public static final String type = "KillExecutionAction";
@@ -37,6 +46,7 @@ public class KillExecutionAction implements TriggerAction {
private int execId;
private static ExecutorManagerAdapter executorManager;
+ //todo chengren311: delete this class to executor module when all existing triggers in db are expired
public KillExecutionAction(String actionId, int execId) {
this.execId = execId;
this.actionId = actionId;
@@ -94,7 +104,7 @@ public class KillExecutionAction implements TriggerAction {
logger.info("ready to kill execution " + execId);
if (!Status.isStatusFinished(exFlow.getStatus())) {
logger.info("Killing execution " + execId);
- executorManager.cancelFlow(exFlow, "azkaban_sla");
+ executorManager.cancelFlow(exFlow, Constants.AZKABAN_SLA_CHECKER_USERNAME);
}
}
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaAlertAction.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaAlertAction.java
index 2da27f8..b300e18 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaAlertAction.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaAlertAction.java
@@ -16,8 +16,6 @@
package azkaban.trigger.builtin;
-import azkaban.ServiceProvider;
-import azkaban.executor.AlerterHolder;
import java.util.HashMap;
import java.util.Map;
@@ -25,9 +23,11 @@ import org.apache.log4j.Logger;
import azkaban.alert.Alerter;
import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutorManagerAdapter;
import azkaban.sla.SlaOption;
import azkaban.trigger.TriggerAction;
+import azkaban.ServiceProvider;
+import azkaban.executor.AlerterHolder;
+import azkaban.executor.ExecutorLoader;
public class SlaAlertAction implements TriggerAction {
@@ -35,21 +35,19 @@ public class SlaAlertAction implements TriggerAction {
private static final Logger logger = Logger.getLogger(SlaAlertAction.class);
- private String actionId;
- private SlaOption slaOption;
- private int execId;
- private AlerterHolder alerters;
- private static ExecutorManagerAdapter executorManager;
+ private final String actionId;
+ private final SlaOption slaOption;
+ private final int execId;
+ private final AlerterHolder alerters;
+ private final ExecutorLoader executorLoader;
+ //todo chengren311: move this class to executor module when all existing triggers in db are expired
public SlaAlertAction(String id, SlaOption slaOption, int execId) {
this.actionId = id;
this.slaOption = slaOption;
this.execId = execId;
this.alerters = ServiceProvider.SERVICE_PROVIDER.getInstance(AlerterHolder.class);
- }
-
- public static void setExecutorManager(ExecutorManagerAdapter em) {
- executorManager = em;
+ this.executorLoader = ServiceProvider.SERVICE_PROVIDER.getInstance(ExecutorLoader.class);
}
@Override
@@ -105,9 +103,8 @@ public class SlaAlertAction implements TriggerAction {
Alerter alerter = alerters.get(alertType);
if (alerter != null) {
try {
- ExecutableFlow flow = executorManager.getExecutableFlow(execId);
- alerter.alertOnSla(slaOption,
- SlaOption.createSlaMessage(slaOption, flow));
+ ExecutableFlow flow = executorLoader.fetchExecutableFlow(execId);
+ alerter.alertOnSla(slaOption, SlaOption.createSlaMessage(slaOption, flow));
} catch (Exception e) {
e.printStackTrace();
logger.error("Failed to alert by " + alertType);
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
index 27b9ba4..d65b273 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
@@ -16,6 +16,8 @@
package azkaban.trigger.builtin;
+import azkaban.ServiceProvider;
+import azkaban.executor.ExecutorLoader;
import java.util.HashMap;
import java.util.Map;
@@ -25,7 +27,6 @@ import org.joda.time.ReadablePeriod;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
import azkaban.sla.SlaOption;
@@ -37,21 +38,18 @@ public class SlaChecker implements ConditionChecker {
private static final Logger logger = Logger.getLogger(SlaChecker.class);
public static final String type = "SlaChecker";
- private String id;
- private SlaOption slaOption;
- private int execId;
+ private final String id;
+ private final SlaOption slaOption;
+ private final int execId;
private long checkTime = -1;
+ private final ExecutorLoader executorLoader;
- private static ExecutorManagerAdapter executorManager;
-
+ //todo chengren311: move this class to executor module when all existing triggers in db are expired
public SlaChecker(String id, SlaOption slaOption, int execId) {
this.id = id;
this.slaOption = slaOption;
this.execId = execId;
- }
-
- public static void setExecutorManager(ExecutorManagerAdapter em) {
- executorManager = em;
+ this.executorLoader = ServiceProvider.SERVICE_PROVIDER.getInstance(ExecutorLoader.class);
}
private Boolean isSlaMissed(ExecutableFlow flow) {
@@ -204,7 +202,7 @@ public class SlaChecker implements ConditionChecker {
logger.info("Checking sla for execution " + execId);
ExecutableFlow flow;
try {
- flow = executorManager.getExecutableFlow(execId);
+ flow = executorLoader.fetchExecutableFlow(execId);
} catch (ExecutorManagerException e) {
logger.error("Can't get executable flow.", e);
e.printStackTrace();
@@ -217,7 +215,7 @@ public class SlaChecker implements ConditionChecker {
public Object isSlaFailed() {
ExecutableFlow flow;
try {
- flow = executorManager.getExecutableFlow(execId);
+ flow = executorLoader.fetchExecutableFlow(execId);
} catch (ExecutorManagerException e) {
logger.error("Can't get executable flow.", e);
// something wrong, send out alerts
@@ -229,7 +227,7 @@ public class SlaChecker implements ConditionChecker {
public Object isSlaPassed() {
ExecutableFlow flow;
try {
- flow = executorManager.getExecutableFlow(execId);
+ flow = executorLoader.fetchExecutableFlow(execId);
} catch (ExecutorManagerException e) {
logger.error("Can't get executable flow.", e);
// something wrong, send out alerts
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/action/KillExecutionAction.java b/azkaban-exec-server/src/main/java/azkaban/execapp/action/KillExecutionAction.java
new file mode 100644
index 0000000..0d0f4e6
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/action/KillExecutionAction.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.action;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import azkaban.Constants;
+import azkaban.ServiceProvider;
+import azkaban.execapp.FlowRunnerManager;
+import azkaban.trigger.TriggerAction;
+
+import org.apache.log4j.Logger;
+
+
+public class KillExecutionAction implements TriggerAction {
+
+ public static final String type = "KillExecutionAction";
+
+ private static final Logger logger = Logger
+ .getLogger(KillExecutionAction.class);
+
+ private String actionId;
+ private int execId;
+
+ public KillExecutionAction(String actionId, int execId) {
+ this.execId = execId;
+ this.actionId = actionId;
+ }
+
+ @Override
+ public String getId() {
+ return actionId;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static KillExecutionAction createFromJson(Object obj) {
+ return createFromJson((HashMap<String, Object>) obj);
+ }
+
+ public static KillExecutionAction createFromJson(HashMap<String, Object> obj) {
+ Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+ String objType = (String) jsonObj.get("type");
+ if (!objType.equals(type)) {
+ throw new RuntimeException("Cannot create action of " + type + " from "
+ + objType);
+ }
+ String actionId = (String) jsonObj.get("actionId");
+ int execId = Integer.valueOf((String) jsonObj.get("execId"));
+ return new KillExecutionAction(actionId, execId);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public KillExecutionAction fromJson(Object obj) throws Exception {
+ return createFromJson((HashMap<String, Object>) obj);
+ }
+
+ @Override
+ public Object toJson() {
+ Map<String, Object> jsonObj = new HashMap<String, Object>();
+ jsonObj.put("actionId", actionId);
+ jsonObj.put("type", type);
+ jsonObj.put("execId", String.valueOf(execId));
+ return jsonObj;
+ }
+
+ @Override
+ public void doAction() throws Exception {
+ logger.info("ready to kill execution " + execId);
+ ServiceProvider.SERVICE_PROVIDER.getInstance(FlowRunnerManager.class).cancelFlow(execId, Constants.AZKABAN_SLA_CHECKER_USERNAME);
+ }
+
+ @Override
+ public void setContext(Map<String, Object> context) {
+ }
+
+ @Override
+ public String getDescription() {
+ return type + " for " + execId;
+ }
+
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
index 3f9772b..3654919 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
@@ -33,5 +33,8 @@ public class AzkabanExecServerModule extends AbstractModule {
protected void configure() {
bind(ExecutorLoader.class).to(JdbcExecutorLoader.class).in(Scopes.SINGLETON);
bind(AzkabanExecutorServer.class).in(Scopes.SINGLETON);
+ bind(TriggerManager.class).in(Scopes.SINGLETON);
+ bind(FlowRunnerManager.class).in(Scopes.SINGLETON);
+
}
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index f5bd936..b86cfbb 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -16,6 +16,8 @@
package azkaban.execapp;
+import azkaban.ServiceProvider;
+import azkaban.sla.SlaOption;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -1061,7 +1063,8 @@ public class FlowRunner extends EventHandler implements Runnable {
if (event.getType() == Type.JOB_STATUS_CHANGED) {
updateFlow();
- } else if (event.getType() == Type.JOB_FINISHED) {
+ }
+ else if (event.getType() == Type.JOB_FINISHED) {
ExecutableNode node = runner.getNode();
EventData eventData = event.getData();
long seconds = (node.getEndTime() - node.getStartTime()) / 1000;
@@ -1083,6 +1086,11 @@ public class FlowRunner extends EventHandler implements Runnable {
fireEventListeners(event);
}
}
+ else if (event.getType() == Type.JOB_STARTED) {
+ // add job level checker
+ TriggerManager triggerManager = ServiceProvider.SERVICE_PROVIDER.getInstance(TriggerManager.class);
+ triggerManager.addTrigger(flow.getExecutionId(), SlaOption.getJobLevelSLAOptions(flow));
+ }
}
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 22ff364..fadffbd 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -18,6 +18,7 @@ package azkaban.execapp;
import azkaban.Constants;
import azkaban.executor.Status;
+import azkaban.sla.SlaOption;
import azkaban.storage.StorageManager;
import com.google.inject.Inject;
import java.io.File;
@@ -38,6 +39,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
@@ -118,6 +120,8 @@ public class FlowRunnerManager implements EventListener,
private final ProjectLoader projectLoader;
private final JobTypeManager jobtypeManager;
private final FlowPreparer flowPreparer;
+ private final TriggerManager triggerManager;
+
private final Props azkabanProps;
private final File executionDirectory;
@@ -151,7 +155,8 @@ public class FlowRunnerManager implements EventListener,
public FlowRunnerManager(Props props,
ExecutorLoader executorLoader,
ProjectLoader projectLoader,
- StorageManager storageManager) throws IOException {
+ StorageManager storageManager,
+ TriggerManager triggerManager) throws IOException {
azkabanProps = props;
executionDirRetention = props.getLong("execution.dir.retention", executionDirRetention);
@@ -178,6 +183,7 @@ public class FlowRunnerManager implements EventListener,
this.executorLoader = executorLoader;
this.projectLoader = projectLoader;
+ this.triggerManager = triggerManager;
this.jobLogChunkSize = azkabanProps.getString("job.log.chunk.size", "5MB");
this.jobLogNumFiles = azkabanProps.getInt("job.log.backup.index", 4);
@@ -629,16 +635,19 @@ public class FlowRunnerManager implements EventListener,
@Override
public void handleEvent(Event event) {
- if (event.getType() == Event.Type.FLOW_FINISHED) {
-
- FlowRunner flowRunner = (FlowRunner) event.getRunner();
- ExecutableFlow flow = flowRunner.getExecutableFlow();
+ FlowRunner flowRunner = (FlowRunner) event.getRunner();
+ ExecutableFlow flow = flowRunner.getExecutableFlow();
+ if (event.getType() == Event.Type.FLOW_FINISHED) {
recentlyFinishedFlows.put(flow.getExecutionId(), flow);
logger.info("Flow " + flow.getExecutionId()
+ " is finished. Adding it to recently finished flows list.");
runningFlows.remove(flow.getExecutionId());
}
+ else if (event.getType() == Event.Type.FLOW_STARTED) {
+ // add flow level checker
+ triggerManager.addTrigger(flow.getExecutionId(), SlaOption.getFlowLevelSLAOptions(flow));
+ }
}
public LogData readFlowLogs(int execId, int startByte, int length)
@@ -889,6 +898,7 @@ public class FlowRunnerManager implements EventListener,
public void shutdownNow() {
logger.warn("Shutting down FlowRunnerManager now...");
executorService.shutdownNow();
+ triggerManager.shutdown();
}
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/Trigger.java b/azkaban-exec-server/src/main/java/azkaban/execapp/Trigger.java
new file mode 100644
index 0000000..e1dfc73
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/Trigger.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+
+package azkaban.execapp;
+
+import java.util.List;
+
+import azkaban.trigger.Condition;
+import azkaban.trigger.TriggerAction;
+import org.apache.log4j.Logger;
+
+
+public class Trigger implements Runnable {
+ private static Logger logger = Logger.getLogger(azkaban.execapp.Trigger.class);
+ private final int execId;
+
+ // condition to trigger actions(ex. flow running longer than X mins)
+ private final Condition triggerCondition;
+ // condition to expire this trigger(ex. flow finishes before violating SLA)
+ private final Condition expireCondition;
+ private final List<TriggerAction> actions;
+
+ public Trigger(int execId,
+ Condition triggerCondition,
+ Condition expireCondition,
+ List<TriggerAction> actions)
+ {
+ this.execId = execId;
+ this.triggerCondition = triggerCondition;
+ this.expireCondition = expireCondition;
+ this.actions = actions;
+ }
+
+
+ /**
+ * Perform the action if trigger condition is met
+ */
+ @Override
+ public void run() {
+ if(isTriggerExpired()) {
+ logger.info(this + " expired");
+ return ;
+ }
+
+ boolean isTriggerConditionMet = triggerCondition.isMet();
+
+ if (isTriggerConditionMet) {
+ logger.info("Condition " + triggerCondition.getExpression() + " met");
+ for (TriggerAction action : actions) {
+ try {
+ action.doAction();
+ } catch (Exception e) {
+ logger.error("Failed to do action " + action.getDescription()
+ + " for execution " + azkaban.execapp.Trigger.this.execId, e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Check if the trigger is expired and reset isExpired
+ * @return true if trigger is expired
+ */
+ public boolean isTriggerExpired() {
+ return expireCondition.isMet();
+ }
+
+ public String toString() {
+ StringBuilder actionsString = new StringBuilder();
+ for (TriggerAction act : actions) {
+ actionsString.append(", ");
+ actionsString.append(act.getDescription());
+ }
+
+ return "Trigger for execution " + execId + " with trigger condition of "
+ + triggerCondition.getExpression() + " and expire condition of "
+ + expireCondition.getExpression() + actionsString;
+ }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/TriggerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/TriggerManager.java
new file mode 100644
index 0000000..1a7e394
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/TriggerManager.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp;
+
+import azkaban.sla.SlaOption;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.TriggerAction;
+import azkaban.trigger.builtin.SlaAlertAction;
+import azkaban.trigger.builtin.SlaChecker;
+import azkaban.utils.Utils;
+import azkaban.execapp.action.KillExecutionAction;
+import com.google.inject.Inject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.log4j.Logger;
+import org.joda.time.ReadablePeriod;
+
+
+public class TriggerManager {
+ private static Logger logger = Logger.getLogger(TriggerManager.class);
+ private static final int SCHEDULED_THREAD_POOL_SIZE = 4;
+ private final ScheduledExecutorService scheduledService;
+
+ @Inject
+ public TriggerManager() {
+ this.scheduledService = Executors.newScheduledThreadPool(SCHEDULED_THREAD_POOL_SIZE);
+ }
+
+ private Condition createCondition(SlaOption sla, int execId, String checkerName, String checkerMethod) {
+ SlaChecker slaFailChecker = new SlaChecker(checkerName, sla, execId);
+ Map<String, ConditionChecker> slaCheckers = new HashMap<>();
+ slaCheckers.put(slaFailChecker.getId(), slaFailChecker);
+ return new Condition(slaCheckers, slaFailChecker.getId() + "." + checkerMethod);
+ }
+
+ private List<TriggerAction> createActions(SlaOption sla, int execId) {
+ List<TriggerAction> actions = new ArrayList<>();
+ List<String> slaActions = sla.getActions();
+ for (String act : slaActions) {
+ TriggerAction action = null;
+ switch (act) {
+ case SlaOption.ACTION_ALERT:
+ action = new SlaAlertAction("slaAlert", sla, execId);
+ break;
+ case SlaOption.ACTION_CANCEL_FLOW:
+ action = new KillExecutionAction("killExecution", execId);
+ break;
+ default:
+ logger.info("Unknown action type " + act);
+ break;
+ }
+ if (action != null) {
+ actions.add(action);
+ }
+ }
+ return actions;
+ }
+
+ public void addTrigger(int execId, List<SlaOption> slaOptions) {
+ for (SlaOption sla : slaOptions) {
+ Condition triggerCond = createCondition(sla, execId, "slaFailChecker", "isSlaFailed()");
+
+ // if whole flow finish before violating sla, just expire the checker
+ Condition expireCond = createCondition(sla, execId, "slaPassChecker", "isSlaPassed()");
+
+ List<TriggerAction> actions = createActions(sla, execId);
+ Trigger trigger = new Trigger(execId, triggerCond, expireCond, actions);
+
+ ReadablePeriod duration = Utils.parsePeriodString((String) sla.getInfo().get(SlaOption.INFO_DURATION));
+ long durationInMillis = duration.toPeriod().toStandardDuration().getMillis();
+
+ logger.info("Adding sla trigger " + sla.toString() + " to execution " + execId + ", scheduled to trigger in " + durationInMillis/1000 + " seconds");
+ scheduledService.schedule(trigger, durationInMillis, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ public void shutdown() {
+ scheduledService.shutdownNow();
+ }
+}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index e0a57cf..407f00f 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -286,13 +286,10 @@ public class AzkabanWebServer extends AzkabanServer {
private void loadBuiltinCheckersAndActions() {
logger.info("Loading built-in checker and action types");
- SlaChecker.setExecutorManager(executorManager);
ExecuteFlowAction.setExecutorManager(executorManager);
ExecuteFlowAction.setProjectManager(projectManager);
ExecuteFlowAction.setTriggerManager(triggerManager);
KillExecutionAction.setExecutorManager(executorManager);
- SlaAlertAction.setExecutorManager(executorManager);
- SlaAlertAction.setExecutorManager(executorManager);
CreateTriggerAction.setTriggerManager(triggerManager);
ExecutionChecker.setExecutorManager(executorManager);