azkaban-aplcache

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
new file mode 100644
index 0000000..bf74b95
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpResponseException;
+import org.apache.http.util.EntityUtils;
+
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.RestfulApiClient;
+
+/** Client class that will be used to handle all Restful API calls between Executor and the host application.
+ * */
+public class ExecutorApiClient extends RestfulApiClient<Map<String, Object>> {
+
+  // hide the constructor. we what this class to be in singleton.
+  private ExecutorApiClient(){}
+
+  // international cache for the object instance.
+  private static ExecutorApiClient instance = null;
+
+  /**Singleton creator of the class.
+   * */
+  public static ExecutorApiClient getInstance() {
+    if (null == instance) {
+      synchronized (ExecutorApiClient.class) {
+        if (null == instance) {
+          instance = new ExecutorApiClient();
+        }
+      }
+    }
+    return instance;
+  }
+
+  /**Implementing the parseResponse function to return de-serialized Json object.
+   * @param response  the returned response from the HttpClient.
+   * */
+  @SuppressWarnings("unchecked")
+  @Override
+  protected Map<String, Object> parseResponse(HttpResponse response)
+      throws HttpResponseException, IOException {
+    final StatusLine statusLine = response.getStatusLine();
+    if (statusLine.getStatusCode() >= 300) {
+
+        logger.error(String.format("unable to parse response as the response status is %s",
+            statusLine.getStatusCode()));
+
+        throw new HttpResponseException(statusLine.getStatusCode(),
+                statusLine.getReasonPhrase());
+    }
+
+    final HttpEntity entity = response.getEntity();
+    if (null != entity){
+      Object returnVal = JSONUtils.parseJSONFromString(EntityUtils.toString(entity));
+      if (null!= returnVal){
+        return (Map<String, Object>) returnVal;
+      }
+    }
+    return null;
+  }
+
+  /**function to get executor status .
+   * @param executorHost    Host name of the executor.
+   * @param executorPort    Host port.
+   * @param action          query action.
+   * @param param           extra query parameters
+   * @return  the de-serialized JSON object in Map<String, Object> format.
+   * */
+  @SuppressWarnings("unchecked")
+  public Map<String, Object> callExecutorStats(String executorHost, int executorPort,
+      String action, Pair<String, String>... params) throws IOException {
+
+    // form up the URI.
+    URI uri = ExecutorApiClient.BuildUri(executorHost, executorPort, "/stats", true,params);
+    uri =  ExecutorApiClient.BuildUri(uri, new Pair<String, String>(ConnectorParams.ACTION_PARAM, action));
+    return this.httpGet(uri, null);
+    }
+
+  // TO-DO  reflector other API call functions out from the ExecutorManager.
+}
diff --git a/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java b/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java
new file mode 100644
index 0000000..0b39a7d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java
@@ -0,0 +1,239 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpMessage;
+import org.apache.http.HttpResponse;
+import org.apache.http.NameValuePair;
+import org.apache.http.ParseException;
+import org.apache.http.client.HttpResponseException;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.log4j.Logger;
+
+/** class handles the communication between the application and
+ *  a Restful API based web server.
+ *  @param T : type of the returning response object.
+ *  Note: the idea of this abstract class is to provide a wrapper for the logic around HTTP layer communication so
+ *        development work can take this as a black box and focus on processing the result.
+ *        With that said the abstract class will be provided as a template, which ideally can support different types
+ *        of returning object (Dictionary, xmlDoc , text etc.)
+ * */
+public abstract class RestfulApiClient<T> {
+  protected static Logger logger = Logger.getLogger(RestfulApiClient.class);
+
+  /** Method to transform the response returned by the httpClient into the
+   *  type specified.
+   *  Note: Method need to handle case such as failed request.
+   *        Also method is not supposed to pass the response object out
+   *        via the returning value as the response will be closed after the
+   *        execution steps out of the method context.
+   * @throws HttpResponseException
+   * @throws IOException
+   * @throws ParseException
+   * **/
+  protected abstract T parseResponse(HttpResponse  response)
+      throws HttpResponseException, IOException;
+
+  /** function to perform a Get http request.
+   * @param uri   the URI of the request.
+   * @param headerEntries   extra entries to be added to request header.
+   * @return the response object type of which is specified by user.
+   * @throws IOException */
+  public T httpGet(URI uri, List<NameValuePair> headerEntries) throws IOException{
+    // shortcut if the passed url is invalid.
+    if (null == uri){
+      logger.error(" unable to perform httpGet as the passed uri is null");
+      return null;
+    }
+
+    HttpGet get = new HttpGet(uri);
+    return this.sendAndReturn((HttpGet)completeRequest(get, headerEntries));
+  }
+
+  /** function to perform a Post http request.
+   * @param uri   the URI of the request.
+   * @param headerEntries   extra entries to be added to request header.
+   * @param postingBody  the content to be posted , optional.
+   * @return the response object type of which is specified by user.
+   * @throws UnsupportedEncodingException, IOException */
+  public T httpPost(URI uri,
+      List<NameValuePair> headerEntries,
+      String postingBody) throws UnsupportedEncodingException, IOException{
+    // shortcut if the passed url is invalid.
+    if (null == uri){
+      logger.error(" unable to perform httpPost as the passed uri is null.");
+      return null;
+    }
+
+    HttpPost post = new HttpPost(uri);
+    return this.sendAndReturn(completeRequest(post,headerEntries,postingBody));
+  }
+
+  /** function to perform a Delete http request.
+   * @param uri   the URI of the request.
+   * @param headerEntries   extra entries to be added to request header.
+   * @return the response object type of which is specified by user.
+   * @throws IOException */
+  public T httpDelete(URI uri, List<NameValuePair> headerEntries) throws IOException{
+    // shortcut if the passed url is invalid.
+    if (null == uri){
+      logger.error(" unable to perform httpDelete as the passed uri is null.");
+      return null;
+    }
+
+    HttpDelete delete = new HttpDelete(uri);
+    return this.sendAndReturn((HttpDelete)completeRequest(delete, headerEntries));
+  }
+
+  /** function to perform a Put http request.
+   * @param uri   the URI of the request.
+   * @param headerEntries   extra entries to be added to request header.
+   * @param postingBody  the content to be posted , optional.
+   * @return the response object type of which is specified by user.
+   * @throws UnsupportedEncodingException, IOException */
+  public T httpPut(URI uri, List<NameValuePair> headerEntries,
+      String postingBody) throws UnsupportedEncodingException, IOException{
+    // shortcut if the passed url is invalid.
+    if (null == uri){
+      logger.error(" unable to perform httpPut as the passed url is null or empty.");
+      return null;
+    }
+
+    HttpPut put = new HttpPut(uri);
+    return this.sendAndReturn(completeRequest(put, headerEntries, postingBody));
+  }
+
+  /** function to dispatch the request and pass back the response.
+   * */
+  protected T sendAndReturn(HttpUriRequest request) throws IOException{
+    CloseableHttpClient client = HttpClients.createDefault();
+    try {
+      return this.parseResponse(client.execute(request));
+    }finally{
+      client.close();
+    }
+  }
+
+  /** helper function to build a valid URI.
+   *  @param host   host name.
+   *  @param port   host port.
+   *  @param path   extra path after host.
+   *  @param isHttp indicates if whether Http or HTTPS should be used.
+   *  @param params extra query parameters.
+   *  @return the URI built from the inputs.
+   *  @throws IOException
+   * */
+  public static URI BuildUri(String host, int port, String path,
+      boolean isHttp, Pair<String, String>... params) throws IOException{
+    URIBuilder builder = new URIBuilder();
+    builder.setScheme(isHttp? "http" : "https").setHost(host).setPort(port);
+
+    if (null != path && path.length() > 0){
+      builder.setPath(path);
+    }
+
+    if (params != null) {
+      for (Pair<String, String> pair : params) {
+        builder.setParameter(pair.getFirst(), pair.getSecond());
+      }
+    }
+
+    URI uri = null;
+    try {
+      uri = builder.build();
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+
+    return uri;
+  }
+
+  /** helper function to build a valid URI.
+   *  @param uri    the URI to start with.
+   *  @param params extra query parameters to append.
+   *  @return the URI built from the inputs.
+   *  @throws IOException
+   * */
+  public static URI BuildUri(URI uri, Pair<String, String>... params) throws IOException{
+    URIBuilder builder = new URIBuilder(uri);
+
+    if (params != null) {
+      for (Pair<String, String> pair : params) {
+        builder.setParameter(pair.getFirst(), pair.getSecond());
+      }
+    }
+
+    URI returningUri = null;
+    try {
+      returningUri = builder.build();
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+
+    return returningUri;
+  }
+
+  /** helper function to fill  the request with header entries .
+   * */
+  private static HttpMessage completeRequest(HttpMessage request,
+      List<NameValuePair> headerEntries){
+    if (null == request){
+      logger.error("unable to complete request as the passed request object is null");
+      return request;
+    }
+
+    // dump all the header entries to the request.
+    if (null != headerEntries && headerEntries.size() > 0){
+      for (NameValuePair pair : headerEntries){
+        request.addHeader(pair.getName(), pair.getValue());
+      }
+    }
+    return request;
+  }
+
+  /** helper function to fill  the request with header entries and posting body .
+   * */
+  private static HttpEntityEnclosingRequestBase completeRequest(HttpEntityEnclosingRequestBase request,
+      List<NameValuePair> headerEntries,
+      String postingBody) throws UnsupportedEncodingException{
+     if (null != completeRequest(request, headerEntries)){
+      // dump the post body UTF-8 will be used as the default encoding type.
+      if (null != postingBody && postingBody.length() > 0){
+        HttpEntity entity = new ByteArrayEntity(postingBody.getBytes("UTF-8"));
+        request.setHeader("Content-Length",  Long.toString(entity.getContentLength()));
+        request.setEntity(entity);
+      }
+    }
+    return request;
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/utils/RestfulApiClientTest.java b/azkaban-common/src/test/java/azkaban/utils/RestfulApiClientTest.java
new file mode 100644
index 0000000..4a36132
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/RestfulApiClientTest.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpResponseFactory;
+import org.apache.http.HttpStatus;
+import org.apache.http.HttpVersion;
+import org.apache.http.NameValuePair;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpResponseException;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.DefaultHttpResponseFactory;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.message.BasicStatusLine;
+import org.apache.http.util.EntityUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class RestfulApiClientTest {
+
+  protected class MockRestfulApiClient extends RestfulApiClient<String> {
+    private int  status = HttpStatus.SC_OK;
+
+    @Override
+    protected String parseResponse(HttpResponse response) throws IOException {
+      final StatusLine statusLine = response.getStatusLine();
+      if (statusLine.getStatusCode() >= 300) {
+          throw new HttpResponseException(statusLine.getStatusCode(),
+                  statusLine.getReasonPhrase());
+      }
+      final HttpEntity entity = response.getEntity();
+      return entity == null ? null : EntityUtils.toString(entity);
+    }
+
+    public void setReturnStatus(int newStatus){
+      this.status = newStatus;
+    }
+
+    public void resetReturnStatus(){
+      this.status = HttpStatus.SC_OK;
+    }
+
+    @Override
+    protected String sendAndReturn(HttpUriRequest request) throws IOException{
+      HttpResponseFactory factory = new DefaultHttpResponseFactory();
+
+      HttpResponse response = factory.newHttpResponse(
+          new BasicStatusLine(HttpVersion.HTTP_1_1, this.status, null),null);
+
+      StringBuilder sb = new StringBuilder();
+      sb.append(String.format("%s = %s;", "METHOD", request.getMethod()));
+      sb.append(String.format("%s = %s;", "URI", request.getURI()));
+
+      if (request.getAllHeaders().length > 0){
+        sb.append("HEADER_EXISTS");
+      }
+
+      for (Header h : request.getAllHeaders()){
+        sb.append(String.format("%s = %s;", h.getName(), h.getValue()));
+      }
+
+      if (request instanceof HttpEntityEnclosingRequestBase){
+        HttpEntity entity = ((HttpEntityEnclosingRequestBase)request).getEntity();
+        if (entity != null){
+          sb.append("BODY_EXISTS");
+          sb.append(String.format("%s = %s;", "BODY", EntityUtils.toString(entity)));
+        }
+      }
+
+      response.setEntity(new StringEntity(sb.toString()));
+      return parseResponse(response);
+    }
+
+  }
+
+  @Test
+  public void testHttpGet() throws Exception {
+    MockRestfulApiClient mockClient = new MockRestfulApiClient();
+    @SuppressWarnings("unchecked")
+    URI uri = MockRestfulApiClient.BuildUri("test.com", 80, "test", true,
+        new Pair <String,String>("Entry1","Value1"));
+
+    String result = mockClient.httpGet(uri, null);
+    Assert.assertTrue(result!= null && result.contains(uri.toString()));
+    Assert.assertTrue(result.contains("METHOD = GET"));
+  }
+
+  @Test
+  public void testHttpGetWithHeaderItems() throws Exception {
+    MockRestfulApiClient mockClient = new MockRestfulApiClient();
+    @SuppressWarnings("unchecked")
+    URI uri = MockRestfulApiClient.BuildUri("test.com", 80, "test", true,
+        new Pair <String,String>("Entry1","Value1"));
+
+    ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+    headerItems.add(new BasicNameValuePair("h1","v1"));
+    headerItems.add(new BasicNameValuePair("h2","v2"));
+
+    String result = mockClient.httpGet(uri, headerItems);
+    Assert.assertTrue(result!= null && result.contains(uri.toString()));
+    Assert.assertTrue(result.contains("METHOD = GET"));
+    Assert.assertTrue(result.contains("h1 = v1"));
+    Assert.assertTrue(result.contains("h2 = v2"));
+  }
+
+  @Test
+  public void testHttpPost() throws Exception {
+    MockRestfulApiClient mockClient = new MockRestfulApiClient();
+    @SuppressWarnings("unchecked")
+    URI uri = MockRestfulApiClient.BuildUri("test.com", 80, "test", true,
+        new Pair <String,String>("Entry1","Value1"));
+
+    ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+    headerItems.add(new BasicNameValuePair("h1","v1"));
+    headerItems.add(new BasicNameValuePair("h2","v2"));
+
+    String content = "123456789";
+
+    String result = mockClient.httpPost(uri, headerItems,content);
+    Assert.assertTrue(result!= null && result.contains(uri.toString()));
+    Assert.assertTrue(result.contains("METHOD = POST"));
+    Assert.assertTrue(result.contains("h1 = v1"));
+    Assert.assertTrue(result.contains("h2 = v2"));
+    Assert.assertTrue(result.contains(String.format("%s = %s;", "BODY", content)));
+  }
+
+  @Test
+  public void testHttpPostWOBody() throws Exception {
+    MockRestfulApiClient mockClient = new MockRestfulApiClient();
+    @SuppressWarnings("unchecked")
+    URI uri = MockRestfulApiClient.BuildUri("test.com", 80, "test", true,
+        new Pair <String,String>("Entry1","Value1"));
+
+    String result = mockClient.httpPost(uri, null,null);
+    Assert.assertTrue(result!= null && result.contains(uri.toString()));
+    Assert.assertTrue(result.contains("METHOD = POST"));
+    Assert.assertFalse(result.contains("BODY_EXISTS"));
+    Assert.assertFalse(result.contains("HEADER_EXISTS"));
+  }
+
+  @Test
+  public void testHttpPut() throws Exception {
+    MockRestfulApiClient mockClient = new MockRestfulApiClient();
+    @SuppressWarnings("unchecked")
+    URI uri = MockRestfulApiClient.BuildUri("test.com", 80, "test", true,
+        new Pair <String,String>("Entry1","Value1"));
+
+    ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+    headerItems.add(new BasicNameValuePair("h1","v1"));
+    headerItems.add(new BasicNameValuePair("h2","v2"));
+
+    String content = "123456789";
+
+    String result = mockClient.httpPut(uri, headerItems,content);
+    Assert.assertTrue(result!= null && result.contains(uri.toString()));
+    Assert.assertTrue(result.contains("METHOD = PUT"));
+    Assert.assertTrue(result.contains("h1 = v1"));
+    Assert.assertTrue(result.contains("h2 = v2"));
+    Assert.assertTrue(result.contains(String.format("%s = %s;", "BODY", content)));
+  }
+
+  @Test
+  public void testContentLength() throws Exception {
+    MockRestfulApiClient mockClient = new MockRestfulApiClient();
+    @SuppressWarnings("unchecked")
+    URI uri = MockRestfulApiClient.BuildUri("test.com", 80, "test", true,
+        new Pair <String,String>("Entry1","Value1"));
+
+    String content = "123456789";
+
+    String result = mockClient.httpPut(uri, null,content);
+    Assert.assertTrue(result!= null && result.contains(uri.toString()));
+    Assert.assertTrue(result.contains("Content-Length = " + Integer.toString(content.length())));
+  }
+
+  @Test
+  public void testContentLengthOverride() throws Exception {
+    MockRestfulApiClient mockClient = new MockRestfulApiClient();
+    @SuppressWarnings("unchecked")
+    URI uri = MockRestfulApiClient.BuildUri("test.com", 80, "test", true,
+        new Pair <String,String>("Entry1","Value1"));
+
+    ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+    headerItems.add(new BasicNameValuePair("Content-Length","0"));
+
+    String content = "123456789";
+
+    String result = mockClient.httpPut(uri, headerItems,content);
+    Assert.assertTrue(result!= null && result.contains(uri.toString()));
+    Assert.assertEquals(result.lastIndexOf("Content-Length"),result.indexOf("Content-Length"));
+    Assert.assertTrue(result.contains("Content-Length = " + Integer.toString(content.length())));
+  }
+
+  @Test
+  public void testHttpDelete() throws Exception {
+    MockRestfulApiClient mockClient = new MockRestfulApiClient();
+    @SuppressWarnings("unchecked")
+    URI uri = MockRestfulApiClient.BuildUri("test.com", 80, "test", true,
+        new Pair <String,String>("Entry1","Value1"));
+
+    ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+    headerItems.add(new BasicNameValuePair("h1","v1"));
+    headerItems.add(new BasicNameValuePair("h2","v2"));
+
+    String result = mockClient.httpDelete(uri, headerItems);
+    Assert.assertTrue(result!= null && result.contains(uri.toString()));
+    Assert.assertTrue(result.contains("METHOD = DELETE"));
+    Assert.assertTrue(result.contains("h1 = v1"));
+    Assert.assertTrue(result.contains("h2 = v2"));
+  }
+}