azkaban-developers

Details

diff --git a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
index 9f9ea38..7b1462f 100644
--- a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
+++ b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
@@ -77,12 +77,21 @@ public class JobCallbackValidator {
         JOB_CALLBACK_BODY_TEMPLATE.replaceFirst(STATUS_TOKEN, jobStatus.name()
             .toLowerCase());
 
-    for (int i = 1; i <= maxNumCallback; i++) {
+    for (int i = 0; i <= maxNumCallback; i++) {
       // callback url
       String callbackUrlKey =
           jobCallBackUrl.replaceFirst(SEQUENCE_TOKEN, Integer.toString(i));
       String callbackUrlValue = jobProps.get(callbackUrlKey);
 
+      // sequence number should start at 1, this is to check for sequence
+      // number that starts a 0
+      if (i == 0) {
+        if (callbackUrlValue != null) {
+          errors.add("Sequence number starts at 1, not 0");
+        }
+        continue;
+      }
+
       if (callbackUrlValue == null || callbackUrlValue.length() == 0) {
         break;
       } else {
diff --git a/azkaban-common/src/test/java/azkaban/jobcallback/JobCallbackValidatorTest.java b/azkaban-common/src/test/java/azkaban/jobcallback/JobCallbackValidatorTest.java
index 53c76ff..9741b6a 100644
--- a/azkaban-common/src/test/java/azkaban/jobcallback/JobCallbackValidatorTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobcallback/JobCallbackValidatorTest.java
@@ -35,6 +35,25 @@ public class JobCallbackValidatorTest {
   }
 
   @Test
+  public void sequenceStartWithZeroProps() {
+    Props jobProps = new Props();
+    Set<String> errors = new HashSet<String>();
+
+    jobProps.put("job.notification."
+        + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".0.url",
+        "http://www.linkedin.com");
+
+    jobProps.put("job.notification."
+        + JobCallbackStatusEnum.COMPLETED.name().toLowerCase() + ".1.url",
+        "http://www.linkedin.com");
+
+    Assert.assertEquals(1, JobCallbackValidator.validate("bogusJob",
+        serverProps, jobProps, errors));
+
+    Assert.assertEquals(1, errors.size());
+  }
+
+  @Test
   public void oneGetJobCallback() {
     Props jobProps = new Props();
     jobProps.put("job.notification."
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 6fb4540..39004b0 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -37,7 +37,6 @@ import org.mortbay.jetty.Connector;
 import org.mortbay.jetty.Server;
 import org.mortbay.jetty.servlet.Context;
 import org.mortbay.jetty.servlet.ServletHolder;
-import org.mortbay.log.Log;
 import org.mortbay.thread.QueuedThreadPool;
 
 import azkaban.execapp.event.JobCallbackManager;
@@ -79,7 +78,7 @@ public class AzkabanExecutorServer {
   public static final String METRIC_INTERVAL =
       "executor.metric.milisecinterval.";
   public static final int DEFAULT_PORT_NUMBER = 12321;
-  public static final int DEFAULT_HEADER_BUFFER_SIZE = 4096;  
+  public static final int DEFAULT_HEADER_BUFFER_SIZE = 4096;
 
   private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
   private static final int DEFAULT_THREAD_NUMBER = 50;
@@ -113,14 +112,17 @@ public class AzkabanExecutorServer {
 
     boolean isStatsOn = props.getBoolean("executor.connector.stats", true);
     logger.info("Setting up connector with stats on: " + isStatsOn);
-    
-    for (Connector connector : server.getConnectors()) {      
+
+    for (Connector connector : server.getConnectors()) {
       connector.setStatsOn(isStatsOn);
-      logger.info(String.format("Jetty connector name: %s, default header buffer size: %d",
-              connector.getName(), connector.getHeaderBufferSize()));     
-      connector.setHeaderBufferSize(props.getInt("jetty.headerBufferSize", DEFAULT_HEADER_BUFFER_SIZE));
-      logger.info(String.format("Jetty connector name: %s, (if) new header buffer size: %d",
-              connector.getName(), connector.getHeaderBufferSize()));
+      logger.info(String.format(
+          "Jetty connector name: %s, default header buffer size: %d",
+          connector.getName(), connector.getHeaderBufferSize()));
+      connector.setHeaderBufferSize(props.getInt("jetty.headerBufferSize",
+          DEFAULT_HEADER_BUFFER_SIZE));
+      logger.info(String.format(
+          "Jetty connector name: %s, (if) new header buffer size: %d",
+          connector.getName(), connector.getHeaderBufferSize()));
     }
 
     Context root = new Context(server, "/", Context.SESSIONS);
@@ -217,11 +219,20 @@ public class AzkabanExecutorServer {
       logger.info("Completed configuring Metric Reports");
     }
 
-    // initialize event emitter
-    // AsyncEventEmitterFactory.getInstance().initialize(props);
-
   }
 
+  /**
+   * Load a custom class, which is provided by a configuration
+   * CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY.
+   * 
+   * This method will try to instantiate an instance of this custom class and
+   * with given properties as the argument in the constructor.
+   * 
+   * Basically the custom class must have a constructor that takes an argument
+   * with type Properties.
+   * 
+   * @param props
+   */
   private void loadCustomJMXAttributeProcessor(Props props) {
     String jmxAttributeEmitter =
         props.get(CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY);
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackManager.java
index 2714c12..c417b1c 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackManager.java
@@ -61,6 +61,11 @@ public class JobCallbackManager implements EventListener {
       { SUCCESS, FAILURE, COMPLETED };
 
   public static void initialize(Props props) {
+    if (isInitialized) {
+      logger.info("Already initialized");
+      return;
+    }
+
     logger.info("Initializing");
     instance = new JobCallbackManager(props);
 
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackRequestMaker.java b/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackRequestMaker.java
index f94afe0..44ea07d 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackRequestMaker.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackRequestMaker.java
@@ -58,6 +58,11 @@ public class JobCallbackRequestMaker {
     if (props == null) {
       throw new NullPointerException("props argument can't be null");
     }
+
+    if (isInitialized) {
+      return;
+    }
+
     instance = new JobCallbackRequestMaker(props);
     isInitialized = true;
     logger.info("Initialization for " + JobCallbackRequestMaker.class.getName()
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
index 686347d..4fb0c62 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
@@ -104,7 +104,7 @@ public class JmxJobMBeanManager implements JmxJobMXBean, EventListener {
       ExecutableNode node = jobRunner.getNode();
 
       if (logger.isDebugEnabled()) {
-        logger.info("*** got " + event.getType() + " " + node.getId() + " "
+        logger.debug("*** got " + event.getType() + " " + node.getId() + " "
             + event.getRunner().getClass().getName() + " status: "
             + node.getStatus());
       }