diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
new file mode 100644
index 0000000..bf74b95
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpResponseException;
+import org.apache.http.util.EntityUtils;
+
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.RestfulApiClient;
+
+/** Client class that will be used to handle all Restful API calls between Executor and the host application.
+ * */
+public class ExecutorApiClient extends RestfulApiClient<Map<String, Object>> {
+
+ // hide the constructor. we what this class to be in singleton.
+ private ExecutorApiClient(){}
+
+ // international cache for the object instance.
+ private static ExecutorApiClient instance = null;
+
+ /**Singleton creator of the class.
+ * */
+ public static ExecutorApiClient getInstance() {
+ if (null == instance) {
+ synchronized (ExecutorApiClient.class) {
+ if (null == instance) {
+ instance = new ExecutorApiClient();
+ }
+ }
+ }
+ return instance;
+ }
+
+ /**Implementing the parseResponse function to return de-serialized Json object.
+ * @param response the returned response from the HttpClient.
+ * */
+ @SuppressWarnings("unchecked")
+ @Override
+ protected Map<String, Object> parseResponse(HttpResponse response)
+ throws HttpResponseException, IOException {
+ final StatusLine statusLine = response.getStatusLine();
+ if (statusLine.getStatusCode() >= 300) {
+
+ logger.error(String.format("unable to parse response as the response status is %s",
+ statusLine.getStatusCode()));
+
+ throw new HttpResponseException(statusLine.getStatusCode(),
+ statusLine.getReasonPhrase());
+ }
+
+ final HttpEntity entity = response.getEntity();
+ if (null != entity){
+ Object returnVal = JSONUtils.parseJSONFromString(EntityUtils.toString(entity));
+ if (null!= returnVal){
+ return (Map<String, Object>) returnVal;
+ }
+ }
+ return null;
+ }
+
+ /**function to get executor status .
+ * @param executorHost Host name of the executor.
+ * @param executorPort Host port.
+ * @param action query action.
+ * @param param extra query parameters
+ * @return the de-serialized JSON object in Map<String, Object> format.
+ * */
+ @SuppressWarnings("unchecked")
+ public Map<String, Object> callExecutorStats(String executorHost, int executorPort,
+ String action, Pair<String, String>... params) throws IOException {
+
+ // form up the URI.
+ URI uri = ExecutorApiClient.BuildUri(executorHost, executorPort, "/stats", true,params);
+ uri = ExecutorApiClient.BuildUri(uri, new Pair<String, String>(ConnectorParams.ACTION_PARAM, action));
+ return this.httpGet(uri, null);
+ }
+
+ // TO-DO reflector other API call functions out from the ExecutorManager.
+}
diff --git a/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java b/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java
new file mode 100644
index 0000000..0b39a7d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java
@@ -0,0 +1,239 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpMessage;
+import org.apache.http.HttpResponse;
+import org.apache.http.NameValuePair;
+import org.apache.http.ParseException;
+import org.apache.http.client.HttpResponseException;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.log4j.Logger;
+
+/** class handles the communication between the application and
+ * a Restful API based web server.
+ * @param T : type of the returning response object.
+ * Note: the idea of this abstract class is to provide a wrapper for the logic around HTTP layer communication so
+ * development work can take this as a black box and focus on processing the result.
+ * With that said the abstract class will be provided as a template, which ideally can support different types
+ * of returning object (Dictionary, xmlDoc , text etc.)
+ * */
+public abstract class RestfulApiClient<T> {
+ protected static Logger logger = Logger.getLogger(RestfulApiClient.class);
+
+ /** Method to transform the response returned by the httpClient into the
+ * type specified.
+ * Note: Method need to handle case such as failed request.
+ * Also method is not supposed to pass the response object out
+ * via the returning value as the response will be closed after the
+ * execution steps out of the method context.
+ * @throws HttpResponseException
+ * @throws IOException
+ * @throws ParseException
+ * **/
+ protected abstract T parseResponse(HttpResponse response)
+ throws HttpResponseException, IOException;
+
+ /** function to perform a Get http request.
+ * @param uri the URI of the request.
+ * @param headerEntries extra entries to be added to request header.
+ * @return the response object type of which is specified by user.
+ * @throws IOException */
+ public T httpGet(URI uri, List<NameValuePair> headerEntries) throws IOException{
+ // shortcut if the passed url is invalid.
+ if (null == uri){
+ logger.error(" unable to perform httpGet as the passed uri is null");
+ return null;
+ }
+
+ HttpGet get = new HttpGet(uri);
+ return this.sendAndReturn((HttpGet)completeRequest(get, headerEntries));
+ }
+
+ /** function to perform a Post http request.
+ * @param uri the URI of the request.
+ * @param headerEntries extra entries to be added to request header.
+ * @param postingBody the content to be posted , optional.
+ * @return the response object type of which is specified by user.
+ * @throws UnsupportedEncodingException, IOException */
+ public T httpPost(URI uri,
+ List<NameValuePair> headerEntries,
+ String postingBody) throws UnsupportedEncodingException, IOException{
+ // shortcut if the passed url is invalid.
+ if (null == uri){
+ logger.error(" unable to perform httpPost as the passed uri is null.");
+ return null;
+ }
+
+ HttpPost post = new HttpPost(uri);
+ return this.sendAndReturn(completeRequest(post,headerEntries,postingBody));
+ }
+
+ /** function to perform a Delete http request.
+ * @param uri the URI of the request.
+ * @param headerEntries extra entries to be added to request header.
+ * @return the response object type of which is specified by user.
+ * @throws IOException */
+ public T httpDelete(URI uri, List<NameValuePair> headerEntries) throws IOException{
+ // shortcut if the passed url is invalid.
+ if (null == uri){
+ logger.error(" unable to perform httpDelete as the passed uri is null.");
+ return null;
+ }
+
+ HttpDelete delete = new HttpDelete(uri);
+ return this.sendAndReturn((HttpDelete)completeRequest(delete, headerEntries));
+ }
+
+ /** function to perform a Put http request.
+ * @param uri the URI of the request.
+ * @param headerEntries extra entries to be added to request header.
+ * @param postingBody the content to be posted , optional.
+ * @return the response object type of which is specified by user.
+ * @throws UnsupportedEncodingException, IOException */
+ public T httpPut(URI uri, List<NameValuePair> headerEntries,
+ String postingBody) throws UnsupportedEncodingException, IOException{
+ // shortcut if the passed url is invalid.
+ if (null == uri){
+ logger.error(" unable to perform httpPut as the passed url is null or empty.");
+ return null;
+ }
+
+ HttpPut put = new HttpPut(uri);
+ return this.sendAndReturn(completeRequest(put, headerEntries, postingBody));
+ }
+
+ /** function to dispatch the request and pass back the response.
+ * */
+ protected T sendAndReturn(HttpUriRequest request) throws IOException{
+ CloseableHttpClient client = HttpClients.createDefault();
+ try {
+ return this.parseResponse(client.execute(request));
+ }finally{
+ client.close();
+ }
+ }
+
+ /** helper function to build a valid URI.
+ * @param host host name.
+ * @param port host port.
+ * @param path extra path after host.
+ * @param isHttp indicates if whether Http or HTTPS should be used.
+ * @param params extra query parameters.
+ * @return the URI built from the inputs.
+ * @throws IOException
+ * */
+ public static URI BuildUri(String host, int port, String path,
+ boolean isHttp, Pair<String, String>... params) throws IOException{
+ URIBuilder builder = new URIBuilder();
+ builder.setScheme(isHttp? "http" : "https").setHost(host).setPort(port);
+
+ if (null != path && path.length() > 0){
+ builder.setPath(path);
+ }
+
+ if (params != null) {
+ for (Pair<String, String> pair : params) {
+ builder.setParameter(pair.getFirst(), pair.getSecond());
+ }
+ }
+
+ URI uri = null;
+ try {
+ uri = builder.build();
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+
+ return uri;
+ }
+
+ /** helper function to build a valid URI.
+ * @param uri the URI to start with.
+ * @param params extra query parameters to append.
+ * @return the URI built from the inputs.
+ * @throws IOException
+ * */
+ public static URI BuildUri(URI uri, Pair<String, String>... params) throws IOException{
+ URIBuilder builder = new URIBuilder(uri);
+
+ if (params != null) {
+ for (Pair<String, String> pair : params) {
+ builder.setParameter(pair.getFirst(), pair.getSecond());
+ }
+ }
+
+ URI returningUri = null;
+ try {
+ returningUri = builder.build();
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+
+ return returningUri;
+ }
+
+ /** helper function to fill the request with header entries .
+ * */
+ private static HttpMessage completeRequest(HttpMessage request,
+ List<NameValuePair> headerEntries){
+ if (null == request){
+ logger.error("unable to complete request as the passed request object is null");
+ return request;
+ }
+
+ // dump all the header entries to the request.
+ if (null != headerEntries && headerEntries.size() > 0){
+ for (NameValuePair pair : headerEntries){
+ request.addHeader(pair.getName(), pair.getValue());
+ }
+ }
+ return request;
+ }
+
+ /** helper function to fill the request with header entries and posting body .
+ * */
+ private static HttpEntityEnclosingRequestBase completeRequest(HttpEntityEnclosingRequestBase request,
+ List<NameValuePair> headerEntries,
+ String postingBody) throws UnsupportedEncodingException{
+ if (null != completeRequest(request, headerEntries)){
+ // dump the post body UTF-8 will be used as the default encoding type.
+ if (null != postingBody && postingBody.length() > 0){
+ HttpEntity entity = new ByteArrayEntity(postingBody.getBytes("UTF-8"));
+ request.setHeader("Content-Length", Long.toString(entity.getContentLength()));
+ request.setEntity(entity);
+ }
+ }
+ return request;
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/utils/RestfulApiClientTest.java b/azkaban-common/src/test/java/azkaban/utils/RestfulApiClientTest.java
new file mode 100644
index 0000000..4a36132
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/RestfulApiClientTest.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpResponseFactory;
+import org.apache.http.HttpStatus;
+import org.apache.http.HttpVersion;
+import org.apache.http.NameValuePair;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpResponseException;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.DefaultHttpResponseFactory;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.message.BasicStatusLine;
+import org.apache.http.util.EntityUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class RestfulApiClientTest {
+
+ protected class MockRestfulApiClient extends RestfulApiClient<String> {
+ private int status = HttpStatus.SC_OK;
+
+ @Override
+ protected String parseResponse(HttpResponse response) throws IOException {
+ final StatusLine statusLine = response.getStatusLine();
+ if (statusLine.getStatusCode() >= 300) {
+ throw new HttpResponseException(statusLine.getStatusCode(),
+ statusLine.getReasonPhrase());
+ }
+ final HttpEntity entity = response.getEntity();
+ return entity == null ? null : EntityUtils.toString(entity);
+ }
+
+ public void setReturnStatus(int newStatus){
+ this.status = newStatus;
+ }
+
+ public void resetReturnStatus(){
+ this.status = HttpStatus.SC_OK;
+ }
+
+ @Override
+ protected String sendAndReturn(HttpUriRequest request) throws IOException{
+ HttpResponseFactory factory = new DefaultHttpResponseFactory();
+
+ HttpResponse response = factory.newHttpResponse(
+ new BasicStatusLine(HttpVersion.HTTP_1_1, this.status, null),null);
+
+ StringBuilder sb = new StringBuilder();
+ sb.append(String.format("%s = %s;", "METHOD", request.getMethod()));
+ sb.append(String.format("%s = %s;", "URI", request.getURI()));
+
+ if (request.getAllHeaders().length > 0){
+ sb.append("HEADER_EXISTS");
+ }
+
+ for (Header h : request.getAllHeaders()){
+ sb.append(String.format("%s = %s;", h.getName(), h.getValue()));
+ }
+
+ if (request instanceof HttpEntityEnclosingRequestBase){
+ HttpEntity entity = ((HttpEntityEnclosingRequestBase)request).getEntity();
+ if (entity != null){
+ sb.append("BODY_EXISTS");
+ sb.append(String.format("%s = %s;", "BODY", EntityUtils.toString(entity)));
+ }
+ }
+
+ response.setEntity(new StringEntity(sb.toString()));
+ return parseResponse(response);
+ }
+
+ }
+
+ @Test
+ public void testHttpGet() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.BuildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ String result = mockClient.httpGet(uri, null);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertTrue(result.contains("METHOD = GET"));
+ }
+
+ @Test
+ public void testHttpGetWithHeaderItems() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.BuildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+ headerItems.add(new BasicNameValuePair("h1","v1"));
+ headerItems.add(new BasicNameValuePair("h2","v2"));
+
+ String result = mockClient.httpGet(uri, headerItems);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertTrue(result.contains("METHOD = GET"));
+ Assert.assertTrue(result.contains("h1 = v1"));
+ Assert.assertTrue(result.contains("h2 = v2"));
+ }
+
+ @Test
+ public void testHttpPost() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.BuildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+ headerItems.add(new BasicNameValuePair("h1","v1"));
+ headerItems.add(new BasicNameValuePair("h2","v2"));
+
+ String content = "123456789";
+
+ String result = mockClient.httpPost(uri, headerItems,content);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertTrue(result.contains("METHOD = POST"));
+ Assert.assertTrue(result.contains("h1 = v1"));
+ Assert.assertTrue(result.contains("h2 = v2"));
+ Assert.assertTrue(result.contains(String.format("%s = %s;", "BODY", content)));
+ }
+
+ @Test
+ public void testHttpPostWOBody() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.BuildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ String result = mockClient.httpPost(uri, null,null);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertTrue(result.contains("METHOD = POST"));
+ Assert.assertFalse(result.contains("BODY_EXISTS"));
+ Assert.assertFalse(result.contains("HEADER_EXISTS"));
+ }
+
+ @Test
+ public void testHttpPut() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.BuildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+ headerItems.add(new BasicNameValuePair("h1","v1"));
+ headerItems.add(new BasicNameValuePair("h2","v2"));
+
+ String content = "123456789";
+
+ String result = mockClient.httpPut(uri, headerItems,content);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertTrue(result.contains("METHOD = PUT"));
+ Assert.assertTrue(result.contains("h1 = v1"));
+ Assert.assertTrue(result.contains("h2 = v2"));
+ Assert.assertTrue(result.contains(String.format("%s = %s;", "BODY", content)));
+ }
+
+ @Test
+ public void testContentLength() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.BuildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ String content = "123456789";
+
+ String result = mockClient.httpPut(uri, null,content);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertTrue(result.contains("Content-Length = " + Integer.toString(content.length())));
+ }
+
+ @Test
+ public void testContentLengthOverride() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.BuildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+ headerItems.add(new BasicNameValuePair("Content-Length","0"));
+
+ String content = "123456789";
+
+ String result = mockClient.httpPut(uri, headerItems,content);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertEquals(result.lastIndexOf("Content-Length"),result.indexOf("Content-Length"));
+ Assert.assertTrue(result.contains("Content-Length = " + Integer.toString(content.length())));
+ }
+
+ @Test
+ public void testHttpDelete() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.BuildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+ headerItems.add(new BasicNameValuePair("h1","v1"));
+ headerItems.add(new BasicNameValuePair("h2","v2"));
+
+ String result = mockClient.httpDelete(uri, headerItems);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertTrue(result.contains("METHOD = DELETE"));
+ Assert.assertTrue(result.contains("h1 = v1"));
+ Assert.assertTrue(result.contains("h2 = v2"));
+ }
+}