azkaban-developers

Propagate select properties from flow or job to event metadata

3/22/2019 6:31:24 PM

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 850acc4..45bf29c 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -184,6 +184,8 @@ public class Constants {
     public static final String AZKABAN_EVENT_REPORTING_CLASS_PARAM =
         "azkaban.event.reporting.class";
     public static final String AZKABAN_EVENT_REPORTING_ENABLED = "azkaban.event.reporting.enabled";
+    // Comma separated list of properties to propagate from a flow or job to Event reporter metadata
+    public static final String AZKABAN_EVENT_REPORTING_PROPERTIES_TO_PROPAGATE = "azkaban.event.reporting.propagateProperties";
     public static final String AZKABAN_EVENT_REPORTING_KAFKA_BROKERS =
         "azkaban.event.reporting.kafka.brokers";
     public static final String AZKABAN_EVENT_REPORTING_KAFKA_TOPIC =
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index b7d091b..3515f76 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -17,6 +17,7 @@
 package azkaban.execapp;
 
 import static azkaban.Constants.ConfigurationKeys.AZKABAN_SERVER_HOST_NAME;
+import static azkaban.Constants.ConfigurationKeys.AZKABAN_EVENT_REPORTING_PROPERTIES_TO_PROPAGATE;
 import static azkaban.execapp.ConditionalWorkflowUtils.FAILED;
 import static azkaban.execapp.ConditionalWorkflowUtils.PENDING;
 import static azkaban.execapp.ConditionalWorkflowUtils.checkConditionOnJobStatus;
@@ -61,6 +62,8 @@ import azkaban.spi.EventType;
 import azkaban.utils.Props;
 import azkaban.utils.SwapQueue;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.io.Files;
@@ -99,6 +102,8 @@ import org.apache.log4j.PatternLayout;
  */
 public class FlowRunner extends EventHandler implements Runnable {
 
+  private static final Splitter SPLIT_ON_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
+
   private static final Layout DEFAULT_LAYOUT = new PatternLayout(
       "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
   // We check update every 5 minutes, just in case things get stuck. But for the
@@ -1366,6 +1371,26 @@ public class FlowRunner extends EventHandler implements Runnable {
       metaData.put("executionId", String.valueOf(flow.getExecutionId()));
       metaData.put("startTime", String.valueOf(flow.getStartTime()));
       metaData.put("submitTime", String.valueOf(flow.getSubmitTime()));
+
+      // Propagate flow properties to Event Reporter
+      if (FlowLoaderUtils.isAzkabanFlowVersion20(flow.getAzkabanFlowVersion())) {
+        // In Flow 2.0, flow has designated properties (defined at its own level in Yaml)
+        propagateMetadataFromProps(metaData, flow.getInputProps(), "flow", flow.getId(), logger);
+      } else {
+        // In Flow 1.0, flow properties are combination of shared properties in individual files (order not defined,
+        // .. because it's loaded by fs list order and put in a HashMap).
+        Props combinedProps = new Props();
+        for (Props sharedProp : flowRunner.sharedProps.values()) {
+          // sharedProp.getFlattened() gets its parent's props too, so we don't have to recurse
+          combinedProps.putAll(sharedProp.getFlattened());
+        }
+
+        // In Flow 1.0, flow's inputProps contains overrides, so apply that as override to combined shared props
+        combinedProps = new Props(combinedProps, flow.getInputProps());
+
+        propagateMetadataFromProps(metaData, combinedProps, "flow", flow.getId(), logger);
+      }
+
       return metaData;
     }
 
@@ -1405,6 +1430,10 @@ public class FlowRunner extends EventHandler implements Runnable {
       metaData.put("azkabanHost", props.getString(AZKABAN_SERVER_HOST_NAME, "unknown"));
       metaData.put("jobProxyUser",
           jobRunner.getProps().getString(JobProperties.USER_TO_PROXY, null));
+
+      // Propagate job properties to Event Reporter
+      propagateMetadataFromProps(metaData, node.getInputProps(), "job", node.getId(), logger);
+
       return metaData;
     }
 
@@ -1457,4 +1486,48 @@ public class FlowRunner extends EventHandler implements Runnable {
       }
     }
   }
+
+  /**
+   * Copies the properties named in {@code AZKABAN_EVENT_REPORTING_PROPERTIES_TO_PROPAGATE}
+   * (a comma separated list) from {@code inputProps} into the metadata for event reporting.
+   * Nothing is copied when that list is absent or empty; listed keys that are not set are skipped.
+   * @param metaData Metadata map to update with properties.
+   * @param inputProps Input properties for flow or job.
+   * @param nodeType Flow or job.
+   * @param nodeName Flow or job name.
+   * @param logger Logger from invoking class for log sanity.
+   * @throws IllegalArgumentException if any argument is null, or nodeType/nodeName is empty.
+   */
+  @VisibleForTesting
+  static void propagateMetadataFromProps(Map<String, String> metaData, Props inputProps, String nodeType,
+      String nodeName, Logger logger) {
+    // Validate before touching inputProps, so that a null argument is reported as an
+    // IllegalArgumentException instead of surfacing as a NullPointerException below.
+    if (null == metaData || null == inputProps || null == logger ||
+        Strings.isNullOrEmpty(nodeType) || Strings.isNullOrEmpty(nodeName)) {
+      throw new IllegalArgumentException("Input params should not be null or empty.");
+    }
+
+    // Backward compatibility: Unless user specifies, this will be absent from flows and jobs
+    // .. if so, do a no-op like before
+    if (!inputProps.containsKey(AZKABAN_EVENT_REPORTING_PROPERTIES_TO_PROPAGATE)) {
+      return;
+    }
+
+    final String propsToPropagate = inputProps.getString(AZKABAN_EVENT_REPORTING_PROPERTIES_TO_PROPAGATE);
+    if (Strings.isNullOrEmpty(propsToPropagate)) {
+      logger.info(String.format("No properties to propagate to metadata for %s: %s", nodeType, nodeName));
+      return;
+    }
+    logger.info(String.format("Propagating: %s to metadata for %s: %s", propsToPropagate, nodeType, nodeName));
+
+    for (final String propKey : SPLIT_ON_COMMA.splitToList(propsToPropagate)) {
+      if (inputProps.containsKey(propKey)) {
+        metaData.put(propKey, inputProps.getString(propKey));
+      } else {
+        logger.warn(String.format("%s does not contains: %s property; "
+            + "skipping propagation to metadata", nodeName, propKey));
+      }
+    }
+  }
 }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
