azkaban-aplcache
Changes
az-core/src/main/java/azkaban/Constants.java 18(+13 -5)
Details
az-core/src/main/java/azkaban/Constants.java 18(+13 -5)
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 847d7d3..820b041 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -49,11 +49,6 @@ public class Constants {
// Flow 2.0 flow and job path delimiter
public static final String PATH_DELIMITER = ":";
- // Flow trigger props
- public static final String SCHEDULE_TYPE = "type";
- public static final String CRON_SCHEDULE_TYPE = "cron";
- public static final String SCHEDULE_VALUE = "value";
-
// Job properties override suffix
public static final String JOB_OVERRIDE_SUFFIX = ".jor";
@@ -265,4 +260,17 @@ public class Constants {
public static final String JOBCALLBACK_RESPONSE_WAIT_TIMEOUT = "jobcallback.response.wait.timeout";
public static final String JOBCALLBACK_THREAD_POOL_SIZE = "jobcallback.thread.pool.size";
}
+
+ public static class FlowTriggerProps {
+
+ // Flow trigger props
+ public static final String SCHEDULE_TYPE = "type";
+ public static final String CRON_SCHEDULE_TYPE = "cron";
+ public static final String SCHEDULE_VALUE = "value";
+ public static final String DEP_NAME = "name";
+
+ // Flow trigger dependency run time props
+ public static final String START_TIME = "startTime";
+ public static final String TRIGGER_INSTANCE_ID = "triggerInstanceId";
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
index faa29fb..c411987 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
@@ -20,6 +20,7 @@ package azkaban.project;
import static com.google.common.base.Preconditions.checkArgument;
import azkaban.Constants;
+import azkaban.Constants.FlowTriggerProps;
import com.google.common.base.Preconditions;
import com.google.common.io.Files;
import java.io.File;
@@ -96,12 +97,15 @@ public class NodeBeanLoader {
Preconditions.checkNotNull(scheduleMap, "flow trigger schedule must not be null");
Preconditions.checkArgument(
- scheduleMap.containsKey(Constants.SCHEDULE_TYPE) && scheduleMap.get(Constants.SCHEDULE_TYPE)
- .equals(Constants.CRON_SCHEDULE_TYPE), "flow trigger schedule type must be cron");
+ scheduleMap.containsKey(FlowTriggerProps.SCHEDULE_TYPE) && scheduleMap.get
+ (FlowTriggerProps.SCHEDULE_TYPE)
+ .equals(FlowTriggerProps.CRON_SCHEDULE_TYPE),
+ "flow trigger schedule type must be cron");
- Preconditions.checkArgument(scheduleMap.containsKey(Constants.SCHEDULE_VALUE) && CronExpression
- .isValidExpression(scheduleMap.get(Constants.SCHEDULE_VALUE)),
- "flow trigger schedule value must be a valid cron expression");
+ Preconditions
+ .checkArgument(scheduleMap.containsKey(FlowTriggerProps.SCHEDULE_VALUE) && CronExpression
+ .isValidExpression(scheduleMap.get(FlowTriggerProps.SCHEDULE_VALUE)),
+ "flow trigger schedule value must be a valid cron expression");
Preconditions.checkArgument(scheduleMap.size() == 2, "flow trigger schedule must "
+ "contain type and value only");
@@ -177,7 +181,7 @@ public class NodeBeanLoader {
flowTriggerBean.setMaxWaitMins(Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME.toMinutes());
}
return new FlowTrigger(
- new CronSchedule(flowTriggerBean.getSchedule().get(Constants.SCHEDULE_VALUE)),
+ new CronSchedule(flowTriggerBean.getSchedule().get(FlowTriggerProps.SCHEDULE_VALUE)),
flowTriggerBean.getTriggerDependencies().stream()
.map(d -> new FlowTriggerDependency(d.getName(), d.getType(), d.getParams()))
.collect(Collectors.toList()),
diff --git a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
index e660318..6484d11 100644
--- a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
@@ -21,6 +21,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import azkaban.Constants;
+import azkaban.Constants.FlowTriggerProps;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
import com.google.common.collect.ImmutableMap;
@@ -125,8 +126,9 @@ public class NodeBeanLoaderTest {
final NodeBeanLoader loader = new NodeBeanLoader();
final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
TRIGGER_FLOW_YML_TEST_DIR, TRIGGER_FLOW_YML_FILE));
- final Map<String, String> schedule = ImmutableMap.of(Constants.SCHEDULE_TYPE, Constants
- .CRON_SCHEDULE_TYPE, Constants.SCHEDULE_VALUE, CRON_EXPRESSION);
+ final Map<String, String> schedule = ImmutableMap
+ .of(FlowTriggerProps.SCHEDULE_TYPE, FlowTriggerProps
+ .CRON_SCHEDULE_TYPE, FlowTriggerProps.SCHEDULE_VALUE, CRON_EXPRESSION);
validateFlowTriggerBean(nodeBean.getTrigger(), MAX_WAIT_MINS, schedule, 2);
final List<TriggerDependencyBean> triggerDependencyBeans = nodeBean.getTrigger()
.getTriggerDependencies();
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
index 3eefcfc..f7f17f6 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
@@ -17,6 +17,7 @@
package azkaban.flowtrigger;
import azkaban.Constants;
+import azkaban.Constants.FlowTriggerProps;
import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginException;
import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginManager;
@@ -28,7 +29,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
@@ -70,7 +73,6 @@ public class FlowTriggerService {
private static final Duration CANCELLING_GRACE_PERIOD_AFTER_RESTART = Duration.ofMinutes(1);
private static final int RECENTLY_FINISHED_TRIGGER_LIMIT = 20;
- private static final String START_TIME = "starttime";
private static final Logger logger = LoggerFactory.getLogger(FlowTriggerService.class);
private final ExecutorService executorService;
private final List<TriggerInstance> runningTriggers;
@@ -102,13 +104,20 @@ public class FlowTriggerService {
}
private DependencyInstanceContext createDepContext(final FlowTriggerDependency dep, final long
- starttimeInMills) throws Exception {
+ startTimeInMills, final String triggerInstId) throws Exception {
final DependencyCheck dependencyCheck = this.triggerPluginManager
.getDependencyCheck(dep.getType());
final DependencyInstanceCallback callback = new DependencyInstanceCallbackImpl(this);
- final DependencyInstanceConfigImpl config = new DependencyInstanceConfigImpl(dep.getProps());
+
+ final Map<String, String> depInstConfig = new HashMap<>();
+ depInstConfig.putAll(dep.getProps());
+ depInstConfig.put(FlowTriggerProps.DEP_NAME, dep.getName());
+
+ final DependencyInstanceConfigImpl config = new DependencyInstanceConfigImpl(depInstConfig);
final DependencyInstanceRuntimeProps runtimeProps = new DependencyInstanceRuntimePropsImpl
- (ImmutableMap.of(START_TIME, String.valueOf(starttimeInMills)));
+ (ImmutableMap
+ .of(FlowTriggerProps.START_TIME, String.valueOf(startTimeInMills), FlowTriggerProps
+ .TRIGGER_INSTANCE_ID, triggerInstId));
return dependencyCheck.run(config, runtimeProps, callback);
}
@@ -122,7 +131,7 @@ public class FlowTriggerService {
final String depName = dep.getName();
DependencyInstanceContext context = null;
try {
- context = createDepContext(dep, startTime);
+ context = createDepContext(dep, startTime, triggerInstId);
} catch (final Exception ex) {
logger.error(String.format("unable to create dependency context for trigger instance[id ="
+ " %s]", triggerInstId), ex);
@@ -190,7 +199,8 @@ public class FlowTriggerService {
DependencyInstanceContext context = null;
try {
//recreate dependency instance context
- context = createDepContext(dependency, depInst.getStartTime());
+ context = createDepContext(dependency, depInst.getStartTime(), depInst
+ .getTriggerInstance().getId());
} catch (final Exception ex) {
logger
.error(