azkaban-aplcache

pass dependency name and trigger instance id to dependency

3/12/2018 6:45:25 PM

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 847d7d3..820b041 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -49,11 +49,6 @@ public class Constants {
   // Flow 2.0 flow and job path delimiter
   public static final String PATH_DELIMITER = ":";
 
-  // Flow trigger props
-  public static final String SCHEDULE_TYPE = "type";
-  public static final String CRON_SCHEDULE_TYPE = "cron";
-  public static final String SCHEDULE_VALUE = "value";
-
   // Job properties override suffix
   public static final String JOB_OVERRIDE_SUFFIX = ".jor";
 
@@ -265,4 +260,17 @@ public class Constants {
     public static final String JOBCALLBACK_RESPONSE_WAIT_TIMEOUT = "jobcallback.response.wait.timeout";
     public static final String JOBCALLBACK_THREAD_POOL_SIZE = "jobcallback.thread.pool.size";
   }
+
+  public static class FlowTriggerProps {
+
+    // Flow trigger props
+    public static final String SCHEDULE_TYPE = "type";
+    public static final String CRON_SCHEDULE_TYPE = "cron";
+    public static final String SCHEDULE_VALUE = "value";
+    public static final String DEP_NAME = "name";
+
+    // Flow trigger dependency run time props
+    public static final String START_TIME = "startTime";
+    public static final String TRIGGER_INSTANCE_ID = "triggerInstanceId";
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
index faa29fb..c411987 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
@@ -20,6 +20,7 @@ package azkaban.project;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import azkaban.Constants;
+import azkaban.Constants.FlowTriggerProps;
 import com.google.common.base.Preconditions;
 import com.google.common.io.Files;
 import java.io.File;
@@ -96,12 +97,15 @@ public class NodeBeanLoader {
     Preconditions.checkNotNull(scheduleMap, "flow trigger schedule must not be null");
 
     Preconditions.checkArgument(
-        scheduleMap.containsKey(Constants.SCHEDULE_TYPE) && scheduleMap.get(Constants.SCHEDULE_TYPE)
-            .equals(Constants.CRON_SCHEDULE_TYPE), "flow trigger schedule type must be cron");
+        scheduleMap.containsKey(FlowTriggerProps.SCHEDULE_TYPE) && scheduleMap.get
+            (FlowTriggerProps.SCHEDULE_TYPE)
+            .equals(FlowTriggerProps.CRON_SCHEDULE_TYPE),
+        "flow trigger schedule type must be cron");
 
-    Preconditions.checkArgument(scheduleMap.containsKey(Constants.SCHEDULE_VALUE) && CronExpression
-            .isValidExpression(scheduleMap.get(Constants.SCHEDULE_VALUE)),
-        "flow trigger schedule value must be a valid cron expression");
+    Preconditions
+        .checkArgument(scheduleMap.containsKey(FlowTriggerProps.SCHEDULE_VALUE) && CronExpression
+                .isValidExpression(scheduleMap.get(FlowTriggerProps.SCHEDULE_VALUE)),
+            "flow trigger schedule value must be a valid cron expression");
 
     Preconditions.checkArgument(scheduleMap.size() == 2, "flow trigger schedule must "
         + "contain type and value only");
@@ -177,7 +181,7 @@ public class NodeBeanLoader {
         flowTriggerBean.setMaxWaitMins(Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME.toMinutes());
       }
       return new FlowTrigger(
-          new CronSchedule(flowTriggerBean.getSchedule().get(Constants.SCHEDULE_VALUE)),
+          new CronSchedule(flowTriggerBean.getSchedule().get(FlowTriggerProps.SCHEDULE_VALUE)),
           flowTriggerBean.getTriggerDependencies().stream()
               .map(d -> new FlowTriggerDependency(d.getName(), d.getType(), d.getParams()))
               .collect(Collectors.toList()),
diff --git a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
index e660318..6484d11 100644
--- a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
@@ -21,6 +21,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import azkaban.Constants;
+import azkaban.Constants.FlowTriggerProps;
 import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Props;
 import com.google.common.collect.ImmutableMap;
@@ -125,8 +126,9 @@ public class NodeBeanLoaderTest {
     final NodeBeanLoader loader = new NodeBeanLoader();
     final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
         TRIGGER_FLOW_YML_TEST_DIR, TRIGGER_FLOW_YML_FILE));
-    final Map<String, String> schedule = ImmutableMap.of(Constants.SCHEDULE_TYPE, Constants
-        .CRON_SCHEDULE_TYPE, Constants.SCHEDULE_VALUE, CRON_EXPRESSION);
+    final Map<String, String> schedule = ImmutableMap
+        .of(FlowTriggerProps.SCHEDULE_TYPE, FlowTriggerProps
+            .CRON_SCHEDULE_TYPE, FlowTriggerProps.SCHEDULE_VALUE, CRON_EXPRESSION);
     validateFlowTriggerBean(nodeBean.getTrigger(), MAX_WAIT_MINS, schedule, 2);
     final List<TriggerDependencyBean> triggerDependencyBeans = nodeBean.getTrigger()
         .getTriggerDependencies();
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
index 3eefcfc..f7f17f6 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
@@ -17,6 +17,7 @@
 package azkaban.flowtrigger;
 
 import azkaban.Constants;
+import azkaban.Constants.FlowTriggerProps;
 import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
 import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginException;
 import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginManager;
@@ -28,7 +29,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
@@ -70,7 +73,6 @@ public class FlowTriggerService {
 
   private static final Duration CANCELLING_GRACE_PERIOD_AFTER_RESTART = Duration.ofMinutes(1);
   private static final int RECENTLY_FINISHED_TRIGGER_LIMIT = 20;
-  private static final String START_TIME = "starttime";
   private static final Logger logger = LoggerFactory.getLogger(FlowTriggerService.class);
   private final ExecutorService executorService;
   private final List<TriggerInstance> runningTriggers;
@@ -102,13 +104,20 @@ public class FlowTriggerService {
   }
 
   private DependencyInstanceContext createDepContext(final FlowTriggerDependency dep, final long
-      starttimeInMills) throws Exception {
+      startTimeInMills, final String triggerInstId) throws Exception {
     final DependencyCheck dependencyCheck = this.triggerPluginManager
         .getDependencyCheck(dep.getType());
     final DependencyInstanceCallback callback = new DependencyInstanceCallbackImpl(this);
-    final DependencyInstanceConfigImpl config = new DependencyInstanceConfigImpl(dep.getProps());
+
+    final Map<String, String> depInstConfig = new HashMap<>();
+    depInstConfig.putAll(dep.getProps());
+    depInstConfig.put(FlowTriggerProps.DEP_NAME, dep.getName());
+
+    final DependencyInstanceConfigImpl config = new DependencyInstanceConfigImpl(depInstConfig);
     final DependencyInstanceRuntimeProps runtimeProps = new DependencyInstanceRuntimePropsImpl
-        (ImmutableMap.of(START_TIME, String.valueOf(starttimeInMills)));
+        (ImmutableMap
+            .of(FlowTriggerProps.START_TIME, String.valueOf(startTimeInMills), FlowTriggerProps
+                .TRIGGER_INSTANCE_ID, triggerInstId));
     return dependencyCheck.run(config, runtimeProps, callback);
   }
 
@@ -122,7 +131,7 @@ public class FlowTriggerService {
       final String depName = dep.getName();
       DependencyInstanceContext context = null;
       try {
-        context = createDepContext(dep, startTime);
+        context = createDepContext(dep, startTime, triggerInstId);
       } catch (final Exception ex) {
         logger.error(String.format("unable to create dependency context for trigger instance[id ="
             + " %s]", triggerInstId), ex);
@@ -190,7 +199,8 @@ public class FlowTriggerService {
         DependencyInstanceContext context = null;
         try {
           //recreate dependency instance context
-          context = createDepContext(dependency, depInst.getStartTime());
+          context = createDepContext(dependency, depInst.getStartTime(), depInst
+              .getTriggerInstance().getId());
         } catch (final Exception ex) {
           logger
               .error(