azkaban-aplcache

Merge pull request #291 from hluu/master Add flow name, job

7/16/2014 10:53:21 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/jobtype/JobTypeManager.java b/azkaban-common/src/main/java/azkaban/jobtype/JobTypeManager.java
index e5d0dae..212f1a9 100644
--- a/azkaban-common/src/main/java/azkaban/jobtype/JobTypeManager.java
+++ b/azkaban-common/src/main/java/azkaban/jobtype/JobTypeManager.java
@@ -78,7 +78,7 @@ public class JobTypeManager {
         try {
           loadPluginJobTypes(plugins);
         } catch (Exception e) {
-          logger.info("Plugin jobtypes failed to load. " + e.getCause());
+          logger.info("Plugin jobtypes failed to load. " + e.getCause(), e);
           throw new JobTypeManagerException(e);
         }
       }
@@ -163,8 +163,8 @@ public class JobTypeManager {
         try {
           loadJobTypes(dir, plugins);
         } catch (Exception e) {
-          logger.error("Failed to load jobtype " + dir.getName()
-              + e.getMessage());
+          logger.error(
+              "Failed to load jobtype " + dir.getName() + e.getMessage(), e);
           throw new JobTypeManagerException(e);
         }
       }
@@ -202,8 +202,9 @@ public class JobTypeManager {
       pluginLoadProps = new Props(commonPluginLoadProps, pluginLoadPropsFile);
       pluginLoadProps = PropsUtils.resolveProps(pluginLoadProps);
     } catch (Exception e) {
+      logger.error("pluginLoadProps to help with debugging: " + pluginLoadProps);
       throw new JobTypeManagerException("Failed to get jobtype properties"
-          + e.getMessage());
+          + e.getMessage(), e);
     }
     // Add properties into the plugin set
     pluginLoadProps.put("plugin.dir", pluginDir.getAbsolutePath());
@@ -232,9 +233,6 @@ public class JobTypeManager {
       Job job =
           (Job) Utils.callConstructor(clazz, "dummy", fakeSysProps,
               fakeJobProps, logger);
-    } catch (Exception e) {
-      logger.info("Jobtype " + jobTypeName + " failed test!", e);
-      throw new JobExecutionException(e);
     } catch (Throwable t) {
       logger.info("Jobtype " + jobTypeName + " failed test!", t);
       throw new JobExecutionException(t);
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/JMXHttpServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/JMXHttpServlet.java
index 8461548..6412e0b 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/JMXHttpServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/JMXHttpServlet.java
@@ -18,6 +18,8 @@ package azkaban.execapp;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
 
 import javax.management.MBeanAttributeInfo;
 import javax.management.MBeanInfo;
@@ -31,8 +33,8 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.log4j.Logger;
 
 import azkaban.executor.ConnectorParams;
-import azkaban.server.ServerConstants;
 import azkaban.server.HttpRequestUtils;
+import azkaban.server.ServerConstants;
 import azkaban.utils.JSONUtils;
 
 public class JMXHttpServlet extends HttpServlet implements ConnectorParams {
@@ -57,7 +59,7 @@ public class JMXHttpServlet extends HttpServlet implements ConnectorParams {
 
   protected void doGet(HttpServletRequest req, HttpServletResponse resp)
       throws ServletException, IOException {
-    HashMap<String, Object> ret = new HashMap<String, Object>();
+    Map<String, Object> ret = new HashMap<String, Object>();
 
     if (hasParam(req, JMX_GET_MBEANS)) {
       ret.put("mbeans", server.getMbeanNames());
@@ -71,7 +73,7 @@ public class JMXHttpServlet extends HttpServlet implements ConnectorParams {
           MBeanInfo info = server.getMBeanInfo(name);
 
           MBeanAttributeInfo[] mbeanAttrs = info.getAttributes();
-          HashMap<String, Object> attributes = new HashMap<String, Object>();
+          Map<String, Object> attributes = new TreeMap<String, Object>();
 
           for (MBeanAttributeInfo attrInfo : mbeanAttrs) {
             Object obj = server.getMBeanAttribute(name, attrInfo.getName());
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java
index 63e139c..b4040c0 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java
@@ -43,6 +43,7 @@ import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.Status;
 import azkaban.flow.CommonJobProperties;
 import azkaban.jobExecutor.AbstractProcessJob;
+import azkaban.jobExecutor.JavaProcessJob;
 import azkaban.jobExecutor.Job;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypeManagerException;
@@ -488,6 +489,7 @@ public class JobRunner extends EventHandler implements Runnable {
       }
 
       insertLinks();
+      insertJVMAargs();
 
       props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
       props.put(CommonJobProperties.JOB_METADATA_FILE,
@@ -521,6 +523,26 @@ public class JobRunner extends EventHandler implements Runnable {
   }
 
   /**
+   * Add useful JVM arguments so it is easier to map a running Java process to a
+   * flow, execution id and job
+   */
+  private void insertJVMAargs() {
+    String flowName = node.getParentFlow().getFlowId();
+    String jobId = node.getId();
+
+    String jobJVMArgs =
+        String.format(
+            "-Dazkaban.flowid=%s -Dazkaban.execid=%s -Dazkaban.jobid=%s",
+            flowName, executionId, jobId);
+    
+    String previousJVMArgs = props.get(JavaProcessJob.JVM_PARAMS);    
+    jobJVMArgs += (previousJVMArgs == null) ? "" : " " + previousJVMArgs; 
+
+    logger.info("job JVM args: " + jobJVMArgs);
+    props.put(JavaProcessJob.JVM_PARAMS, jobJVMArgs);
+  }
+
+  /**
    * Add relevant links to the job properties so that downstream consumers may
    * know what executions initiated their execution.
    */
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/JMXHttpServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/JMXHttpServlet.java
index 2c690d9..f1760a1 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/JMXHttpServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/JMXHttpServlet.java
@@ -19,6 +19,7 @@ package azkaban.webapp.servlet;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.TreeMap;
 
 import javax.management.MBeanAttributeInfo;
 import javax.management.MBeanInfo;
@@ -95,6 +96,14 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements
         Map<String, Object> result =
             executorManager.callExecutorJMX(hostPort,
                 JMX_GET_ALL_MBEAN_ATTRIBUTES, mbean);
+        // order the attribute by name
+        for (Map.Entry<String, Object> entry : result.entrySet()) {
+          if (entry.getValue() instanceof Map) {
+            @SuppressWarnings("unchecked")
+            Map<String, Object> entryValue = (Map<String, Object>) entry.getValue();
+            result.put(entry.getKey(), new TreeMap<String,Object>(entryValue));
+          }
+        }
         ret = result;
       } else if (JMX_GET_MBEANS.equals(ajax)) {
         ret.put("mbeans", server.getMbeanNames());
@@ -139,7 +148,7 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements
             MBeanInfo info = server.getMBeanInfo(name);
 
             MBeanAttributeInfo[] mbeanAttrs = info.getAttributes();
-            HashMap<String, Object> attributes = new HashMap<String, Object>();
+            Map<String, Object> attributes = new TreeMap<String, Object>();
 
             for (MBeanAttributeInfo attrInfo : mbeanAttrs) {
               Object obj = server.getMBeanAttribute(name, attrInfo.getName());