azkaban-aplcache

Faster JobCallbackRequestMakerTest (#1114) This test

5/26/2017 3:41:10 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/Constants.java b/azkaban-common/src/main/java/azkaban/Constants.java
index 4abad36..f3f1ecb 100644
--- a/azkaban-common/src/main/java/azkaban/Constants.java
+++ b/azkaban-common/src/main/java/azkaban/Constants.java
@@ -111,4 +111,12 @@ public class Constants {
     // Job property that enables/disables using Kafka logging of user job logs
     public static final String AZKABAN_JOB_LOGGING_KAFKA_ENABLE = "azkaban.job.logging.kafka.enable";
   }
+
+  public static class JobCallbackProperties {
+    public static final String JOBCALLBACK_CONNECTION_REQUEST_TIMEOUT = "jobcallback.connection.request.timeout";
+    public static final String JOBCALLBACK_CONNECTION_TIMEOUT = "jobcallback.connection.timeout";
+    public static final String JOBCALLBACK_SOCKET_TIMEOUT = "jobcallback.socket.timeout";
+    public static final String JOBCALLBACK_RESPONSE_WAIT_TIMEOUT = "jobcallback.response.wait.timeout";
+    public static final String JOBCALLBACK_THREAD_POOL_SIZE = "jobcallback.thread.pool.size";
+  }
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackRequestMaker.java b/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackRequestMaker.java
index 9446f64..2a84151 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackRequestMaker.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackRequestMaker.java
@@ -1,5 +1,11 @@
 package azkaban.execapp.event;
 
+import static azkaban.Constants.JobCallbackProperties.JOBCALLBACK_CONNECTION_TIMEOUT;
+import static azkaban.Constants.JobCallbackProperties.JOBCALLBACK_CONNECTION_REQUEST_TIMEOUT;
+import static azkaban.Constants.JobCallbackProperties.JOBCALLBACK_RESPONSE_WAIT_TIMEOUT;
+import static azkaban.Constants.JobCallbackProperties.JOBCALLBACK_SOCKET_TIMEOUT;
+import static azkaban.Constants.JobCallbackProperties.JOBCALLBACK_THREAD_POOL_SIZE;
+
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
@@ -84,18 +90,14 @@ public class JobCallbackRequestMaker {
   private JobCallbackRequestMaker(Props props) {
 
     int connectionRequestTimeout =
-        props.getInt("jobcallback.connection.request.timeout",
-            DEFAULT_TIME_OUT_MS);
+        props.getInt(JOBCALLBACK_CONNECTION_REQUEST_TIMEOUT, DEFAULT_TIME_OUT_MS);
 
-    int connectionTimeout =
-        props.getInt("jobcallback.connection.timeout", DEFAULT_TIME_OUT_MS);
+    int connectionTimeout = props.getInt(JOBCALLBACK_CONNECTION_TIMEOUT, DEFAULT_TIME_OUT_MS);
 
-    int socketTimeout =
-        props.getInt("jobcallback.socket.timeout", DEFAULT_TIME_OUT_MS);
+    int socketTimeout = props.getInt(JOBCALLBACK_SOCKET_TIMEOUT, DEFAULT_TIME_OUT_MS);
 
     responseWaitTimeoutMS =
-        props.getInt("jobcallback.response.wait.timeout",
-            DEFAULT_RESPONSE_WAIT_TIME_OUT_MS);
+        props.getInt(JOBCALLBACK_RESPONSE_WAIT_TIMEOUT, DEFAULT_RESPONSE_WAIT_TIME_OUT_MS);
 
     logger.info("responseWaitTimeoutMS: " + responseWaitTimeoutMS);
 
@@ -112,7 +114,7 @@ public class JobCallbackRequestMaker {
             .build();
 
     int jobCallbackThreadPoolSize =
-        props.getInt("jobcallback.thread.pool.size", DEFAULT_THREAD_POOL_SIZE);
+        props.getInt(JOBCALLBACK_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE);
     logger.info("Jobcall thread pool size: " + jobCallbackThreadPoolSize);
 
     ExecutorService executorService =
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/JobCallbackRequestMakerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/JobCallbackRequestMakerTest.java
index bd14da4..9c654b9 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/JobCallbackRequestMakerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/JobCallbackRequestMakerTest.java
@@ -1,5 +1,8 @@
 package azkaban.execapp.event;
 
+import static azkaban.Constants.JobCallbackProperties.JOBCALLBACK_CONNECTION_REQUEST_TIMEOUT;
+import static azkaban.Constants.JobCallbackProperties.JOBCALLBACK_CONNECTION_TIMEOUT;
+import static azkaban.Constants.JobCallbackProperties.JOBCALLBACK_SOCKET_TIMEOUT;
 import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_EXECUTION_ID_TOKEN;
 import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_FLOW_TOKEN;
 import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_JOB_STATUS_TOKEN;
@@ -57,29 +60,29 @@ public class JobCallbackRequestMakerTest {
 
   @BeforeClass
   public static void setup() throws Exception {
-    try {
-      JobCallbackRequestMaker.initialize(new Props());
-      jobCBMaker = JobCallbackRequestMaker.getInstance();
-
-      contextInfo = new HashMap<String, String>();
-      contextInfo.put(CONTEXT_SERVER_TOKEN, SERVER_NAME);
-      contextInfo.put(CONTEXT_PROJECT_TOKEN, PROJECT_NANE);
-      contextInfo.put(CONTEXT_FLOW_TOKEN, FLOW_NANE);
-      contextInfo.put(CONTEXT_EXECUTION_ID_TOKEN, EXECUTION_ID);
-      contextInfo.put(CONTEXT_JOB_TOKEN, JOB_NANE);
-      contextInfo.put(CONTEXT_JOB_STATUS_TOKEN, JobCallbackStatusEnum.STARTED.name());
-
-      embeddedJettyServer = new Server(PORT_NUMBER);
-
-      Context context = new Context(embeddedJettyServer, "/", Context.SESSIONS);
-      context.addServlet(new ServletHolder(new DelayServlet()), "/delay");
-
-      System.out.println("Start server");
-      embeddedJettyServer.start();
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw e;
-    }
+    Props props = new Props();
+    int timeout = 50;
+    props.put(JOBCALLBACK_CONNECTION_REQUEST_TIMEOUT, timeout);
+    props.put(JOBCALLBACK_CONNECTION_TIMEOUT, timeout);
+    props.put(JOBCALLBACK_SOCKET_TIMEOUT, timeout);
+    JobCallbackRequestMaker.initialize(props);
+    jobCBMaker = JobCallbackRequestMaker.getInstance();
+
+    contextInfo = new HashMap<String, String>();
+    contextInfo.put(CONTEXT_SERVER_TOKEN, SERVER_NAME);
+    contextInfo.put(CONTEXT_PROJECT_TOKEN, PROJECT_NANE);
+    contextInfo.put(CONTEXT_FLOW_TOKEN, FLOW_NANE);
+    contextInfo.put(CONTEXT_EXECUTION_ID_TOKEN, EXECUTION_ID);
+    contextInfo.put(CONTEXT_JOB_TOKEN, JOB_NANE);
+    contextInfo.put(CONTEXT_JOB_STATUS_TOKEN, JobCallbackStatusEnum.STARTED.name());
+
+    embeddedJettyServer = new Server(PORT_NUMBER);
+
+    Context context = new Context(embeddedJettyServer, "/", Context.SESSIONS);
+    context.addServlet(new ServletHolder(new DelayServlet()), "/delay");
+
+    System.out.println("Start server");
+    embeddedJettyServer.start();
   }
 
   @AfterClass