diff --git a/azkaban-common/src/main/java/azkaban/Constants.java b/azkaban-common/src/main/java/azkaban/Constants.java
index 4abad36..f3f1ecb 100644
--- a/azkaban-common/src/main/java/azkaban/Constants.java
+++ b/azkaban-common/src/main/java/azkaban/Constants.java
@@ -111,4 +111,12 @@ public class Constants {
// Job property that enables/disables using Kafka logging of user job logs
public static final String AZKABAN_JOB_LOGGING_KAFKA_ENABLE = "azkaban.job.logging.kafka.enable";
}
+
+ public static class JobCallbackProperties {
+ public static final String JOBCALLBACK_CONNECTION_REQUEST_TIMEOUT = "jobcallback.connection.request.timeout";
+ public static final String JOBCALLBACK_CONNECTION_TIMEOUT = "jobcallback.connection.timeout";
+ public static final String JOBCALLBACK_SOCKET_TIMEOUT = "jobcallback.socket.timeout";
+ public static final String JOBCALLBACK_RESPONSE_WAIT_TIMEOUT = "jobcallback.response.wait.timeout";
+ public static final String JOBCALLBACK_THREAD_POOL_SIZE = "jobcallback.thread.pool.size";
+ }
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackRequestMaker.java b/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackRequestMaker.java
index 9446f64..2a84151 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackRequestMaker.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackRequestMaker.java
@@ -1,5 +1,11 @@
package azkaban.execapp.event;
+import static azkaban.Constants.JobCallbackProperties.JOBCALLBACK_CONNECTION_TIMEOUT;
+import static azkaban.Constants.JobCallbackProperties.JOBCALLBACK_CONNECTION_REQUEST_TIMEOUT;
+import static azkaban.Constants.JobCallbackProperties.JOBCALLBACK_RESPONSE_WAIT_TIMEOUT;
+import static azkaban.Constants.JobCallbackProperties.JOBCALLBACK_SOCKET_TIMEOUT;
+import static azkaban.Constants.JobCallbackProperties.JOBCALLBACK_THREAD_POOL_SIZE;
+
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
@@ -84,18 +90,14 @@ public class JobCallbackRequestMaker {
private JobCallbackRequestMaker(Props props) {
int connectionRequestTimeout =
- props.getInt("jobcallback.connection.request.timeout",
- DEFAULT_TIME_OUT_MS);
+ props.getInt(JOBCALLBACK_CONNECTION_REQUEST_TIMEOUT, DEFAULT_TIME_OUT_MS);
- int connectionTimeout =
- props.getInt("jobcallback.connection.timeout", DEFAULT_TIME_OUT_MS);
+ int connectionTimeout = props.getInt(JOBCALLBACK_CONNECTION_TIMEOUT, DEFAULT_TIME_OUT_MS);
- int socketTimeout =
- props.getInt("jobcallback.socket.timeout", DEFAULT_TIME_OUT_MS);
+ int socketTimeout = props.getInt(JOBCALLBACK_SOCKET_TIMEOUT, DEFAULT_TIME_OUT_MS);
responseWaitTimeoutMS =
- props.getInt("jobcallback.response.wait.timeout",
- DEFAULT_RESPONSE_WAIT_TIME_OUT_MS);
+ props.getInt(JOBCALLBACK_RESPONSE_WAIT_TIMEOUT, DEFAULT_RESPONSE_WAIT_TIME_OUT_MS);
logger.info("responseWaitTimeoutMS: " + responseWaitTimeoutMS);
@@ -112,7 +114,7 @@ public class JobCallbackRequestMaker {
.build();
int jobCallbackThreadPoolSize =
- props.getInt("jobcallback.thread.pool.size", DEFAULT_THREAD_POOL_SIZE);
+ props.getInt(JOBCALLBACK_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE);
logger.info("Jobcall thread pool size: " + jobCallbackThreadPoolSize);
ExecutorService executorService =
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/JobCallbackRequestMakerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/JobCallbackRequestMakerTest.java
index bd14da4..9c654b9 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/JobCallbackRequestMakerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/JobCallbackRequestMakerTest.java
@@ -1,5 +1,8 @@
package azkaban.execapp.event;
+import static azkaban.Constants.JobCallbackProperties.JOBCALLBACK_CONNECTION_REQUEST_TIMEOUT;
+import static azkaban.Constants.JobCallbackProperties.JOBCALLBACK_CONNECTION_TIMEOUT;
+import static azkaban.Constants.JobCallbackProperties.JOBCALLBACK_SOCKET_TIMEOUT;
import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_EXECUTION_ID_TOKEN;
import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_FLOW_TOKEN;
import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_JOB_STATUS_TOKEN;
@@ -57,29 +60,29 @@ public class JobCallbackRequestMakerTest {
@BeforeClass
public static void setup() throws Exception {
- try {
- JobCallbackRequestMaker.initialize(new Props());
- jobCBMaker = JobCallbackRequestMaker.getInstance();
-
- contextInfo = new HashMap<String, String>();
- contextInfo.put(CONTEXT_SERVER_TOKEN, SERVER_NAME);
- contextInfo.put(CONTEXT_PROJECT_TOKEN, PROJECT_NANE);
- contextInfo.put(CONTEXT_FLOW_TOKEN, FLOW_NANE);
- contextInfo.put(CONTEXT_EXECUTION_ID_TOKEN, EXECUTION_ID);
- contextInfo.put(CONTEXT_JOB_TOKEN, JOB_NANE);
- contextInfo.put(CONTEXT_JOB_STATUS_TOKEN, JobCallbackStatusEnum.STARTED.name());
-
- embeddedJettyServer = new Server(PORT_NUMBER);
-
- Context context = new Context(embeddedJettyServer, "/", Context.SESSIONS);
- context.addServlet(new ServletHolder(new DelayServlet()), "/delay");
-
- System.out.println("Start server");
- embeddedJettyServer.start();
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
- }
+ Props props = new Props();
+ int timeout = 50;
+ props.put(JOBCALLBACK_CONNECTION_REQUEST_TIMEOUT, timeout);
+ props.put(JOBCALLBACK_CONNECTION_TIMEOUT, timeout);
+ props.put(JOBCALLBACK_SOCKET_TIMEOUT, timeout);
+ JobCallbackRequestMaker.initialize(props);
+ jobCBMaker = JobCallbackRequestMaker.getInstance();
+
+ contextInfo = new HashMap<String, String>();
+ contextInfo.put(CONTEXT_SERVER_TOKEN, SERVER_NAME);
+ contextInfo.put(CONTEXT_PROJECT_TOKEN, PROJECT_NANE);
+ contextInfo.put(CONTEXT_FLOW_TOKEN, FLOW_NANE);
+ contextInfo.put(CONTEXT_EXECUTION_ID_TOKEN, EXECUTION_ID);
+ contextInfo.put(CONTEXT_JOB_TOKEN, JOB_NANE);
+ contextInfo.put(CONTEXT_JOB_STATUS_TOKEN, JobCallbackStatusEnum.STARTED.name());
+
+ embeddedJettyServer = new Server(PORT_NUMBER);
+
+ Context context = new Context(embeddedJettyServer, "/", Context.SESSIONS);
+ context.addServlet(new ServletHolder(new DelayServlet()), "/delay");
+
+ System.out.println("Start server");
+ embeddedJettyServer.start();
}
@AfterClass