Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
index d8b10f1..5049743 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
@@ -37,6 +37,8 @@ public class ExecutionOptions {
/* override dispatcher selection and use executor id specified */
public static final String USE_EXECUTOR = "useExecutor";
public static final int DEFAULT_FLOW_PRIORITY = 5;
+ public static final String TRIGGER_SPEC = "triggerSpec";
+ public static final String TRIGGER_FILE = "triggerFile";
private static final String FLOW_PARAMETERS = "flowParameters";
private static final String NOTIFY_ON_FIRST_FAILURE = "notifyOnFirstFailure";
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
index 2295435..79a6e87 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
@@ -18,6 +18,7 @@ package azkaban.project;
import java.io.File;
import java.io.FileFilter;
+import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -28,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import azkaban.flow.CommonJobProperties;
@@ -47,6 +49,7 @@ import azkaban.utils.Utils;
public class DirectoryFlowLoader implements ProjectValidator {
private static final DirFilter DIR_FILTER = new DirFilter();
private static final String PROPERTY_SUFFIX = ".properties";
+ private static final String TRIGGER_SUFFIX = ".trigger";
private static final String JOB_SUFFIX = ".job";
public static final String JOB_MAX_XMS = "job.max.Xms";
public static final String MAX_XMS_DEFAULT = "1G";
@@ -60,6 +63,7 @@ public class DirectoryFlowLoader implements ProjectValidator {
private HashSet<String> rootNodes;
private HashMap<String, Flow> flowMap;
private HashMap<String, Node> nodeMap;
+ private HashMap<String, String> triggerMap;
private HashMap<String, Map<String, Edge>> nodeDependencies;
private HashMap<String, Props> jobPropsMap;
@@ -108,6 +112,16 @@ public class DirectoryFlowLoader implements ProjectValidator {
public Map<String, Props> getJobProps() {
return jobPropsMap;
}
+
+ /**
+ * Returns triggers
+ *
+ * @return Map of trigger file name to trigger json
+ */
+ public Map<String, String> getTriggerMap() {
+ return triggerMap;
+ }
+
/**
* Returns list of properties.
@@ -130,6 +144,7 @@ public class DirectoryFlowLoader implements ProjectValidator {
jobPropsMap = new HashMap<String, Props>();
nodeMap = new HashMap<String, Node>();
flowMap = new HashMap<String, Flow>();
+ triggerMap = new HashMap<String, String>();
errors = new HashSet<String>();
duplicateJobs = new HashSet<String>();
nodeDependencies = new HashMap<String, Map<String, Edge>>();
@@ -173,6 +188,28 @@ public class DirectoryFlowLoader implements ProjectValidator {
propsList.add(parent);
}
+ //Loading all trigger files
+ File[] triggerFiles = dir.listFiles(new SuffixFilter(TRIGGER_SUFFIX));
+ for (File file : triggerFiles) {
+ FileInputStream fIStream = null;
+ try {
+ if(!triggerMap.containsKey(file.getName())) {
+ fIStream = new FileInputStream(file);
+ String triggerJSON = IOUtils.toString(fIStream);
+ //TODO: validate trigger JSON
+ triggerMap.put(file.getName(), triggerJSON);
+ }
+ } catch (IOException e) {
+ errors.add("Error loading trigger " + file.getName() + ":"
+ + e.getMessage());
+ } finally {
+ if(fIStream != null) {
+ IOUtils.closeQuietly(fIStream);
+ }
+ }
+ logger.info("Adding " + file.getName());
+ }
+
// Load all Job files. If there's a duplicate name, then we don't load
File[] jobFiles = dir.listFiles(new SuffixFilter(JOB_SUFFIX));
for (File file : jobFiles) {
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
index 52024f1..fb8bbd0 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
@@ -30,6 +30,9 @@ import java.util.zip.ZipFile;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
import azkaban.flow.Flow;
import azkaban.project.DirectoryFlowLoader;
@@ -49,6 +52,7 @@ import azkaban.utils.Utils;
public class ProjectManager {
private static final Logger logger = Logger.getLogger(ProjectManager.class);
+ public static final String TRIGGER_DATA = "triggers";
private ConcurrentHashMap<Integer, Project> projectsById =
new ConcurrentHashMap<Integer, Project>();
private ConcurrentHashMap<String, Project> projectsByName =
@@ -527,6 +531,7 @@ public class ProjectManager {
(DirectoryFlowLoader) validatorManager.getDefaultValidator();
Map<String, Props> jobProps = loader.getJobProps();
List<Props> propProps = loader.getProps();
+ Map<String, String> triggerMap = loader.getTriggerMap();
synchronized (project) {
int newVersion = projectLoader.getLatestProjectVersion(project) + 1;
@@ -550,6 +555,10 @@ public class ProjectManager {
jobProps.values()));
logger.info("Uploading Props properties");
projectLoader.uploadProjectProperties(project, propProps);
+
+ logger.info("Uploading trigger files");
+ project.getMetadata().put(TRIGGER_DATA, triggerMap);
+ projectLoader.updateProjectSettings(project);
}
logger.info("Uploaded project files. Cleaning up temp files.");
diff --git a/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index 7a3c12a..5010d27 100644
--- a/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.log4j.Logger;
+import azkaban.executor.ExecutionOptions;
import azkaban.trigger.Condition;
import azkaban.trigger.ConditionChecker;
import azkaban.trigger.Trigger;
@@ -32,6 +33,7 @@ import azkaban.trigger.TriggerManagerAdapter;
import azkaban.trigger.TriggerManagerException;
import azkaban.trigger.builtin.BasicTimeChecker;
import azkaban.trigger.builtin.ExecuteFlowAction;
+import azkaban.trigger.builtin.NyxTriggerChecker;
public class TriggerBasedScheduleLoader implements ScheduleLoader {
@@ -86,6 +88,17 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
s.getPeriod());
checkers.put(checker.getId(), checker);
String expr = checker.getId() + ".eval()";
+
+ Map<String, String> flowParams =
+ s.getExecutionOptions().getFlowParameters();
+ if (flowParams != null
+ && flowParams.containsKey(ExecutionOptions.TRIGGER_SPEC)) {
+ ConditionChecker nyxChecker = new NyxTriggerChecker(
+ flowParams.get(ExecutionOptions.TRIGGER_SPEC), "NyxTriggerChecker_1");
+ checkers.put(nyxChecker.getId(), nyxChecker);
+ expr = " && " + nyxChecker.getId() + ".eval() ";
+ }
+
Condition cond = new Condition(checkers, expr);
return cond;
}
diff --git a/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java b/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
index fc159ec..48b848d 100644
--- a/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
+++ b/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
@@ -31,6 +31,8 @@ import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.mail.DefaultMailCreator;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
import azkaban.user.Permission;
import azkaban.user.Permission.Type;
import azkaban.user.Role;
@@ -315,4 +317,36 @@ public class HttpRequestUtils {
return groupParam;
}
+ /**
+ * Set correct trigger spec using runtime-config or .json file
+ *
+ * @param flowOptions
+ * @param project
+ * @throws Exception
+ */
+ public static void setTriggerSpecification(ExecutionOptions flowOptions,
+ Project project) throws Exception {
+ Map<String, String> flowParams = flowOptions.getFlowParameters();
+ Map<String, Object> metaData = project.getMetadata();
+ // User specific TRIGGER_SPEC takes higher priority
+ if (flowParams != null
+ && !flowParams.containsKey(ExecutionOptions.TRIGGER_SPEC)
+ && metaData != null
+ && metaData.containsKey(ProjectManager.TRIGGER_DATA)) {
+ String triggerName = flowParams.get(ExecutionOptions.TRIGGER_FILE);
+ @SuppressWarnings("unchecked")
+ Map<String, String> triggers =
+ (Map<String, String>) metaData.get(ProjectManager.TRIGGER_DATA);
+ if (triggers.containsKey(triggerName)) {
+ flowParams.put(ExecutionOptions.TRIGGER_SPEC,
+ triggers.get(triggerName));
+ } else if (triggers.containsKey(triggerName + ".json")) {
+ flowParams.put(ExecutionOptions.TRIGGER_SPEC,
+ triggers.get(triggerName + ".json"));
+ } else {
+ throw new Exception("Unknown trigger file " + triggerName);
+ }
+ }
+ }
+
}
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/NyxTriggerChecker.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/NyxTriggerChecker.java
new file mode 100644
index 0000000..d5067f9
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/NyxTriggerChecker.java
@@ -0,0 +1,133 @@
+package azkaban.trigger.builtin;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.TriggerManagerException;
+import azkaban.utils.NyxUtils;
+
+/***
+ * Trigger checker leveraging upcoming Nyx service
+ *
+ * @author gaggarwa
+ *
+ */
+public class NyxTriggerChecker implements ConditionChecker {
+ private static Logger logger = Logger.getLogger(NyxTriggerChecker.class);
+
+ public static final String type = "NyxTriggerChecker";
+
+ private String specification;
+ private String id;
+ private long triggerId = -1L;
+
+ public NyxTriggerChecker(String specification, String id)
+ throws TriggerManagerException {
+ this(specification, id, -1);
+ // TODO: register a trigger
+ }
+
+ public NyxTriggerChecker(String specification, String id, long triggerId)
+ throws TriggerManagerException {
+ this.specification = specification;
+ this.id = id;
+ this.triggerId = (triggerId == -1
+ ? NyxUtils.registerNyxTrigger(specification) : triggerId);
+ }
+
+ @Override
+ public Object eval() {
+ try {
+ if (triggerId == -1) {
+ // if trigger is not registered then first register
+ triggerId = NyxUtils.registerNyxTrigger(specification);
+ }
+ return NyxUtils.isNyxTriggerReady(triggerId);
+ } catch (TriggerManagerException ex) {
+ logger.error("Error while evaluating checker " + id, ex);
+ return false;
+ }
+ }
+
+ @Override
+ public Object getNum() {
+ return null;
+ }
+
+ @Override
+ public void reset() {
+ try {
+ NyxUtils.unregisterNyxTrigger(triggerId);
+ triggerId = NyxUtils.registerNyxTrigger(specification);
+ } catch (TriggerManagerException ex) {
+ logger.error("Error while resetting checker " + id, ex);
+ }
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public ConditionChecker fromJson(Object obj) throws Exception {
+ return createFromJson((HashMap<String, Object>) obj);
+ }
+
+ @Override
+ public Object toJson() {
+ Map<String, Object> jsonObj = new HashMap<String, Object>();
+ jsonObj.put("type", type);
+ jsonObj.put("specification", specification);
+ jsonObj.put("triggerId", String.valueOf(triggerId));
+ jsonObj.put("id", id);
+
+ return jsonObj;
+ }
+
+ @Override
+ public void stopChecker() {
+ try {
+ NyxUtils.unregisterNyxTrigger(triggerId);
+ } catch (TriggerManagerException ex) {
+ logger.error("Error while stopping checker " + id, ex);
+ }
+ }
+
+ @Override
+ public void setContext(Map<String, Object> context) {
+ // Not applicable for Nyx trigger
+ }
+
+ @Override
+ public long getNextCheckTime() {
+ // Not applicable for Nyx trigger
+ return Long.MAX_VALUE;
+ }
+
+ public static NyxTriggerChecker createFromJson(HashMap<String, Object> obj)
+ throws Exception {
+ Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+ if (!jsonObj.get("type").equals(type)) {
+ throw new Exception(
+ "Cannot create checker of " + type + " from " + jsonObj.get("type"));
+ }
+ Long triggerId = Long.valueOf((String) jsonObj.get("triggerId"));
+ String id = (String) jsonObj.get("id");
+ String specification = (String) jsonObj.get("specification");
+
+ NyxTriggerChecker checker =
+ new NyxTriggerChecker(specification, id, triggerId);
+ return checker;
+ }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
index 11292e0..ea1018d 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
@@ -67,6 +67,7 @@ public class TriggerManager extends EventHandler implements
private String timezone;
private String scannerStage = "";
private boolean isDayLightSaving;
+ private static Props azprops = null;
public TriggerManager(Props props, TriggerLoader triggerLoader,
ExecutorManager executorManager) throws TriggerManagerException {
@@ -85,7 +86,8 @@ public class TriggerManager extends EventHandler implements
checkerTypeLoader = new CheckerTypeLoader();
actionTypeLoader = new ActionTypeLoader();
-
+ setAzprops(props);
+
try {
checkerTypeLoader.init(props);
actionTypeLoader.init(props);
@@ -528,6 +530,14 @@ public class TriggerManager extends EventHandler implements
actionTypeLoader.registerActionType(name, action);
}
+ public static Props getAzprops() {
+ return azprops;
+ }
+
+ public static void setAzprops(Props azprops) {
+ TriggerManager.azprops = azprops;
+ }
+
private class ExecutorManagerEventListener implements EventListener {
public ExecutorManagerEventListener() {
}
diff --git a/azkaban-common/src/main/java/azkaban/utils/NyxUtils.java b/azkaban-common/src/main/java/azkaban/utils/NyxUtils.java
new file mode 100644
index 0000000..b460572
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/NyxUtils.java
@@ -0,0 +1,134 @@
+package azkaban.utils;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.mortbay.util.ajax.JSON;
+
+import azkaban.executor.ExecutorApiClient;
+import azkaban.trigger.TriggerManager;
+import azkaban.trigger.TriggerManagerException;
+
+/**
+ * Helper class wrapping REST API client for Nyx Service
+ *
+ * @author gaggarwa
+ *
+ */
+public class NyxUtils {
+ private static Logger logger = Logger.getLogger(NyxUtils.class);
+ public static final String NYX_SERVER_PORT = "nyx.service.port";
+ public static final String NYX_SERVER_HOST = "nyx.service.host";
+
+ private static String nyxServiceHost = "loclahost";
+ private static final boolean isHttp = true;
+ private static int port = 8080;
+
+ static {
+ // populating nyx service from .properties configs
+ Props props = TriggerManager.getAzprops();
+ if (props != null) {
+ nyxServiceHost = props.getString(NYX_SERVER_HOST, nyxServiceHost);
+ port = props.getInt(NYX_SERVER_PORT, port);
+ }
+ }
+
+ /**
+ * Use trigger json specification to register a trigger with Nyx Service
+ *
+ * @throws TriggerManagerException
+ */
+ public static long registerNyxTrigger(String specificationJson)
+ throws TriggerManagerException {
+ try {
+ ExecutorApiClient client = ExecutorApiClient.getInstance();
+ URI uri = client.buildUri(nyxServiceHost, port, "/register", isHttp);
+ String rawResponse = client.httpPost(uri, null, specificationJson);
+ Map<String, Object> parsedResponse =
+ (Map<String, Object>) JSON.parse(rawResponse);
+
+ // TODO: to be revisited. Presence of an "id" field signify successfully
+ // registration of trigger
+ if (parsedResponse.containsKey("error")) {
+ throw new IllegalArgumentException(
+ (String) parsedResponse.get("error"));
+ } else if (parsedResponse.containsKey("id")) {
+ return Long.parseLong((String) parsedResponse.get("id"));
+ } else {
+ throw new Exception("Failed to parse Nyx response " + rawResponse);
+ }
+ } catch (Exception ex) {
+ logger.error(
+ "Failed to get Nyx service response for :" + specificationJson, ex);
+ throw new TriggerManagerException(ex);
+ }
+ }
+
+ /**
+ * Delete an already registered trigger from Nyx Service
+ *
+ * @throws TriggerManagerException
+ */
+ public static void unregisterNyxTrigger(Long triggerId)
+ throws TriggerManagerException {
+ if (triggerId == -1) {
+ throw new TriggerManagerException("Trigger is not registered");
+ }
+
+ try {
+ ExecutorApiClient client = ExecutorApiClient.getInstance();
+ URI uri = client.buildUri(nyxServiceHost, port,
+ "/unregister/" + triggerId, isHttp);
+ String response = client.httpGet(uri, null);
+ Map<String, Object> parsedResponse =
+ (Map<String, Object>) JSON.parse(response);
+
+ if (parsedResponse.containsKey("error")) {
+ throw new Exception((String) parsedResponse.get("error"));
+ }
+ } catch (Exception ex) {
+ logger.error(
+ "Failed to get Nyx service response for triggerId : " + triggerId,
+ ex);
+ throw new TriggerManagerException(ex);
+ }
+ }
+
+ /**
+ * Look up status of an already registered trigger
+ *
+ * @param triggerId
+ * @return status fetched from Nyx
+ * @throws TriggerManagerException
+ */
+ public static boolean isNyxTriggerReady(Long triggerId)
+ throws TriggerManagerException {
+ if (triggerId == -1) {
+ throw new TriggerManagerException("Trigger is not registered");
+ }
+
+ try {
+ ExecutorApiClient client = ExecutorApiClient.getInstance();
+ URI uri =
+ client.buildUri(nyxServiceHost, port, "/status/" + triggerId, isHttp);
+ String response = client.httpGet(uri, null);
+ Map<String, Object> parsedResponse =
+ (Map<String, Object>) JSON.parse(response);
+
+ if (parsedResponse.containsKey("error")) {
+ throw new IllegalArgumentException(
+ (String) parsedResponse.get("error"));
+ } else if (parsedResponse.containsKey("ready")) {
+ return (boolean) parsedResponse.get("ready");
+ } else {
+ throw new Exception("Status missing from Nyx response :" + response);
+ }
+ } catch (Exception ex) {
+ logger.error(
+ "Failed to get Nyx service response for triggerId : " + triggerId,
+ ex);
+ throw new TriggerManagerException(ex);
+ }
+ }
+}
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
index a36505b..4644468 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -661,6 +661,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
try {
flowOptions = HttpRequestUtils.parseFlowOptions(req);
HttpRequestUtils.filterAdminOnlyFlowParams(userManager, flowOptions, user);
+ HttpRequestUtils.setTriggerSpecification(flowOptions, project);
} catch (Exception e) {
ret.put("error", e.getMessage());
}