Details
diff --git a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
index 9f9ea38..7b1462f 100644
--- a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
+++ b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
@@ -77,12 +77,21 @@ public class JobCallbackValidator {
JOB_CALLBACK_BODY_TEMPLATE.replaceFirst(STATUS_TOKEN, jobStatus.name()
.toLowerCase());
- for (int i = 1; i <= maxNumCallback; i++) {
+ for (int i = 0; i <= maxNumCallback; i++) {
// callback url
String callbackUrlKey =
jobCallBackUrl.replaceFirst(SEQUENCE_TOKEN, Integer.toString(i));
String callbackUrlValue = jobProps.get(callbackUrlKey);
+ // sequence number should start at 1, this is to check for sequence
+ // number that starts a 0
+ if (i == 0) {
+ if (callbackUrlValue != null) {
+ errors.add("Sequence number starts at 1, not 0");
+ }
+ continue;
+ }
+
if (callbackUrlValue == null || callbackUrlValue.length() == 0) {
break;
} else {
diff --git a/azkaban-common/src/test/java/azkaban/jobcallback/JobCallbackValidatorTest.java b/azkaban-common/src/test/java/azkaban/jobcallback/JobCallbackValidatorTest.java
index 53c76ff..9741b6a 100644
--- a/azkaban-common/src/test/java/azkaban/jobcallback/JobCallbackValidatorTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobcallback/JobCallbackValidatorTest.java
@@ -35,6 +35,25 @@ public class JobCallbackValidatorTest {
}
@Test
+ public void sequenceStartWithZeroProps() {
+ Props jobProps = new Props();
+ Set<String> errors = new HashSet<String>();
+
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".0.url",
+ "http://www.linkedin.com");
+
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.COMPLETED.name().toLowerCase() + ".1.url",
+ "http://www.linkedin.com");
+
+ Assert.assertEquals(1, JobCallbackValidator.validate("bogusJob",
+ serverProps, jobProps, errors));
+
+ Assert.assertEquals(1, errors.size());
+ }
+
+ @Test
public void oneGetJobCallback() {
Props jobProps = new Props();
jobProps.put("job.notification."
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 6fb4540..39004b0 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -37,7 +37,6 @@ import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
-import org.mortbay.log.Log;
import org.mortbay.thread.QueuedThreadPool;
import azkaban.execapp.event.JobCallbackManager;
@@ -79,7 +78,7 @@ public class AzkabanExecutorServer {
public static final String METRIC_INTERVAL =
"executor.metric.milisecinterval.";
public static final int DEFAULT_PORT_NUMBER = 12321;
- public static final int DEFAULT_HEADER_BUFFER_SIZE = 4096;
+ public static final int DEFAULT_HEADER_BUFFER_SIZE = 4096;
private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
private static final int DEFAULT_THREAD_NUMBER = 50;
@@ -113,14 +112,17 @@ public class AzkabanExecutorServer {
boolean isStatsOn = props.getBoolean("executor.connector.stats", true);
logger.info("Setting up connector with stats on: " + isStatsOn);
-
- for (Connector connector : server.getConnectors()) {
+
+ for (Connector connector : server.getConnectors()) {
connector.setStatsOn(isStatsOn);
- logger.info(String.format("Jetty connector name: %s, default header buffer size: %d",
- connector.getName(), connector.getHeaderBufferSize()));
- connector.setHeaderBufferSize(props.getInt("jetty.headerBufferSize", DEFAULT_HEADER_BUFFER_SIZE));
- logger.info(String.format("Jetty connector name: %s, (if) new header buffer size: %d",
- connector.getName(), connector.getHeaderBufferSize()));
+ logger.info(String.format(
+ "Jetty connector name: %s, default header buffer size: %d",
+ connector.getName(), connector.getHeaderBufferSize()));
+ connector.setHeaderBufferSize(props.getInt("jetty.headerBufferSize",
+ DEFAULT_HEADER_BUFFER_SIZE));
+ logger.info(String.format(
+ "Jetty connector name: %s, (if) new header buffer size: %d",
+ connector.getName(), connector.getHeaderBufferSize()));
}
Context root = new Context(server, "/", Context.SESSIONS);
@@ -217,11 +219,20 @@ public class AzkabanExecutorServer {
logger.info("Completed configuring Metric Reports");
}
- // initialize event emitter
- // AsyncEventEmitterFactory.getInstance().initialize(props);
-
}
+ /**
+ * Load a custom class, which is provided by a configuration
+ * CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY.
+ *
+ * This method will try to instantiate an instance of this custom class and
+ * with given properties as the argument in the constructor.
+ *
+ * Basically the custom class must have a constructor that takes an argument
+ * with type Properties.
+ *
+ * @param props
+ */
private void loadCustomJMXAttributeProcessor(Props props) {
String jmxAttributeEmitter =
props.get(CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY);
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackManager.java
index 2714c12..c417b1c 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackManager.java
@@ -61,6 +61,11 @@ public class JobCallbackManager implements EventListener {
{ SUCCESS, FAILURE, COMPLETED };
public static void initialize(Props props) {
+ if (isInitialized) {
+ logger.info("Already initialized");
+ return;
+ }
+
logger.info("Initializing");
instance = new JobCallbackManager(props);
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackRequestMaker.java b/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackRequestMaker.java
index f94afe0..44ea07d 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackRequestMaker.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackRequestMaker.java
@@ -58,6 +58,11 @@ public class JobCallbackRequestMaker {
if (props == null) {
throw new NullPointerException("props argument can't be null");
}
+
+ if (isInitialized) {
+ return;
+ }
+
instance = new JobCallbackRequestMaker(props);
isInitialized = true;
logger.info("Initialization for " + JobCallbackRequestMaker.class.getName()
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
index 686347d..4fb0c62 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
@@ -104,7 +104,7 @@ public class JmxJobMBeanManager implements JmxJobMXBean, EventListener {
ExecutableNode node = jobRunner.getNode();
if (logger.isDebugEnabled()) {
- logger.info("*** got " + event.getType() + " " + node.getId() + " "
+ logger.debug("*** got " + event.getType() + " " + node.getId() + " "
+ event.getRunner().getClass().getName() + " status: "
+ node.getStatus());
}