index e534171..cf0df24 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
@@ -16,6 +16,7 @@
 
 package azkaban.execapp;
 
+import azkaban.Constants;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutionOptions;
@@ -23,6 +24,10 @@ import azkaban.executor.ExecutionOptions.FailureAction;
 import azkaban.executor.InteractiveTestJob;
 import azkaban.executor.Status;
 import azkaban.spi.EventType;
+import azkaban.utils.Props;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.log4j.Logger;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -246,6 +251,39 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     waitForAndAssertFlowStatus(Status.FAILED);
   }
 
+  // Verifies propagateMetadataFromProps: listed properties are copied, nothing is copied when
+  // the list is not configured, and a null metadata map is rejected.
+  @Test
+  public void addMetadataFromProperties() throws Exception {
+    final Logger testLogger = Logger.getLogger(FlowRunnerTest.class);
+    Map<String, String> metadataMap = new HashMap<>();
+    Props inputProps = new Props();
+    inputProps.put(Constants.ConfigurationKeys.AZKABAN_EVENT_REPORTING_PROPERTIES_TO_PROPAGATE, "my.prop1,my.prop2");
+    inputProps.put("my.prop1", "value1");
+    inputProps.put("my.prop2", "value2");
+
+    // Test happy path
+    FlowRunner.propagateMetadataFromProps(metadataMap, inputProps, "flow", "dummyFlow", testLogger);
+
+    Assert.assertEquals("Metadata not propagated correctly.", 2, metadataMap.size());
+    Assert.assertEquals("Metadata not propagated correctly.", "value1", metadataMap.get("my.prop1"));
+    Assert.assertEquals("Metadata not propagated correctly.", "value2", metadataMap.get("my.prop2"));
+
+    // Test backward compatibility: pass no value for AZKABAN_EVENT_REPORTING_PROPERTIES_TO_PROPAGATE and expect
+    // .. nothing
+    metadataMap = new HashMap<>();
+    FlowRunner.propagateMetadataFromProps(metadataMap, new Props(), "flow", "dummyFlow", testLogger);
+    Assert.assertEquals("Metadata propagation backward compatibility has issues.", 0, metadataMap.size());
+
+    // Test negative path
+    try {
+      FlowRunner.propagateMetadataFromProps(null, inputProps, "flow", "dummyFlow", testLogger);
+      Assert.fail("Metadata propagation did not fail with bad data.");
+    } catch (IllegalArgumentException expected) {
+      // Expected: a null metadata map must be rejected as an illegal argument.
+    }
+  }
+
   private void assertAttempts(final String name, final int attempt) {
     final ExecutableNode node = this.runner.getExecutableFlow().getExecutableNode(name);
     if (node.getAttempt() != attempt) {