Patch: inject mapreduce.job.tags (exec id, flow id, project name) into Hadoop Hive/Java/Pig job types.
diff --git a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopHiveJob.java b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopHiveJob.java
index 7df4569..a729c84 100644
--- a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopHiveJob.java
+++ b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopHiveJob.java
@@ -81,6 +81,11 @@ public class HadoopHiveJob extends JavaProcessJob {
@Override
public void run() throws Exception {
+ String[] tagKeys = new String[] { CommonJobProperties.EXEC_ID,
+ CommonJobProperties.FLOW_ID, CommonJobProperties.PROJECT_NAME };
+ getJobProps().put(HadoopConfigurationInjector.INJECT_PREFIX
+ + HadoopJobUtils.MAPREDUCE_JOB_TAGS,
+ HadoopJobUtils.constructHadoopTags(getJobProps(), tagKeys));
HadoopConfigurationInjector.prepareResourcesToInject(getJobProps(),
getWorkingDirectory());
diff --git a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJavaJob.java b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJavaJob.java
index 4d103ad..ed902ba 100644
--- a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJavaJob.java
+++ b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJavaJob.java
@@ -178,6 +178,11 @@ public class HadoopJavaJob extends JavaProcessJob {
@Override
public void run() throws Exception {
+ String[] tagKeys = new String[] { CommonJobProperties.EXEC_ID,
+ CommonJobProperties.FLOW_ID, CommonJobProperties.PROJECT_NAME };
+ getJobProps().put(HadoopConfigurationInjector.INJECT_PREFIX
+ + HadoopJobUtils.MAPREDUCE_JOB_TAGS,
+ HadoopJobUtils.constructHadoopTags(getJobProps(), tagKeys));
HadoopConfigurationInjector.prepareResourcesToInject(getJobProps(),
getWorkingDirectory());
diff --git a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJobUtils.java b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJobUtils.java
index d1fbfe5..08b504a 100644
--- a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJobUtils.java
+++ b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJobUtils.java
@@ -16,6 +16,8 @@
package azkaban.jobtype;
+import com.google.common.base.Joiner;
+
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
@@ -91,6 +93,9 @@ public class HadoopJobUtils {
// MapReduce config for specifying additional namenodes for delegation tokens
public static final String MAPREDUCE_JOB_OTHER_NAMENODES = "mapreduce.job.hdfs-servers";
+ // MapReduce config for mapreduce job tags
+ public static final String MAPREDUCE_JOB_TAGS = "mapreduce.job.tags";
+
// Azkaban property for listing additional namenodes for delegation tokens
private static final String OTHER_NAMENODES_PROPERTY = "other_namenodes";
@@ -593,4 +598,21 @@ public class HadoopJobUtils {
}
return String.format("-D%s=%s", key, value);
}
+
+  /**
+   * Construct a CSV of "key:value" tags for the Hadoop application.
+   * @param props the job properties to read tag values from
+   * @param keys the property keys to build tags from; keys absent from props are skipped
+   * @return a CSV of tags
+   */
+ public static String constructHadoopTags(Props props, String[] keys) {
+ String[] keysAndValues = new String[keys.length];
+ for (int i = 0; i < keys.length; i++) {
+ if (props.containsKey(keys[i])) {
+ keysAndValues[i] = keys[i] + ":" + props.get(keys[i]);
+ }
+ }
+ Joiner joiner = Joiner.on(',').skipNulls();
+ return joiner.join(keysAndValues);
+ }
}
diff --git a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopPigJob.java b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopPigJob.java
index 0d70425..b799405 100644
--- a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopPigJob.java
+++ b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopPigJob.java
@@ -102,6 +102,11 @@ public class HadoopPigJob extends JavaProcessJob {
@Override
public void run() throws Exception {
+ String[] tagKeys = new String[] { CommonJobProperties.EXEC_ID,
+ CommonJobProperties.FLOW_ID, CommonJobProperties.PROJECT_NAME };
+ getJobProps().put(HadoopConfigurationInjector.INJECT_PREFIX
+ + HadoopJobUtils.MAPREDUCE_JOB_TAGS,
+ HadoopJobUtils.constructHadoopTags(getJobProps(), tagKeys));
HadoopConfigurationInjector.prepareResourcesToInject(getJobProps(),
getWorkingDirectory());
diff --git a/az-hadoop-jobtype-plugin/src/test/java/azkaban/jobtype/TestHadoopJobUtilsConstructHadoopTags.java b/az-hadoop-jobtype-plugin/src/test/java/azkaban/jobtype/TestHadoopJobUtilsConstructHadoopTags.java
new file mode 100644
index 0000000..78fe6db
--- /dev/null
+++ b/az-hadoop-jobtype-plugin/src/test/java/azkaban/jobtype/TestHadoopJobUtilsConstructHadoopTags.java
@@ -0,0 +1,50 @@
+package azkaban.jobtype;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import azkaban.utils.Props;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test class for constructHadoopTags method in HadoopJobUtils
+ */
+public class TestHadoopJobUtilsConstructHadoopTags {
+
+ private Props props;
+
+ @Before
+ public void beforeMethod() {
+ props = new Props();
+ }
+
+ @Test
+ public void testNoTags() {
+ String[] tags = new String[0];
+ assertThat(HadoopJobUtils.constructHadoopTags(props, tags)).isEqualTo("");
+ }
+
+ @Test
+ public void testWithTags() {
+ String tag0 = "tag0";
+ String tag1 = "tag1";
+ props.put(tag0, "val0");
+ props.put(tag1, "val1");
+ String[] tags = new String[] { tag0, tag1 };
+ assertThat(HadoopJobUtils.constructHadoopTags(props, tags))
+ .isEqualTo("tag0:val0,tag1:val1");
+ }
+
+ @Test
+ public void testWithNonExistentTags() {
+ String tag0 = "tag0";
+ String tag1 = "tag1";
+ String tag2 = "tag2";
+ props.put(tag0, "val0");
+ props.put(tag2, "val2");
+ String[] tags = new String[] { tag0, tag1, tag2 };
+ assertThat(HadoopJobUtils.constructHadoopTags(props, tags))
+ .isEqualTo("tag0:val0,tag2:val2");
+ }
+}