Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java
index 30fc44b..b3ad9c7 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java
@@ -118,10 +118,9 @@ public class ExecutorApiGateway {
}
@SuppressWarnings("unchecked") final URI uri =
- ExecutorApiClient.buildUri(host, port, path, true,
- paramList.toArray(new Pair[0]));
+ ExecutorApiClient.buildUri(host, port, path, true);
- return this.apiClient.httpGet(uri, null);
+ return this.apiClient.httpPost(uri, paramList);
}
}
diff --git a/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java b/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java
index 251e764..4f139fa 100644
--- a/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java
+++ b/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java
@@ -21,21 +21,18 @@ import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.http.HttpEntity;
-import org.apache.http.HttpMessage;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
-import org.apache.http.client.HttpResponseException;
-import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
-import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
import org.apache.log4j.Logger;
/**
@@ -76,60 +73,11 @@ public abstract class RestfulApiClient<T> {
}
}
- URI uri = null;
try {
- uri = builder.build();
+ return builder.build();
} catch (final URISyntaxException e) {
throw new IOException(e);
}
-
- return uri;
- }
-
- /**
- * helper function to build a valid URI.
- *
- * @param uri the URI to start with.
- * @param params extra query parameters to append.
- * @return the URI built from the inputs.
- */
- public static URI BuildUri(final URI uri, final Pair<String, String>... params)
- throws IOException {
- final URIBuilder builder = new URIBuilder(uri);
-
- if (params != null) {
- for (final Pair<String, String> pair : params) {
- builder.setParameter(pair.getFirst(), pair.getSecond());
- }
- }
-
- URI returningUri = null;
- try {
- returningUri = builder.build();
- } catch (final URISyntaxException e) {
- throw new IOException(e);
- }
-
- return returningUri;
- }
-
- /**
- * helper function to fill the request with header entries .
- */
- private static HttpMessage completeRequest(final HttpMessage request,
- final List<NameValuePair> headerEntries) {
- if (null == request) {
- logger.error("unable to complete request as the passed request object is null");
- return request;
- }
-
- // dump all the header entries to the request.
- if (null != headerEntries && headerEntries.size() > 0) {
- for (final NameValuePair pair : headerEntries) {
- request.addHeader(pair.getName(), pair.getValue());
- }
- }
- return request;
}
/**
@@ -137,13 +85,13 @@ public abstract class RestfulApiClient<T> {
*/
private static HttpEntityEnclosingRequestBase completeRequest(
final HttpEntityEnclosingRequestBase request,
- final List<NameValuePair> headerEntries,
- final String postingBody) throws UnsupportedEncodingException {
- if (null != completeRequest(request, headerEntries)) {
- // dump the post body UTF-8 will be used as the default encoding type.
- if (null != postingBody && postingBody.length() > 0) {
- final HttpEntity entity = new ByteArrayEntity(postingBody.getBytes("UTF-8"));
- request.setHeader("Content-Length", Long.toString(entity.getContentLength()));
+ final List<Pair<String, String>> params) throws UnsupportedEncodingException {
+ if (request != null) {
+ if (null != params && !params.isEmpty()) {
+ final List<NameValuePair> formParams = params.stream()
+ .map(pair -> new BasicNameValuePair(pair.getFirst(), pair.getSecond()))
+ .collect(Collectors.toList());
+ final HttpEntity entity = new UrlEncodedFormEntity(formParams, "UTF-8");
request.setEntity(entity);
}
}
@@ -156,39 +104,17 @@ public abstract class RestfulApiClient<T> {
* response object out via the returning value as the response will be closed after the execution
* steps out of the method context.
**/
- protected abstract T parseResponse(HttpResponse response)
- throws HttpResponseException, IOException;
-
- /**
- * function to perform a Get http request.
- *
- * @param uri the URI of the request.
- * @param headerEntries extra entries to be added to request header.
- * @return the response object type of which is specified by user.
- */
- public T httpGet(final URI uri, final List<NameValuePair> headerEntries) throws IOException {
- // shortcut if the passed url is invalid.
- if (null == uri) {
- logger.error(" unable to perform httpGet as the passed uri is null");
- return null;
- }
-
- final HttpGet get = new HttpGet(uri);
- return this.sendAndReturn((HttpGet) completeRequest(get, headerEntries));
- }
+ protected abstract T parseResponse(HttpResponse response) throws IOException;
/**
* function to perform a Post http request.
*
* @param uri the URI of the request.
- * @param headerEntries extra entries to be added to request header.
- * @param postingBody the content to be posted , optional.
+ * @param params the form params to be posted, optional.
* @return the response object type of which is specified by user.
* @throws UnsupportedEncodingException, IOException
*/
- public T httpPost(final URI uri,
- final List<NameValuePair> headerEntries,
- final String postingBody) throws UnsupportedEncodingException, IOException {
+ public T httpPost(final URI uri, final List<Pair<String, String>> params) throws IOException {
// shortcut if the passed url is invalid.
if (null == uri) {
logger.error(" unable to perform httpPost as the passed uri is null.");
@@ -196,57 +122,15 @@ public abstract class RestfulApiClient<T> {
}
final HttpPost post = new HttpPost(uri);
- return this.sendAndReturn(completeRequest(post, headerEntries, postingBody));
- }
-
- /**
- * function to perform a Delete http request.
- *
- * @param uri the URI of the request.
- * @param headerEntries extra entries to be added to request header.
- * @return the response object type of which is specified by user.
- */
- public T httpDelete(final URI uri, final List<NameValuePair> headerEntries) throws IOException {
- // shortcut if the passed url is invalid.
- if (null == uri) {
- logger.error(" unable to perform httpDelete as the passed uri is null.");
- return null;
- }
-
- final HttpDelete delete = new HttpDelete(uri);
- return this.sendAndReturn((HttpDelete) completeRequest(delete, headerEntries));
- }
-
- /**
- * function to perform a Put http request.
- *
- * @param uri the URI of the request.
- * @param headerEntries extra entries to be added to request header.
- * @param postingBody the content to be posted , optional.
- * @return the response object type of which is specified by user.
- * @throws UnsupportedEncodingException, IOException
- */
- public T httpPut(final URI uri, final List<NameValuePair> headerEntries,
- final String postingBody) throws UnsupportedEncodingException, IOException {
- // shortcut if the passed url is invalid.
- if (null == uri) {
- logger.error(" unable to perform httpPut as the passed url is null or empty.");
- return null;
- }
-
- final HttpPut put = new HttpPut(uri);
- return this.sendAndReturn(completeRequest(put, headerEntries, postingBody));
+ return this.sendAndReturn(completeRequest(post, params));
}
/**
* function to dispatch the request and pass back the response.
*/
protected T sendAndReturn(final HttpUriRequest request) throws IOException {
- final CloseableHttpClient client = HttpClients.createDefault();
- try {
+ try (CloseableHttpClient client = HttpClients.createDefault()) {
return this.parseResponse(client.execute(request));
- } finally {
- client.close();
}
}
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorApiGatewaySystemTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorApiGatewaySystemTest.java
new file mode 100644
index 0000000..a6c50b7
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorApiGatewaySystemTest.java
@@ -0,0 +1,73 @@
+package azkaban.executor;
+
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore("Requires azkaban server running -> run AzkabanSingleServer first")
+public class ExecutorApiGatewaySystemTest {
+
+ private ExecutorApiGateway apiGateway;
+
+ @Before
+ public void setUp() throws Exception {
+ ExecutorApiClient client = new ExecutorApiClient();
+ apiGateway = new ExecutorApiGateway(client);
+ }
+
+ @Test
+ public void update100Executions() throws Exception {
+ updateExecutions(100);
+ }
+
+ @Test
+ public void update300Executions() throws Exception {
+ // used to fail because the URL is too long
+ // works after switching to HTTP POST
+ updateExecutions(300);
+ }
+
+ @Test
+ public void update100kExecutions() throws Exception {
+ // used to fail because the URL is too long
+ // works after switching to HTTP POST
+ updateExecutions(100_000);
+ }
+
+ private void updateExecutions(int count) throws ExecutorManagerException {
+ final List<Integer> executionIdsList = new ArrayList<>();
+ final List<Long> updateTimesList = new ArrayList<>();
+
+ for (int i = 100000; i < 100000 + count; i++) {
+ executionIdsList.add(i);
+ updateTimesList.add(0L);
+ }
+
+ final Pair<String, String> executionIds =
+ new Pair<>(ConnectorParams.EXEC_ID_LIST_PARAM,
+ JSONUtils.toJSON(executionIdsList));
+
+ final Pair<String, String> updateTimes =
+ new Pair<>(
+ ConnectorParams.UPDATE_TIME_LIST_PARAM,
+ JSONUtils.toJSON(updateTimesList));
+
+ Map<String, Object> results = apiGateway.callWithExecutionId("localhost", 12321,
+ ConnectorParams.UPDATE_ACTION, null, null, executionIds, updateTimes);
+
+ Assert.assertTrue(results != null);
+ final List<Map<String, Object>> executionUpdates =
+ (List<Map<String, Object>>) results
+ .get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
+ Assert.assertEquals(count, executionUpdates.size());
+ System.out.println("executionUpdates.get(count - 1): " + executionUpdates.get(count - 1));
+ }
+
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorApiGatewayTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorApiGatewayTest.java
index 7732067..aad96b2 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorApiGatewayTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorApiGatewayTest.java
@@ -40,7 +40,7 @@ public class ExecutorApiGatewayTest {
final ExecutorInfo exeInfo = new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89,
10);
final String json = JSONUtils.toJSON(exeInfo);
- when(this.client.httpGet(Mockito.any(), Mockito.any())).thenReturn(json);
+ when(this.client.httpPost(Mockito.any(), Mockito.any())).thenReturn(json);
final ExecutorInfo exeInfo2 = this.gateway
.callForJsonType("localhost", 1234, "executor", null, ExecutorInfo.class);
Assert.assertTrue(exeInfo.equals(exeInfo2));
diff --git a/azkaban-common/src/test/java/azkaban/utils/RestfulApiClientTest.java b/azkaban-common/src/test/java/azkaban/utils/RestfulApiClientTest.java
index 3c3995f..a5edf9f 100644
--- a/azkaban-common/src/test/java/azkaban/utils/RestfulApiClientTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/RestfulApiClientTest.java
@@ -18,162 +18,47 @@ package azkaban.utils;
import java.io.IOException;
import java.net.URI;
-import java.util.ArrayList;
-import org.apache.http.Header;
+import java.util.Collections;
+import java.util.List;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseFactory;
import org.apache.http.HttpStatus;
import org.apache.http.HttpVersion;
-import org.apache.http.NameValuePair;
import org.apache.http.StatusLine;
import org.apache.http.client.HttpResponseException;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.DefaultHttpResponseFactory;
-import org.apache.http.message.BasicNameValuePair;
import org.apache.http.message.BasicStatusLine;
import org.apache.http.util.EntityUtils;
import org.junit.Assert;
import org.junit.Test;
-/**
- *
- */
public class RestfulApiClientTest {
@Test
- public void testHttpGet() throws Exception {
- final MockRestfulApiClient mockClient = new MockRestfulApiClient();
- final URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
- new Pair<>("Entry1", "Value1"));
-
- final String result = mockClient.httpGet(uri, null);
- Assert.assertTrue(result != null && result.contains(uri.toString()));
- Assert.assertTrue(result.contains("METHOD = GET"));
- }
-
- @Test
- public void testHttpGetWithHeaderItems() throws Exception {
- final MockRestfulApiClient mockClient = new MockRestfulApiClient();
- final URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
- new Pair<>("Entry1", "Value1"));
-
- final ArrayList<NameValuePair> headerItems = new ArrayList<>();
- headerItems.add(new BasicNameValuePair("h1", "v1"));
- headerItems.add(new BasicNameValuePair("h2", "v2"));
-
- final String result = mockClient.httpGet(uri, headerItems);
- Assert.assertTrue(result != null && result.contains(uri.toString()));
- Assert.assertTrue(result.contains("METHOD = GET"));
- Assert.assertTrue(result.contains("h1 = v1"));
- Assert.assertTrue(result.contains("h2 = v2"));
- }
-
- @Test
public void testHttpPost() throws Exception {
final MockRestfulApiClient mockClient = new MockRestfulApiClient();
final URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
new Pair<>("Entry1", "Value1"));
- final ArrayList<NameValuePair> headerItems = new ArrayList<>();
- headerItems.add(new BasicNameValuePair("h1", "v1"));
- headerItems.add(new BasicNameValuePair("h2", "v2"));
-
final String content = "123456789";
- final String result = mockClient.httpPost(uri, headerItems, content);
+ final String result = mockClient.httpPost(uri, toPairList(content));
Assert.assertTrue(result != null && result.contains(uri.toString()));
Assert.assertTrue(result.contains("METHOD = POST"));
- Assert.assertTrue(result.contains("h1 = v1"));
- Assert.assertTrue(result.contains("h2 = v2"));
- Assert.assertTrue(result.contains(String.format("%s = %s;", "BODY", content)));
+ Assert.assertTrue(result.contains(String.format("%s = value=%s;", "BODY", content)));
}
- @Test
- public void testHttpPostWOBody() throws Exception {
- final MockRestfulApiClient mockClient = new MockRestfulApiClient();
- final URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
- new Pair<>("Entry1", "Value1"));
-
- final String result = mockClient.httpPost(uri, null, null);
- Assert.assertTrue(result != null && result.contains(uri.toString()));
- Assert.assertTrue(result.contains("METHOD = POST"));
- Assert.assertFalse(result.contains("BODY_EXISTS"));
- Assert.assertFalse(result.contains("HEADER_EXISTS"));
- }
-
- @Test
- public void testHttpPut() throws Exception {
- final MockRestfulApiClient mockClient = new MockRestfulApiClient();
- final URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
- new Pair<>("Entry1", "Value1"));
-
- final ArrayList<NameValuePair> headerItems = new ArrayList<>();
- headerItems.add(new BasicNameValuePair("h1", "v1"));
- headerItems.add(new BasicNameValuePair("h2", "v2"));
-
- final String content = "123456789";
-
- final String result = mockClient.httpPut(uri, headerItems, content);
- Assert.assertTrue(result != null && result.contains(uri.toString()));
- Assert.assertTrue(result.contains("METHOD = PUT"));
- Assert.assertTrue(result.contains("h1 = v1"));
- Assert.assertTrue(result.contains("h2 = v2"));
- Assert.assertTrue(result.contains(String.format("%s = %s;", "BODY", content)));
- }
-
- @Test
- public void testContentLength() throws Exception {
- final MockRestfulApiClient mockClient = new MockRestfulApiClient();
- final URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
- new Pair<>("Entry1", "Value1"));
-
- final String content = "123456789";
-
- final String result = mockClient.httpPut(uri, null, content);
- Assert.assertTrue(result != null && result.contains(uri.toString()));
- Assert.assertTrue(result.contains("Content-Length = " + Integer.toString(content.length())));
- }
-
- @Test
- public void testContentLengthOverride() throws Exception {
- final MockRestfulApiClient mockClient = new MockRestfulApiClient();
- final URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
- new Pair<>("Entry1", "Value1"));
-
- final ArrayList<NameValuePair> headerItems = new ArrayList<>();
- headerItems.add(new BasicNameValuePair("Content-Length", "0"));
-
- final String content = "123456789";
-
- final String result = mockClient.httpPut(uri, headerItems, content);
- Assert.assertTrue(result != null && result.contains(uri.toString()));
- Assert.assertEquals(result.lastIndexOf("Content-Length"), result.indexOf("Content-Length"));
- Assert.assertTrue(result.contains("Content-Length = " + Integer.toString(content.length())));
- }
-
- @Test
- public void testHttpDelete() throws Exception {
- final MockRestfulApiClient mockClient = new MockRestfulApiClient();
- final URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
- new Pair<>("Entry1", "Value1"));
-
- final ArrayList<NameValuePair> headerItems = new ArrayList<>();
- headerItems.add(new BasicNameValuePair("h1", "v1"));
- headerItems.add(new BasicNameValuePair("h2", "v2"));
-
- final String result = mockClient.httpDelete(uri, headerItems);
- Assert.assertTrue(result != null && result.contains(uri.toString()));
- Assert.assertTrue(result.contains("METHOD = DELETE"));
- Assert.assertTrue(result.contains("h1 = v1"));
- Assert.assertTrue(result.contains("h2 = v2"));
+ private List<Pair<String, String>> toPairList(final String content) {
+ return Collections.singletonList(new Pair<>("value", content));
}
static class MockRestfulApiClient extends RestfulApiClient<String> {
- private int status = HttpStatus.SC_OK;
+ private final int status = HttpStatus.SC_OK;
@Override
protected String parseResponse(final HttpResponse response) throws IOException {
@@ -182,16 +67,7 @@ public class RestfulApiClientTest {
throw new HttpResponseException(statusLine.getStatusCode(),
statusLine.getReasonPhrase());
}
- final HttpEntity entity = response.getEntity();
- return entity == null ? null : EntityUtils.toString(entity);
- }
-
- public void setReturnStatus(final int newStatus) {
- this.status = newStatus;
- }
-
- public void resetReturnStatus() {
- this.status = HttpStatus.SC_OK;
+ return EntityUtils.toString(response.getEntity());
}
@Override
@@ -205,21 +81,8 @@ public class RestfulApiClientTest {
sb.append(String.format("%s = %s;", "METHOD", request.getMethod()));
sb.append(String.format("%s = %s;", "URI", request.getURI()));
- if (request.getAllHeaders().length > 0) {
- sb.append("HEADER_EXISTS");
- }
-
- for (final Header h : request.getAllHeaders()) {
- sb.append(String.format("%s = %s;", h.getName(), h.getValue()));
- }
-
- if (request instanceof HttpEntityEnclosingRequestBase) {
- final HttpEntity entity = ((HttpEntityEnclosingRequestBase) request).getEntity();
- if (entity != null) {
- sb.append("BODY_EXISTS");
- sb.append(String.format("%s = %s;", "BODY", EntityUtils.toString(entity)));
- }
- }
+ final HttpEntity entity = ((HttpEntityEnclosingRequestBase) request).getEntity();
+ sb.append(String.format("%s = %s;", "BODY", EntityUtils.toString(entity)));
response.setEntity(new StringEntity(sb.toString()));
return parseResponse(response);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
index 58297e0..9dc20a1 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
@@ -78,11 +78,25 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
mapper.writeValue(stream, obj);
}
+ /**
+ * @deprecated GET available for seamless upgrade. azkaban-web now uses POST.
+ */
+ @Deprecated
@Override
public void doGet(final HttpServletRequest req, final HttpServletResponse resp)
throws ServletException, IOException {
+ handleRequest(req, resp);
+ }
+
+ @Override
+ public void doPost(final HttpServletRequest req, final HttpServletResponse resp)
+ throws ServletException, IOException {
+ handleRequest(req, resp);
+ }
+
+ public void handleRequest(final HttpServletRequest req, final HttpServletResponse resp)
+ throws IOException {
final HashMap<String, Object> respMap = new HashMap<>();
- // logger.info("ExecutorServer called by " + req.getRemoteAddr());
try {
if (!hasParam(req, ACTION_PARAM)) {
logger.error("Parameter action not set");
@@ -90,7 +104,6 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
} else {
final String action = getParam(req, ACTION_PARAM);
if (action.equals(UPDATE_ACTION)) {
- // logger.info("Updated called");
handleAjaxUpdateRequest(req, respMap);
} else if (action.equals(PING_ACTION)) {
respMap.put("status", "alive");
@@ -414,12 +427,6 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
}
}
- @Override
- public void doPost(final HttpServletRequest req, final HttpServletResponse resp)
- throws ServletException, IOException {
-
- }
-
/**
* Duplicated code with AbstractAzkabanServlet, but ne
*/