azkaban-developers

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
index d8b10f1..5049743 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
@@ -37,6 +37,8 @@ public class ExecutionOptions {
   /* override dispatcher selection and use executor id specified */
   public static final String USE_EXECUTOR = "useExecutor";
   public static final int DEFAULT_FLOW_PRIORITY = 5;
+  public static final String TRIGGER_SPEC = "triggerSpec";
+  public static final String TRIGGER_FILE = "triggerFile";
 
   private static final String FLOW_PARAMETERS = "flowParameters";
   private static final String NOTIFY_ON_FIRST_FAILURE = "notifyOnFirstFailure";
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
index 2295435..79a6e87 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
@@ -18,6 +18,7 @@ package azkaban.project;
 
 import java.io.File;
 import java.io.FileFilter;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -28,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
 
 import azkaban.flow.CommonJobProperties;
@@ -47,6 +49,7 @@ import azkaban.utils.Utils;
 public class DirectoryFlowLoader implements ProjectValidator {
   private static final DirFilter DIR_FILTER = new DirFilter();
   private static final String PROPERTY_SUFFIX = ".properties";
+  private static final String TRIGGER_SUFFIX = ".trigger";
   private static final String JOB_SUFFIX = ".job";
   public static final String JOB_MAX_XMS = "job.max.Xms";
   public static final String MAX_XMS_DEFAULT = "1G";
@@ -60,6 +63,7 @@ public class DirectoryFlowLoader implements ProjectValidator {
   private HashSet<String> rootNodes;
   private HashMap<String, Flow> flowMap;
   private HashMap<String, Node> nodeMap;
+  private HashMap<String, String> triggerMap;
   private HashMap<String, Map<String, Edge>> nodeDependencies;
   private HashMap<String, Props> jobPropsMap;
 
@@ -108,6 +112,16 @@ public class DirectoryFlowLoader implements ProjectValidator {
   public Map<String, Props> getJobProps() {
     return jobPropsMap;
   }
+  
+  /**
+   * Returns triggers
+   *
+   * @return Map of trigger file name to trigger json
+   */
+  public Map<String, String> getTriggerMap() {
+    return triggerMap;
+  }
+
 
   /**
    * Returns list of properties.
@@ -130,6 +144,7 @@ public class DirectoryFlowLoader implements ProjectValidator {
     jobPropsMap = new HashMap<String, Props>();
     nodeMap = new HashMap<String, Node>();
     flowMap = new HashMap<String, Flow>();
+    triggerMap = new HashMap<String, String>();
     errors = new HashSet<String>();
     duplicateJobs = new HashSet<String>();
     nodeDependencies = new HashMap<String, Map<String, Edge>>();
@@ -173,6 +188,28 @@ public class DirectoryFlowLoader implements ProjectValidator {
       propsList.add(parent);
     }
 
+    //Loading all trigger files
+    File[] triggerFiles = dir.listFiles(new SuffixFilter(TRIGGER_SUFFIX));
+    for (File file : triggerFiles) {
+      FileInputStream fIStream = null;
+      try {
+        if(!triggerMap.containsKey(file.getName())) {
+          fIStream = new FileInputStream(file);
+          String triggerJSON = IOUtils.toString(fIStream);
+          //TODO: validate trigger JSON
+          triggerMap.put(file.getName(), triggerJSON);
+        }
+      } catch (IOException e) {
+        errors.add("Error loading trigger " + file.getName() + ":"
+            + e.getMessage());
+      } finally {
+        if(fIStream != null) {
+          IOUtils.closeQuietly(fIStream);
+        }
+      }
+      logger.info("Adding " + file.getName());
+    }
+    
     // Load all Job files. If there's a duplicate name, then we don't load
     File[] jobFiles = dir.listFiles(new SuffixFilter(JOB_SUFFIX));
     for (File file : jobFiles) {
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
index 52024f1..fb8bbd0 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
@@ -30,6 +30,9 @@ import java.util.zip.ZipFile;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
 
 import azkaban.flow.Flow;
 import azkaban.project.DirectoryFlowLoader;
@@ -49,6 +52,7 @@ import azkaban.utils.Utils;
 public class ProjectManager {
   private static final Logger logger = Logger.getLogger(ProjectManager.class);
 
+  public static final String TRIGGER_DATA = "triggers";
   private ConcurrentHashMap<Integer, Project> projectsById =
       new ConcurrentHashMap<Integer, Project>();
   private ConcurrentHashMap<String, Project> projectsByName =
@@ -527,6 +531,7 @@ public class ProjectManager {
         (DirectoryFlowLoader) validatorManager.getDefaultValidator();
     Map<String, Props> jobProps = loader.getJobProps();
     List<Props> propProps = loader.getProps();
+    Map<String, String> triggerMap = loader.getTriggerMap();
 
     synchronized (project) {
       int newVersion = projectLoader.getLatestProjectVersion(project) + 1;
@@ -550,6 +555,10 @@ public class ProjectManager {
           jobProps.values()));
       logger.info("Uploading Props properties");
       projectLoader.uploadProjectProperties(project, propProps);
+      
+      logger.info("Uploading trigger files");
+      project.getMetadata().put(TRIGGER_DATA, triggerMap);
+      projectLoader.updateProjectSettings(project);
     }
 
     logger.info("Uploaded project files. Cleaning up temp files.");
diff --git a/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index 7a3c12a..5010d27 100644
--- a/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -23,6 +23,7 @@ import java.util.Map;
 
 import org.apache.log4j.Logger;
 
+import azkaban.executor.ExecutionOptions;
 import azkaban.trigger.Condition;
 import azkaban.trigger.ConditionChecker;
 import azkaban.trigger.Trigger;
@@ -32,6 +33,7 @@ import azkaban.trigger.TriggerManagerAdapter;
 import azkaban.trigger.TriggerManagerException;
 import azkaban.trigger.builtin.BasicTimeChecker;
 import azkaban.trigger.builtin.ExecuteFlowAction;
+import azkaban.trigger.builtin.NyxTriggerChecker;
 
 public class TriggerBasedScheduleLoader implements ScheduleLoader {
 
@@ -86,6 +88,17 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
             s.getPeriod());
     checkers.put(checker.getId(), checker);
     String expr = checker.getId() + ".eval()";
+    
+    Map<String, String> flowParams =
+        s.getExecutionOptions().getFlowParameters();
+    if (flowParams != null
+        && flowParams.containsKey(ExecutionOptions.TRIGGER_SPEC)) {
+      ConditionChecker nyxChecker = new NyxTriggerChecker(
+          flowParams.get(ExecutionOptions.TRIGGER_SPEC), "NyxTriggerChecker_1");
+      checkers.put(nyxChecker.getId(), nyxChecker);
+      expr = " && " + nyxChecker.getId() + ".eval() ";
+    }
+
     Condition cond = new Condition(checkers, expr);
     return cond;
   }
diff --git a/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java b/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
index fc159ec..48b848d 100644
--- a/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
+++ b/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
@@ -31,6 +31,8 @@ import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutionOptions.FailureAction;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.mail.DefaultMailCreator;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
 import azkaban.user.Permission;
 import azkaban.user.Permission.Type;
 import azkaban.user.Role;
@@ -315,4 +317,36 @@ public class HttpRequestUtils {
     return groupParam;
   }
 
+  /**
+   * Set correct trigger spec using runtime-config or .json file
+   * 
+   * @param flowOptions
+   * @param project
+   * @throws Exception
+   */
+  public static void setTriggerSpecification(ExecutionOptions flowOptions,
+      Project project) throws Exception {
+    Map<String, String> flowParams = flowOptions.getFlowParameters();
+    Map<String, Object> metaData = project.getMetadata();
+    // User specific TRIGGER_SPEC takes higher priority
+    if (flowParams != null
+        && !flowParams.containsKey(ExecutionOptions.TRIGGER_SPEC)
+        && metaData != null
+        && metaData.containsKey(ProjectManager.TRIGGER_DATA)) {
+      String triggerName = flowParams.get(ExecutionOptions.TRIGGER_FILE);
+      @SuppressWarnings("unchecked")
+      Map<String, String> triggers =
+          (Map<String, String>) metaData.get(ProjectManager.TRIGGER_DATA);
+      if (triggers.containsKey(triggerName)) {
+        flowParams.put(ExecutionOptions.TRIGGER_SPEC,
+            triggers.get(triggerName));
+      } else if (triggers.containsKey(triggerName + ".json")) {
+        flowParams.put(ExecutionOptions.TRIGGER_SPEC,
+            triggers.get(triggerName + ".json"));
+      } else {
+        throw new Exception("Unknown trigger file " + triggerName);
+      }
+    }
+  }
+
 }
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/NyxTriggerChecker.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/NyxTriggerChecker.java
new file mode 100644
index 0000000..d5067f9
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/NyxTriggerChecker.java
@@ -0,0 +1,133 @@
+package azkaban.trigger.builtin;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.TriggerManagerException;
+import azkaban.utils.NyxUtils;
+
+/***
+ * Trigger checker leveraging upcoming Nyx service
+ * 
+ * @author gaggarwa
+ *
+ */
+public class NyxTriggerChecker implements ConditionChecker {
+  private static Logger logger = Logger.getLogger(NyxTriggerChecker.class);
+
+  public static final String type = "NyxTriggerChecker";
+
+  private String specification;
+  private String id;
+  private long triggerId = -1L;
+
+  public NyxTriggerChecker(String specification, String id)
+      throws TriggerManagerException {
+    this(specification, id, -1);
+    // TODO: register a trigger
+  }
+
+  public NyxTriggerChecker(String specification, String id, long triggerId)
+      throws TriggerManagerException {
+    this.specification = specification;
+    this.id = id;
+    this.triggerId = (triggerId == -1
+        ? NyxUtils.registerNyxTrigger(specification) : triggerId);
+  }
+
+  @Override
+  public Object eval() {
+    try {
+      if (triggerId == -1) {
+        // if trigger is not registered then first register
+        triggerId = NyxUtils.registerNyxTrigger(specification);
+      }
+      return NyxUtils.isNyxTriggerReady(triggerId);
+    } catch (TriggerManagerException ex) {
+      logger.error("Error while evaluating checker " + id, ex);
+      return false;
+    }
+  }
+
+  @Override
+  public Object getNum() {
+    return null;
+  }
+
+  @Override
+  public void reset() {
+    try {
+      NyxUtils.unregisterNyxTrigger(triggerId);
+      triggerId = NyxUtils.registerNyxTrigger(specification);
+    } catch (TriggerManagerException ex) {
+      logger.error("Error while resetting checker " + id, ex);
+    }
+  }
+
+  @Override
+  public String getId() {
+    return id;
+  }
+
+  @Override
+  public String getType() {
+    return type;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public ConditionChecker fromJson(Object obj) throws Exception {
+    return createFromJson((HashMap<String, Object>) obj);
+  }
+
+  @Override
+  public Object toJson() {
+    Map<String, Object> jsonObj = new HashMap<String, Object>();
+    jsonObj.put("type", type);
+    jsonObj.put("specification", specification);
+    jsonObj.put("triggerId", String.valueOf(triggerId));
+    jsonObj.put("id", id);
+
+    return jsonObj;
+  }
+
+  @Override
+  public void stopChecker() {
+    try {
+      NyxUtils.unregisterNyxTrigger(triggerId);
+    } catch (TriggerManagerException ex) {
+      logger.error("Error while stopping checker " + id, ex);
+    }
+  }
+
+  @Override
+  public void setContext(Map<String, Object> context) {
+    // Not applicable for Nyx trigger
+  }
+
+  @Override
+  public long getNextCheckTime() {
+    // Not applicable for Nyx trigger
+    return Long.MAX_VALUE;
+  }
+
+  public static NyxTriggerChecker createFromJson(HashMap<String, Object> obj)
+      throws Exception {
+    Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+    if (!jsonObj.get("type").equals(type)) {
+      throw new Exception(
+          "Cannot create checker of " + type + " from " + jsonObj.get("type"));
+    }
+    Long triggerId = Long.valueOf((String) jsonObj.get("triggerId"));
+    String id = (String) jsonObj.get("id");
+    String specification = (String) jsonObj.get("specification");
+
+    NyxTriggerChecker checker =
+        new NyxTriggerChecker(specification, id, triggerId);
+    return checker;
+  }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
index 11292e0..ea1018d 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
@@ -67,6 +67,7 @@ public class TriggerManager extends EventHandler implements
   private String timezone;
   private String scannerStage = "";
   private boolean isDayLightSaving;
+  private static Props azprops = null;
   
   public TriggerManager(Props props, TriggerLoader triggerLoader,
       ExecutorManager executorManager) throws TriggerManagerException {
@@ -85,7 +86,8 @@ public class TriggerManager extends EventHandler implements
     
     checkerTypeLoader = new CheckerTypeLoader();
     actionTypeLoader = new ActionTypeLoader();
-
+    setAzprops(props);
+    
     try {
       checkerTypeLoader.init(props);
       actionTypeLoader.init(props);
@@ -528,6 +530,14 @@ public class TriggerManager extends EventHandler implements
     actionTypeLoader.registerActionType(name, action);
   }
 
+  public static Props getAzprops() {
+    return azprops;
+  }
+
+  public static void setAzprops(Props azprops) {
+    TriggerManager.azprops = azprops;
+  }
+
   private class ExecutorManagerEventListener implements EventListener {
     public ExecutorManagerEventListener() {
     }
diff --git a/azkaban-common/src/main/java/azkaban/utils/NyxUtils.java b/azkaban-common/src/main/java/azkaban/utils/NyxUtils.java
new file mode 100644
index 0000000..b460572
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/NyxUtils.java
@@ -0,0 +1,134 @@
+package azkaban.utils;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.mortbay.util.ajax.JSON;
+
+import azkaban.executor.ExecutorApiClient;
+import azkaban.trigger.TriggerManager;
+import azkaban.trigger.TriggerManagerException;
+
+/**
+ * Helper class wrapping REST API client for Nyx Service
+ * 
+ * @author gaggarwa
+ *
+ */
+public class NyxUtils {
+  private static Logger logger = Logger.getLogger(NyxUtils.class);
+  public static final String NYX_SERVER_PORT = "nyx.service.port";
+  public static final String NYX_SERVER_HOST = "nyx.service.host";
+
+  private static String nyxServiceHost = "loclahost";
+  private static final boolean isHttp = true;
+  private static int port = 8080;
+
+  static {
+    // populating nyx service from .properties configs
+    Props props = TriggerManager.getAzprops();
+    if (props != null) {
+      nyxServiceHost = props.getString(NYX_SERVER_HOST, nyxServiceHost);
+      port = props.getInt(NYX_SERVER_PORT, port);
+    }
+  }
+
+  /**
+   * Use trigger json specification to register a trigger with Nyx Service
+   * 
+   * @throws TriggerManagerException
+   */
+  public static long registerNyxTrigger(String specificationJson)
+      throws TriggerManagerException {
+    try {
+      ExecutorApiClient client = ExecutorApiClient.getInstance();
+      URI uri = client.buildUri(nyxServiceHost, port, "/register", isHttp);
+      String rawResponse = client.httpPost(uri, null, specificationJson);
+      Map<String, Object> parsedResponse =
+          (Map<String, Object>) JSON.parse(rawResponse);
+
+      // TODO: to be revisited. Presence of an "id" field signify successfully
+      // registration of trigger
+      if (parsedResponse.containsKey("error")) {
+        throw new IllegalArgumentException(
+            (String) parsedResponse.get("error"));
+      } else if (parsedResponse.containsKey("id")) {
+        return Long.parseLong((String) parsedResponse.get("id"));
+      } else {
+        throw new Exception("Failed to parse Nyx response " + rawResponse);
+      }
+    } catch (Exception ex) {
+      logger.error(
+          "Failed to get Nyx service response for :" + specificationJson, ex);
+      throw new TriggerManagerException(ex);
+    }
+  }
+
+  /**
+   * Delete an already registered trigger from Nyx Service
+   * 
+   * @throws TriggerManagerException
+   */
+  public static void unregisterNyxTrigger(Long triggerId)
+      throws TriggerManagerException {
+    if (triggerId == -1) {
+      throw new TriggerManagerException("Trigger is not registered");
+    }
+
+    try {
+      ExecutorApiClient client = ExecutorApiClient.getInstance();
+      URI uri = client.buildUri(nyxServiceHost, port,
+          "/unregister/" + triggerId, isHttp);
+      String response = client.httpGet(uri, null);
+      Map<String, Object> parsedResponse =
+          (Map<String, Object>) JSON.parse(response);
+
+      if (parsedResponse.containsKey("error")) {
+        throw new Exception((String) parsedResponse.get("error"));
+      }
+    } catch (Exception ex) {
+      logger.error(
+          "Failed to get Nyx service response for triggerId : " + triggerId,
+          ex);
+      throw new TriggerManagerException(ex);
+    }
+  }
+
+  /**
+   * Look up status of an already registered trigger
+   * 
+   * @param triggerId
+   * @return status fetched from Nyx
+   * @throws TriggerManagerException
+   */
+  public static boolean isNyxTriggerReady(Long triggerId)
+      throws TriggerManagerException {
+    if (triggerId == -1) {
+      throw new TriggerManagerException("Trigger is not registered");
+    }
+
+    try {
+      ExecutorApiClient client = ExecutorApiClient.getInstance();
+      URI uri =
+          client.buildUri(nyxServiceHost, port, "/status/" + triggerId, isHttp);
+      String response = client.httpGet(uri, null);
+      Map<String, Object> parsedResponse =
+          (Map<String, Object>) JSON.parse(response);
+
+      if (parsedResponse.containsKey("error")) {
+        throw new IllegalArgumentException(
+            (String) parsedResponse.get("error"));
+      } else if (parsedResponse.containsKey("ready")) {
+        return (boolean) parsedResponse.get("ready");
+      } else {
+        throw new Exception("Status missing from Nyx response :" + response);
+      }
+    } catch (Exception ex) {
+      logger.error(
+          "Failed to get Nyx service response for triggerId : " + triggerId,
+          ex);
+      throw new TriggerManagerException(ex);
+    }
+  }
+}
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
index a36505b..4644468 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -661,6 +661,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
     try {
       flowOptions = HttpRequestUtils.parseFlowOptions(req);
       HttpRequestUtils.filterAdminOnlyFlowParams(userManager, flowOptions, user);
+      HttpRequestUtils.setTriggerSpecification(flowOptions, project);
     } catch (Exception e) {
       ret.put("error", e.getMessage());
     }