azkaban-aplcache
Changes
azkaban-common/src/main/java/azkaban/trigger/Trigger.java 450(+220 -230)
Details
azkaban-common/src/main/java/azkaban/trigger/Trigger.java 450(+220 -230)
diff --git a/azkaban-common/src/main/java/azkaban/trigger/Trigger.java b/azkaban-common/src/main/java/azkaban/trigger/Trigger.java
index 5768beb..1bf12ed 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/Trigger.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/Trigger.java
@@ -16,41 +16,33 @@
package azkaban.trigger;
+import static java.util.Objects.requireNonNull;
+
+import azkaban.utils.JSONUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
import org.apache.log4j.Logger;
-
import org.joda.time.DateTime;
-import azkaban.utils.JSONUtils;
-
-import static java.util.Objects.*;
-
public class Trigger {
- private static Logger logger = Logger.getLogger(Trigger.class);
-
- private int triggerId = -1;
- private long lastModifyTime;
+ private static final Logger logger = Logger.getLogger(Trigger.class);
+ private static ActionTypeLoader actionTypeLoader;
private final long submitTime;
private final String submitUser;
private final String source;
- private TriggerStatus status = TriggerStatus.READY;
-
private final Condition triggerCondition;
private final Condition expireCondition;
private final List<TriggerAction> actions;
private final List<TriggerAction> expireActions;
-
- private Map<String, Object> info = new HashMap<String, Object>();
- private Map<String, Object> context = new HashMap<String, Object>();
-
- private static ActionTypeLoader actionTypeLoader;
-
+ private int triggerId = -1;
+ private long lastModifyTime;
+ private TriggerStatus status = TriggerStatus.READY;
+ private Map<String, Object> info = new HashMap<>();
+ private Map<String, Object> context = new HashMap<>();
private boolean resetOnTrigger = true;
private boolean resetOnExpire = true;
@@ -61,6 +53,132 @@ public class Trigger {
throw new TriggerManagerException("Triggers should always be specified");
}
+ private Trigger(int triggerId, long lastModifyTime, long submitTime,
+ String submitUser, String source, Condition triggerCondition,
+ Condition expireCondition, List<TriggerAction> actions,
+ List<TriggerAction> expireActions, Map<String, Object> info,
+ Map<String, Object> context) {
+ requireNonNull(submitUser);
+ requireNonNull(source);
+ requireNonNull(triggerCondition);
+ requireNonNull(expireActions);
+ requireNonNull(info);
+ requireNonNull(context);
+
+ this.lastModifyTime = lastModifyTime;
+ this.submitTime = submitTime;
+ this.submitUser = submitUser;
+ this.source = source;
+ this.triggerCondition = triggerCondition;
+ this.expireCondition = expireCondition;
+ this.actions = actions;
+ this.triggerId = triggerId;
+ this.expireActions = expireActions;
+ this.info = info;
+ this.context = context;
+ }
+
+ public static ActionTypeLoader getActionTypeLoader() {
+ return actionTypeLoader;
+ }
+
+ public static synchronized void setActionTypeLoader(ActionTypeLoader loader) {
+ Trigger.actionTypeLoader = loader;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Trigger fromJson(Object obj) throws Exception {
+
+ if (actionTypeLoader == null) {
+ throw new Exception("Trigger Action Type loader not initialized.");
+ }
+
+ Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+
+ Trigger trigger = null;
+ try {
+ logger.info("Decoding for " + JSONUtils.toJSON(obj));
+ Condition triggerCond =
+ Condition.fromJson(jsonObj.get("triggerCondition"));
+ Condition expireCond = Condition.fromJson(jsonObj.get("expireCondition"));
+ List<TriggerAction> actions = new ArrayList<>();
+ List<Object> actionsJson = (List<Object>) jsonObj.get("actions");
+ for (Object actObj : actionsJson) {
+ Map<String, Object> oneActionJson = (HashMap<String, Object>) actObj;
+ String type = (String) oneActionJson.get("type");
+ TriggerAction act =
+ actionTypeLoader.createActionFromJson(type,
+ oneActionJson.get("actionJson"));
+ actions.add(act);
+ }
+ List<TriggerAction> expireActions = new ArrayList<>();
+ List<Object> expireActionsJson =
+ (List<Object>) jsonObj.get("expireActions");
+ for (Object expireActObj : expireActionsJson) {
+ Map<String, Object> oneExpireActionJson =
+ (HashMap<String, Object>) expireActObj;
+ String type = (String) oneExpireActionJson.get("type");
+ TriggerAction expireAct =
+ actionTypeLoader.createActionFromJson(type,
+ oneExpireActionJson.get("actionJson"));
+ expireActions.add(expireAct);
+ }
+ boolean resetOnTrigger =
+ Boolean.valueOf((String) jsonObj.get("resetOnTrigger"));
+ boolean resetOnExpire =
+ Boolean.valueOf((String) jsonObj.get("resetOnExpire"));
+ String submitUser = (String) jsonObj.get("submitUser");
+ String source = (String) jsonObj.get("source");
+ long submitTime = Long.valueOf((String) jsonObj.get("submitTime"));
+ long lastModifyTime =
+ Long.valueOf((String) jsonObj.get("lastModifyTime"));
+ int triggerId = Integer.valueOf((String) jsonObj.get("triggerId"));
+ TriggerStatus status =
+ TriggerStatus.valueOf((String) jsonObj.get("status"));
+ Map<String, Object> info = (Map<String, Object>) jsonObj.get("info");
+ Map<String, Object> context =
+ (Map<String, Object>) jsonObj.get("context");
+ if (context == null) {
+ context = new HashMap<>();
+ }
+ for (ConditionChecker checker : triggerCond.getCheckers().values()) {
+ checker.setContext(context);
+ }
+ for (ConditionChecker checker : expireCond.getCheckers().values()) {
+ checker.setContext(context);
+ }
+ for (TriggerAction action : actions) {
+ action.setContext(context);
+ }
+ for (TriggerAction action : expireActions) {
+ action.setContext(context);
+ }
+
+ trigger = new Trigger.TriggerBuilder("azkaban",
+ source,
+ triggerCond,
+ expireCond,
+ actions)
+ .setId(triggerId)
+ .setLastModifyTime(lastModifyTime)
+ .setSubmitTime(submitTime)
+ .setExpireActions(expireActions)
+ .setInfo(info)
+ .setContext(context)
+ .build();
+
+ trigger.setResetOnExpire(resetOnExpire);
+ trigger.setResetOnTrigger(resetOnTrigger);
+ trigger.setStatus(status);
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.error("Failed to decode the trigger.", e);
+ throw new Exception("Failed to decode the trigger.", e);
+ }
+
+ return trigger;
+ }
+
public void updateNextCheckTime() {
this.nextCheckTime =
Math.min(triggerCondition.getNextCheckTime(),
@@ -123,116 +241,6 @@ public class Trigger {
this.context = context;
}
-
- public static class TriggerBuilder {
- private int triggerId = -1;
- private long lastModifyTime;
- private long submitTime;
- private final String submitUser;
- private final String source;
- private final TriggerStatus status = TriggerStatus.READY;
-
- private final Condition triggerCondition;
- private final List<TriggerAction> actions;
- private final Condition expireCondition;
- private List<TriggerAction> expireActions = new ArrayList<>();
-
- private Map<String, Object> info = new HashMap<String, Object>();
- private Map<String, Object> context = new HashMap<String, Object>();
-
- public TriggerBuilder(String submitUser,
- String source,
- Condition triggerCondition,
- Condition expireCondition,
- List<TriggerAction> actions) {
- this.submitUser = submitUser;
- this.source = source;
- this.triggerCondition = triggerCondition;
- this.actions = actions;
- this.expireCondition = expireCondition;
- long now = DateTime.now().getMillis();
- this.submitTime = now;
- this.lastModifyTime = now;
- }
-
- public TriggerBuilder setId(int id) {
- this.triggerId = id;
- return this;
- }
-
- public TriggerBuilder setSubmitTime(long time) {
- this.submitTime = time;
- return this;
- }
-
- public TriggerBuilder setLastModifyTime(long time) {
- this.lastModifyTime = time;
- return this;
- }
-
- public TriggerBuilder setExpireActions(List<TriggerAction> actions) {
- this.expireActions = actions;
- return this;
- }
-
- public TriggerBuilder setInfo(Map<String, Object> info) {
- this.info = info;
- return this;
- }
-
- public TriggerBuilder setContext(Map<String, Object> context) {
- this.context = context;
- return this;
- }
-
- public Trigger build() {
- return new Trigger(triggerId,
- lastModifyTime,
- submitTime,
- submitUser,
- source,
- triggerCondition,
- expireCondition,
- actions,
- expireActions,
- info,
- context);
- }
- }
-
- private Trigger(int triggerId, long lastModifyTime, long submitTime,
- String submitUser, String source, Condition triggerCondition,
- Condition expireCondition, List<TriggerAction> actions,
- List<TriggerAction> expireActions, Map<String, Object> info,
- Map<String, Object> context) {
- requireNonNull(submitUser);
- requireNonNull(source);
- requireNonNull(triggerCondition);
- requireNonNull(expireActions);
- requireNonNull(info);
- requireNonNull(context);
-
- this.lastModifyTime = lastModifyTime;
- this.submitTime = submitTime;
- this.submitUser = submitUser;
- this.source = source;
- this.triggerCondition = triggerCondition;
- this.expireCondition = expireCondition;
- this.actions = actions;
- this.triggerId = triggerId;
- this.expireActions = expireActions;
- this.info = info;
- this.context = context;
- }
-
- public static synchronized void setActionTypeLoader(ActionTypeLoader loader) {
- Trigger.actionTypeLoader = loader;
- }
-
- public static ActionTypeLoader getActionTypeLoader() {
- return actionTypeLoader;
- }
-
public boolean isResetOnTrigger() {
return resetOnTrigger;
}
@@ -257,14 +265,14 @@ public class Trigger {
this.lastModifyTime = lastModifyTime;
}
- public void setTriggerId(int id) {
- this.triggerId = id;
- }
-
public int getTriggerId() {
return triggerId;
}
+ public void setTriggerId(int id) {
+ this.triggerId = id;
+ }
+
public boolean triggerConditionMet() {
return triggerCondition.isMet();
}
@@ -288,20 +296,20 @@ public class Trigger {
}
public Map<String, Object> toJson() {
- Map<String, Object> jsonObj = new HashMap<String, Object>();
+ Map<String, Object> jsonObj = new HashMap<>();
jsonObj.put("triggerCondition", triggerCondition.toJson());
jsonObj.put("expireCondition", expireCondition.toJson());
- List<Object> actionsJson = new ArrayList<Object>();
+ List<Object> actionsJson = new ArrayList<>();
for (TriggerAction action : actions) {
- Map<String, Object> oneActionJson = new HashMap<String, Object>();
+ Map<String, Object> oneActionJson = new HashMap<>();
oneActionJson.put("type", action.getType());
oneActionJson.put("actionJson", action.toJson());
actionsJson.add(oneActionJson);
}
jsonObj.put("actions", actionsJson);
- List<Object> expireActionsJson = new ArrayList<Object>();
+ List<Object> expireActionsJson = new ArrayList<>();
for (TriggerAction expireAction : expireActions) {
- Map<String, Object> oneExpireActionJson = new HashMap<String, Object>();
+ Map<String, Object> oneExpireActionJson = new HashMap<>();
oneExpireActionJson.put("type", expireAction.getType());
oneExpireActionJson.put("actionJson", expireAction.toJson());
expireActionsJson.add(oneExpireActionJson);
@@ -325,99 +333,6 @@ public class Trigger {
return source;
}
- @SuppressWarnings("unchecked")
- public static Trigger fromJson(Object obj) throws Exception {
-
- if (actionTypeLoader == null) {
- throw new Exception("Trigger Action Type loader not initialized.");
- }
-
- Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
-
- Trigger trigger = null;
- try {
- logger.info("Decoding for " + JSONUtils.toJSON(obj));
- Condition triggerCond =
- Condition.fromJson(jsonObj.get("triggerCondition"));
- Condition expireCond = Condition.fromJson(jsonObj.get("expireCondition"));
- List<TriggerAction> actions = new ArrayList<TriggerAction>();
- List<Object> actionsJson = (List<Object>) jsonObj.get("actions");
- for (Object actObj : actionsJson) {
- Map<String, Object> oneActionJson = (HashMap<String, Object>) actObj;
- String type = (String) oneActionJson.get("type");
- TriggerAction act =
- actionTypeLoader.createActionFromJson(type,
- oneActionJson.get("actionJson"));
- actions.add(act);
- }
- List<TriggerAction> expireActions = new ArrayList<TriggerAction>();
- List<Object> expireActionsJson =
- (List<Object>) jsonObj.get("expireActions");
- for (Object expireActObj : expireActionsJson) {
- Map<String, Object> oneExpireActionJson =
- (HashMap<String, Object>) expireActObj;
- String type = (String) oneExpireActionJson.get("type");
- TriggerAction expireAct =
- actionTypeLoader.createActionFromJson(type,
- oneExpireActionJson.get("actionJson"));
- expireActions.add(expireAct);
- }
- boolean resetOnTrigger =
- Boolean.valueOf((String) jsonObj.get("resetOnTrigger"));
- boolean resetOnExpire =
- Boolean.valueOf((String) jsonObj.get("resetOnExpire"));
- String submitUser = (String) jsonObj.get("submitUser");
- String source = (String) jsonObj.get("source");
- long submitTime = Long.valueOf((String) jsonObj.get("submitTime"));
- long lastModifyTime =
- Long.valueOf((String) jsonObj.get("lastModifyTime"));
- int triggerId = Integer.valueOf((String) jsonObj.get("triggerId"));
- TriggerStatus status =
- TriggerStatus.valueOf((String) jsonObj.get("status"));
- Map<String, Object> info = (Map<String, Object>) jsonObj.get("info");
- Map<String, Object> context =
- (Map<String, Object>) jsonObj.get("context");
- if (context == null) {
- context = new HashMap<String, Object>();
- }
- for (ConditionChecker checker : triggerCond.getCheckers().values()) {
- checker.setContext(context);
- }
- for (ConditionChecker checker : expireCond.getCheckers().values()) {
- checker.setContext(context);
- }
- for (TriggerAction action : actions) {
- action.setContext(context);
- }
- for (TriggerAction action : expireActions) {
- action.setContext(context);
- }
-
- trigger = new Trigger.TriggerBuilder("azkaban",
- source,
- triggerCond,
- expireCond,
- actions)
- .setId(triggerId)
- .setLastModifyTime(lastModifyTime)
- .setSubmitTime(submitTime)
- .setExpireActions(expireActions)
- .setInfo(info)
- .setContext(context)
- .build();
-
- trigger.setResetOnExpire(resetOnExpire);
- trigger.setResetOnTrigger(resetOnTrigger);
- trigger.setStatus(status);
- } catch (Exception e) {
- e.printStackTrace();
- logger.error("Failed to decode the trigger.", e);
- throw new Exception("Failed to decode the trigger.", e);
- }
-
- return trigger;
- }
-
public String getDescription() {
StringBuffer actionsString = new StringBuffer();
for (TriggerAction act : actions) {
@@ -443,4 +358,79 @@ public class Trigger {
return "Trigger Id: " + getTriggerId() + ", Description: " + getDescription();
}
+ public static class TriggerBuilder {
+ private final String submitUser;
+ private final String source;
+ private final TriggerStatus status = TriggerStatus.READY;
+ private final Condition triggerCondition;
+ private final List<TriggerAction> actions;
+ private final Condition expireCondition;
+ private int triggerId = -1;
+ private long lastModifyTime;
+ private long submitTime;
+ private List<TriggerAction> expireActions = new ArrayList<>();
+
+ private Map<String, Object> info = new HashMap<>();
+ private Map<String, Object> context = new HashMap<>();
+
+ public TriggerBuilder(String submitUser,
+ String source,
+ Condition triggerCondition,
+ Condition expireCondition,
+ List<TriggerAction> actions) {
+ this.submitUser = submitUser;
+ this.source = source;
+ this.triggerCondition = triggerCondition;
+ this.actions = actions;
+ this.expireCondition = expireCondition;
+ long now = DateTime.now().getMillis();
+ this.submitTime = now;
+ this.lastModifyTime = now;
+ }
+
+ public TriggerBuilder setId(int id) {
+ this.triggerId = id;
+ return this;
+ }
+
+ public TriggerBuilder setSubmitTime(long time) {
+ this.submitTime = time;
+ return this;
+ }
+
+ public TriggerBuilder setLastModifyTime(long time) {
+ this.lastModifyTime = time;
+ return this;
+ }
+
+ public TriggerBuilder setExpireActions(List<TriggerAction> actions) {
+ this.expireActions = actions;
+ return this;
+ }
+
+ public TriggerBuilder setInfo(Map<String, Object> info) {
+ this.info = info;
+ return this;
+ }
+
+ public TriggerBuilder setContext(Map<String, Object> context) {
+ this.context = context;
+ return this;
+ }
+
+ public Trigger build() {
+ return new Trigger(triggerId,
+ lastModifyTime,
+ submitTime,
+ submitUser,
+ source,
+ triggerCondition,
+ expireCondition,
+ actions,
+ expireActions,
+ info,
+ context);
+ }
+ }
+
}