azkaban-aplcache

Inject azkaban configurations into hadoop application tags

8/9/2018 1:36:42 PM
3.51.1

Details

diff --git a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopHiveJob.java b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopHiveJob.java
index 7df4569..a729c84 100644
--- a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopHiveJob.java
+++ b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopHiveJob.java
@@ -81,6 +81,11 @@ public class HadoopHiveJob extends JavaProcessJob {
 
   @Override
   public void run() throws Exception {
+    String[] tagKeys = new String[] { CommonJobProperties.EXEC_ID,
+        CommonJobProperties.FLOW_ID, CommonJobProperties.PROJECT_NAME };
+    getJobProps().put(HadoopConfigurationInjector.INJECT_PREFIX
+        + HadoopJobUtils.MAPREDUCE_JOB_TAGS,
+        HadoopJobUtils.constructHadoopTags(getJobProps(), tagKeys));
     HadoopConfigurationInjector.prepareResourcesToInject(getJobProps(),
         getWorkingDirectory());
 
diff --git a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJavaJob.java b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJavaJob.java
index 4d103ad..ed902ba 100644
--- a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJavaJob.java
+++ b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJavaJob.java
@@ -178,6 +178,11 @@ public class HadoopJavaJob extends JavaProcessJob {
 
   @Override
   public void run() throws Exception {
+    String[] tagKeys = new String[] { CommonJobProperties.EXEC_ID,
+        CommonJobProperties.FLOW_ID, CommonJobProperties.PROJECT_NAME };
+    getJobProps().put(HadoopConfigurationInjector.INJECT_PREFIX
+        + HadoopJobUtils.MAPREDUCE_JOB_TAGS,
+        HadoopJobUtils.constructHadoopTags(getJobProps(), tagKeys));
     HadoopConfigurationInjector.prepareResourcesToInject(getJobProps(),
         getWorkingDirectory());
 
diff --git a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJobUtils.java b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJobUtils.java
index d1fbfe5..08b504a 100644
--- a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJobUtils.java
+++ b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJobUtils.java
@@ -16,6 +16,8 @@
 
 package azkaban.jobtype;
 
+import com.google.common.base.Joiner;
+
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
@@ -91,6 +93,9 @@ public class HadoopJobUtils {
   // MapReduce config for specifying additional namenodes for delegation tokens
   public static final String MAPREDUCE_JOB_OTHER_NAMENODES = "mapreduce.job.hdfs-servers";
 
+  // MapReduce config for mapreduce job tags
+  public static final String MAPREDUCE_JOB_TAGS = "mapreduce.job.tags";
+
   // Azkaban property for listing additional namenodes for delegation tokens
   private static final String OTHER_NAMENODES_PROPERTY = "other_namenodes";
 
@@ -593,4 +598,21 @@ public class HadoopJobUtils {
     }
     return String.format("-D%s=%s", key, value);
   }
+
+  /**
+   * Construct a CSV of tags for the Hadoop application.
+   *
+   * @param List of keys to construct tags from.
+   * @return a CSV of tags
+   */
+  public static String constructHadoopTags(Props props, String[] keys) {
+    String[] keysAndValues = new String[keys.length];
+    for (int i = 0; i < keys.length; i++) {
+      if (props.containsKey(keys[i])) {
+        keysAndValues[i] = keys[i] + ":" + props.get(keys[i]);
+      }
+    }
+    Joiner joiner = Joiner.on(',').skipNulls();
+    return joiner.join(keysAndValues);
+  }
 }
diff --git a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopPigJob.java b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopPigJob.java
index 0d70425..b799405 100644
--- a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopPigJob.java
+++ b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopPigJob.java
@@ -102,6 +102,11 @@ public class HadoopPigJob extends JavaProcessJob {
 
   @Override
   public void run() throws Exception {
+    String[] tagKeys = new String[] { CommonJobProperties.EXEC_ID,
+        CommonJobProperties.FLOW_ID, CommonJobProperties.PROJECT_NAME };
+    getJobProps().put(HadoopConfigurationInjector.INJECT_PREFIX
+        + HadoopJobUtils.MAPREDUCE_JOB_TAGS,
+        HadoopJobUtils.constructHadoopTags(getJobProps(), tagKeys));
     HadoopConfigurationInjector.prepareResourcesToInject(getJobProps(),
         getWorkingDirectory());
 
diff --git a/az-hadoop-jobtype-plugin/src/test/java/azkaban/jobtype/TestHadoopJobUtilsConstructHadoopTags.java b/az-hadoop-jobtype-plugin/src/test/java/azkaban/jobtype/TestHadoopJobUtilsConstructHadoopTags.java
new file mode 100644
index 0000000..78fe6db
--- /dev/null
+++ b/az-hadoop-jobtype-plugin/src/test/java/azkaban/jobtype/TestHadoopJobUtilsConstructHadoopTags.java
@@ -0,0 +1,50 @@
+package azkaban.jobtype;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import azkaban.utils.Props;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test class for constructHadoopTags method in HadoopJobUtils
+ */
+public class TestHadoopJobUtilsConstructHadoopTags {
+
+  private Props props;
+
+  @Before
+  public void beforeMethod() {
+    props = new Props();
+  }
+
+  @Test
+  public void testNoTags() {
+    String[] tags = new String[0];
+    assertThat(HadoopJobUtils.constructHadoopTags(props, tags)).isEqualTo("");
+  }
+
+  @Test
+  public void testWithTags() {
+    String tag0 = "tag0";
+    String tag1 = "tag1";
+    props.put(tag0, "val0");
+    props.put(tag1, "val1");
+    String[] tags = new String[] { tag0, tag1 };
+    assertThat(HadoopJobUtils.constructHadoopTags(props, tags))
+        .isEqualTo("tag0:val0,tag1:val1");
+  }
+
+  @Test
+  public void testWithNonExistentTags() {
+    String tag0 = "tag0";
+    String tag1 = "tag1";
+    String tag2 = "tag2";
+    props.put(tag0, "val0");
+    props.put(tag2, "val2");
+    String[] tags = new String[] { tag0, tag1, tag2 };
+    assertThat(HadoopJobUtils.constructHadoopTags(props, tags))
+        .isEqualTo("tag0:val0,tag2:val2");
+  }
+}