diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index b7d091b..3515f76 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -17,6 +17,7 @@
package azkaban.execapp;
import static azkaban.Constants.ConfigurationKeys.AZKABAN_SERVER_HOST_NAME;
+import static azkaban.Constants.ConfigurationKeys.AZKABAN_EVENT_REPORTING_PROPERTIES_TO_PROPAGATE;
import static azkaban.execapp.ConditionalWorkflowUtils.FAILED;
import static azkaban.execapp.ConditionalWorkflowUtils.PENDING;
import static azkaban.execapp.ConditionalWorkflowUtils.checkConditionOnJobStatus;
@@ -61,6 +62,8 @@ import azkaban.spi.EventType;
import azkaban.utils.Props;
import azkaban.utils.SwapQueue;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.Files;
@@ -99,6 +102,8 @@ import org.apache.log4j.PatternLayout;
*/
public class FlowRunner extends EventHandler implements Runnable {
+ private static Splitter SPLIT_ON_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
+
private static final Layout DEFAULT_LAYOUT = new PatternLayout(
"%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
// We check update every 5 minutes, just in case things get stuck. But for the
@@ -1366,6 +1371,26 @@ public class FlowRunner extends EventHandler implements Runnable {
metaData.put("executionId", String.valueOf(flow.getExecutionId()));
metaData.put("startTime", String.valueOf(flow.getStartTime()));
metaData.put("submitTime", String.valueOf(flow.getSubmitTime()));
+
+ // Propagate flow properties to Event Reporter
+ if (FlowLoaderUtils.isAzkabanFlowVersion20(flow.getAzkabanFlowVersion())) {
+ // In Flow 2.0, flow has designated properties (defined at its own level in Yaml)
+ propagateMetadataFromProps(metaData, flow.getInputProps(), "flow", flow.getId(), logger);
+ } else {
+ // In Flow 1.0, flow properties are combination of shared properties in individual files (order not defined,
+ // .. because it's loaded by fs list order and put in a HashMap).
+ Props combinedProps = new Props();
+ for (Props sharedProp : flowRunner.sharedProps.values()) {
+ // sharedProp.getFlattened() gets its parent's props too, so we don't have to recurse
+ combinedProps.putAll(sharedProp.getFlattened());
+ }
+
+ // In Flow 1.0, flow's inputProps contains overrides, so apply that as override to combined shared props
+ combinedProps = new Props(combinedProps, flow.getInputProps());
+
+ propagateMetadataFromProps(metaData, combinedProps, "flow", flow.getId(), logger);
+ }
+
return metaData;
}
@@ -1405,6 +1430,10 @@ public class FlowRunner extends EventHandler implements Runnable {
metaData.put("azkabanHost", props.getString(AZKABAN_SERVER_HOST_NAME, "unknown"));
metaData.put("jobProxyUser",
jobRunner.getProps().getString(JobProperties.USER_TO_PROXY, null));
+
+ // Propagate job properties to Event Reporter
+ propagateMetadataFromProps(metaData, node.getInputProps(), "job", node.getId(), logger);
+
return metaData;
}
@@ -1457,4 +1486,48 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
}
+
+ /***
+ * Propagate properties (specified in {@code AZKABAN_EVENT_REPORTING_PROPERTIES_TO_PROPAGATE})
+ * to metadata for event reporting.
+ * @param metaData Metadata map to update with properties.
+ * @param inputProps Input properties for flow or job.
+ * @param nodeType Flow or job.
+ * @param nodeName Flow or job name.
+ * @param logger Logger from invoking class for log sanity.
+ */
+ @VisibleForTesting
+ static void propagateMetadataFromProps(Map<String, String> metaData, Props inputProps, String nodeType,
+ String nodeName, Logger logger) {
+
+ // Backward compatibility: Unless user specifies, this will be absent from flows and jobs
+ // .. if so, do a no-op like before
+ if (!inputProps.containsKey(AZKABAN_EVENT_REPORTING_PROPERTIES_TO_PROPAGATE)) {
+ return;
+ }
+
+ if (null == metaData || null == inputProps || null == logger ||
+ Strings.isNullOrEmpty(nodeType) || Strings.isNullOrEmpty(nodeName)) {
+ throw new IllegalArgumentException("Input params should not be null or empty.");
+ }
+
+ final String propsToPropagate = inputProps.getString(AZKABAN_EVENT_REPORTING_PROPERTIES_TO_PROPAGATE);
+ if (Strings.isNullOrEmpty(propsToPropagate)) {
+ // Nothing to propagate
+ logger.info(String.format("No properties to propagate to metadata for %s: %s", nodeType, nodeName));
+ return;
+ } else {
+ logger.info(String.format("Propagating: %s to metadata for %s: %s", propsToPropagate, nodeType, nodeName));
+ }
+
+ final List<String> propsToPropagateList = SPLIT_ON_COMMA.splitToList(propsToPropagate);
+ for (String propKey : propsToPropagateList) {
+ if (!inputProps.containsKey(propKey)) {
+ logger.warn(String.format("%s does not contains: %s property; "
+ + "skipping propagation to metadata", nodeName, propKey));
+ continue;
+ }
+ metaData.put(propKey, inputProps.getString(propKey));
+ }
+ }
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
index e534171..cf0df24 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
@@ -16,6 +16,7 @@
package azkaban.execapp;
+import azkaban.Constants;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions;
@@ -23,6 +24,10 @@ import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.executor.InteractiveTestJob;
import azkaban.executor.Status;
import azkaban.spi.EventType;
+import azkaban.utils.Props;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -246,6 +251,39 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
waitForAndAssertFlowStatus(Status.FAILED);
}
+ @Test
+ public void addMetadataFromProperties() throws Exception {
+ Map<String, String> metadataMap = new HashMap<>();
+ Props inputProps = new Props();
+ inputProps.put(Constants.ConfigurationKeys.AZKABAN_EVENT_REPORTING_PROPERTIES_TO_PROPAGATE, "my.prop1,my.prop2");
+ inputProps.put("my.prop1", "value1");
+ inputProps.put("my.prop2", "value2");
+
+ // Test happy path
+ FlowRunner.propagateMetadataFromProps(metadataMap, inputProps, "flow", "dummyFlow",
+ Logger.getLogger(FlowRunnerTest.class));
+
+ Assert.assertEquals("Metadata not propagated correctly.", metadataMap.size(), 2);
+ Assert.assertEquals("Metadata not propagated correctly.", "value1", metadataMap.get("my.prop1"));
+ Assert.assertEquals("Metadata not propagated correctly.", "value2", metadataMap.get("my.prop2"));
+
+ // Test backward compatibility: pass no value for AZKABAN_EVENT_REPORTING_PROPERTIES_TO_PROPAGATE and expect
+ // .. nothing
+ metadataMap = new HashMap<>();
+ FlowRunner.propagateMetadataFromProps(metadataMap, new Props(), "flow", "dummyFlow",
+ Logger.getLogger(FlowRunnerTest.class));
+ Assert.assertEquals("Metadata propagation backward compatibility has issues.", metadataMap.size(), 0);
+
+ // Test negative path
+ try {
+ FlowRunner.propagateMetadataFromProps(null, inputProps, "flow", "dummyFlow",
+ Logger.getLogger(FlowRunnerTest.class));
+ Assert.fail("Metadata propagation did not fail with bad data.");
+ } catch (Exception e) {
+ // Ignore exception, since its expected.
+ }
+ }
+
private void assertAttempts(final String name, final int attempt) {
final ExecutableNode node = this.runner.getExecutableFlow().getExecutableNode(name);
if (node.getAttempt() != attempt) {