azkaban-aplcache

Reportal tableau jobtype (#1932) This PR adds tableau

8/23/2018 8:59:54 PM

Details

diff --git a/az-reportal/src/main/java/azkaban/jobtype/ReportalTableauRunner.java b/az-reportal/src/main/java/azkaban/jobtype/ReportalTableauRunner.java
new file mode 100644
index 0000000..ec10334
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/jobtype/ReportalTableauRunner.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobtype;
+
+import azkaban.reportal.util.tableau.Countdown;
+import azkaban.reportal.util.tableau.Result;
+import azkaban.reportal.util.tableau.URLResponse;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import org.apache.log4j.Logger;
+
+public class ReportalTableauRunner extends ReportalAbstractRunner {
+
+  public static final String TIMEOUT = "tableau.timeout.minutes";
+  public static final String TABLEAU_URL = "tableau.url";
+  private static final Logger logger = Logger.getLogger(ReportalTableauRunner.class);
+  private final int timeout;
+  private final String tableauUrl;
+
+  public ReportalTableauRunner(final String jobName, final Properties props) {
+    super(props);
+    this.timeout = this.props.getInt(TIMEOUT);
+    this.tableauUrl = this.props.getString(TABLEAU_URL);
+  }
+
+  private void refreshExtract(final String tableauUrl, final String workbook) throws Exception {
+    final URLResponse urlResponse = new URLResponse(tableauUrl, URLResponse.Path.REFRESH_EXTRACT,
+        workbook);
+    logger.info(urlResponse.getContents());
+  }
+
+  private Result getLastExtractStatus(final String tableauUrl, final String workbook, final Duration
+      maxRunningDuration)
+      throws Exception {
+    final URLResponse urlResponse = new URLResponse(tableauUrl, URLResponse.Path
+        .LAST_EXTRACT_STATUS,
+        workbook);
+    final Countdown countdown = new Countdown(maxRunningDuration);
+
+    while (countdown.moreTimeRemaining()) {
+      urlResponse.refreshContents();
+      if (urlResponse.indicatesSuccess()) {
+        logger.info(urlResponse.getContents());
+        return (Result.SUCCESS);
+      } else if (urlResponse.indicatesError()) {
+        logger.error(urlResponse.getContents());
+        return (Result.FAIL);
+      }
+      TimeUnit.MINUTES.sleep(1);
+      countdown.countDownByOneMinute();
+      logger.info("Re-attempting connection with workbook " + workbook + ".");
+    }
+    return Result.TIMEOUT;
+  }
+
+
+  private void handleRefreshFailure(final Result result, final String workbook) throws Exception {
+    assert result == Result.FAIL || result == Result.TIMEOUT;
+    final String errorMsg = result == Result.FAIL ? "failed to extract status from workbook " +
+        workbook : "extract status from workbook " + workbook + " times out";
+    throw new Exception(errorMsg);
+  }
+
+  @Override
+  protected void runReportal() throws Exception {
+    final String workbook = this.jobQuery;
+    /**
+     * First refresh the extract
+     * once the status is found, log the results and cancel the job if
+     * the status was an error or a timeout
+     */
+    logger.info("Refreshing extract to workbook " + workbook);
+    refreshExtract(this.tableauUrl, workbook);
+    logger.info("Getting last extract status from workbook " + workbook + "\n"
+        + "Will wait for Tableau to refresh for up to " + this.timeout + " mins");
+
+    final Result result = getLastExtractStatus(this.tableauUrl, workbook, Duration.ofMinutes(
+        this.timeout));
+
+    logger.info("result:" + result.getMessage());
+    if (result == Result.FAIL || result == Result.TIMEOUT) {
+      handleRefreshFailure(result, workbook);
+    }
+  }
+
+}
diff --git a/az-reportal/src/main/java/azkaban/reportal/util/tableau/Countdown.java b/az-reportal/src/main/java/azkaban/reportal/util/tableau/Countdown.java
new file mode 100644
index 0000000..83bc4a0
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/reportal/util/tableau/Countdown.java
@@ -0,0 +1,26 @@
+package azkaban.reportal.util.tableau;
+
+import java.time.Duration;
+
+/**
+ * Countdown is a class used by Tableau Job to
+ * keep track of time as the Tableau extractions are
+ * being refreshed.
+ */
+public class Countdown {
+
+  private Duration duration;
+
+  public Countdown(final Duration duration) {
+    this.duration = duration;
+  }
+
+
+  public void countDownByOneMinute() throws InterruptedException {
+    this.duration = this.duration.minusMinutes(1);
+  }
+
+  public boolean moreTimeRemaining() {
+    return this.duration.toMillis() > 0;
+  }
+}
diff --git a/az-reportal/src/main/java/azkaban/reportal/util/tableau/Result.java b/az-reportal/src/main/java/azkaban/reportal/util/tableau/Result.java
new file mode 100644
index 0000000..9034d38
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/reportal/util/tableau/Result.java
@@ -0,0 +1,21 @@
+package azkaban.reportal.util.tableau;
+
+/**
+ * Result enum stores resulting information from
+ * the Tableau refresh
+ */
+public enum Result {
+  SUCCESS("SUCCESS"), FAIL("FAILURE"), TIMEOUT("TIMEOUT");
+
+  private final String message;
+
+  Result(final String resultMessage) {
+    this.message = "The refresh finished with status: " + resultMessage + ".\n"
+        + "See logs for more information.";
+  }
+
+  public String getMessage() {
+    return this.message;
+  }
+
+}
diff --git a/az-reportal/src/main/java/azkaban/reportal/util/tableau/URLResponse.java b/az-reportal/src/main/java/azkaban/reportal/util/tableau/URLResponse.java
new file mode 100644
index 0000000..5c5f6cd
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/reportal/util/tableau/URLResponse.java
@@ -0,0 +1,68 @@
+package azkaban.reportal.util.tableau;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.net.URL;
+import org.apache.commons.io.IOUtils;
+
+/**
+ * URL Response is a class used by Tableau Job to interact
+ * with the proxy server which interfaces with the Tableau
+ * server.
+ */
+public class URLResponse {
+
+  private final URL _url;
+  private String _urlContents;
+
+  public URLResponse(final String tableauUrl, final Path path, final String workbook) throws
+      Exception {
+    this._url = new URL(tableauUrl + "/" + path.getPath() + workbook);
+    refreshContents();
+  }
+
+  public void refreshContents() throws IOException {
+    this._urlContents = IOUtils.toString(this._url.openStream(), "UTF-8");
+  }
+
+  @VisibleForTesting
+  void setURLContents(final String urlContents) {
+    this._urlContents = urlContents;
+  }
+
+  public String getContents() {
+    return (this._urlContents);
+  }
+
+  private boolean indicates(final String word) {
+    if (this._urlContents == null) {
+      return false;
+    } else {
+      return this._urlContents.contains(word);
+    }
+  }
+
+  public boolean indicatesSuccess() {
+    return indicates("Success");
+  }
+
+  public boolean indicatesError() {
+    return indicates("Error");
+  }
+
+  public enum Path {
+    REFRESH_EXTRACT("tableau_refresh_extract?workbook="), LAST_EXTRACT_STATUS(
+        "tableau_last_extract_status?workbook=");
+
+    private final String path;
+
+    Path(final String pathContents) {
+      this.path = pathContents;
+    }
+
+    public String getPath() {
+      return (this.path);
+    }
+  }
+
+}
diff --git a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalType.java b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalType.java
index 1dd845a..1fe7e35 100644
--- a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalType.java
+++ b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalType.java
@@ -58,9 +58,10 @@ public enum ReportalType {
     }
   },
   HiveJob("ReportalHive", "reportalhive", "hadoop"), TeraDataJob(
-      "ReportalTeraData", "reportalteradata", "teradata"), DataCollectorJob(
-      ReportalTypeManager.DATA_COLLECTOR_JOB,
-      ReportalTypeManager.DATA_COLLECTOR_JOB_TYPE, "") {
+      "ReportalTeraData", "reportalteradata", "teradata"),
+  TableauJob("ReportalTableau", "reportaltableau", "hadoop"),
+  DataCollectorJob(
+      ReportalTypeManager.DATA_COLLECTOR_JOB, ReportalTypeManager.DATA_COLLECTOR_JOB_TYPE, "") {
     @Override
     public void buildJobFiles(final Reportal reportal, final Props propertiesFile,
         final File jobFile, final String jobName, final String queryScript,
diff --git a/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaleditpage.vm b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaleditpage.vm
index 728f8a1..a40c484 100644
--- a/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaleditpage.vm
+++ b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaleditpage.vm
@@ -66,6 +66,16 @@
         #end
       #end];
   </script>
+  <script type="text/javascript">
+    function change(select) {
+      if (select.value == "ReportalTableau") {
+        select.parentNode.parentNode.nextElementSibling.firstElementChild.innerHTML = "Workbook";
+      } else {
+        select.parentNode.parentNode.nextElementSibling.firstElementChild.innerHTML = "Script";
+      }
+    }
+  </script>
+
   <script type="text/javascript" src="${context}/reportal/js/reportal.js"></script>
   <script type="text/javascript" src="${context}/reportal/js/reportal-edit.js"></script>
   <link href="${context}/reportal/css/reportal.css" rel="stylesheet">
@@ -285,20 +295,25 @@
             <div class="control-group required">
               <label class="control-label">Type<abbr title="Required" class="required-mark">*</abbr></label>
               <div class="controls">
-                <select class="querytype" nametemplate="query#type">
-                  <option value="ReportalHive">Hive</option>
+                <select id="querytype" class="querytype" onChange="change(this)"
+                        nametemplate="query#type">
+                  <option id="ReportalHive" value="ReportalHive">Hive</option>
                   <option value="ReportalTeraData">Teradata</option>
                   <option value="ReportalPig">Pig</option>
+                  <option value="ReportalTableau">Tableau</option>
                 </select>
               </div>
             </div>
-            <div class="control-group">
-              <label class="control-label">Script</label>
+
+            <div id="query" class="control-group">
+              <label class="control-label" id="queryLabel">Script</label>
               <div class="controls"><textarea class="span8 queryscript"
                                               nametemplate="query#script"></textarea></div>
             </div>
+
           </li>
         </ol>
+
         <div class="control-group">
           <label class="control-label"></label>
           <div class="controls">
@@ -308,6 +323,7 @@
             </button>
           </div>
         </div>
+
       </fieldset>
       <fieldset>
         <legend>Access</legend>
diff --git a/az-reportal/src/test/java/azkaban/reportal/util/tableau/CountDownTest.java b/az-reportal/src/test/java/azkaban/reportal/util/tableau/CountDownTest.java
new file mode 100644
index 0000000..b0baff8
--- /dev/null
+++ b/az-reportal/src/test/java/azkaban/reportal/util/tableau/CountDownTest.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.reportal.util.tableau;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+import org.junit.Test;
+
+public class CountDownTest {
+
+  @Test
+  public void testMoreTimeRemaining() throws InterruptedException {
+    final Countdown countDown = new Countdown(Duration.ofMinutes(1));
+    assertThat(countDown.moreTimeRemaining()).isTrue();
+    countDown.countDownByOneMinute();
+    assertThat(countDown.moreTimeRemaining()).isFalse();
+  }
+}
diff --git a/az-reportal/src/test/java/azkaban/reportal/util/tableau/ReportalTableauRunnerTest.java b/az-reportal/src/test/java/azkaban/reportal/util/tableau/ReportalTableauRunnerTest.java
new file mode 100644
index 0000000..5b5d5ce
--- /dev/null
+++ b/az-reportal/src/test/java/azkaban/reportal/util/tableau/ReportalTableauRunnerTest.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.reportal.util.tableau;
+
+import static azkaban.jobtype.ReportalTableauRunner.TABLEAU_URL;
+import static azkaban.jobtype.ReportalTableauRunner.TIMEOUT;
+import static org.assertj.core.api.Java6Assertions.assertThatThrownBy;
+
+import azkaban.jobtype.ReportalTableauRunner;
+import azkaban.utils.UndefinedPropertyException;
+import java.util.Properties;
+import org.junit.Test;
+
+public class ReportalTableauRunnerTest {
+
+  @Test
+  public void testNoTimeout() throws InterruptedException {
+    final Properties props = new Properties();
+    props.put(TABLEAU_URL, "test");
+    assertThatThrownBy(() -> {
+      new ReportalTableauRunner("tableau", props);
+    }).isInstanceOf
+        (UndefinedPropertyException.class);
+  }
+
+  @Test
+  public void testNoTableauUrl() throws InterruptedException {
+    final Properties props = new Properties();
+    props.put(TIMEOUT, "1");
+    assertThatThrownBy(() -> {
+      new ReportalTableauRunner("tableau", props);
+    }).isInstanceOf
+        (UndefinedPropertyException.class);
+  }
+}
diff --git a/az-reportal/src/test/java/azkaban/reportal/util/tableau/URLResponseTest.java b/az-reportal/src/test/java/azkaban/reportal/util/tableau/URLResponseTest.java
new file mode 100644
index 0000000..729e18b
--- /dev/null
+++ b/az-reportal/src/test/java/azkaban/reportal/util/tableau/URLResponseTest.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.reportal.util.tableau;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+
+public class URLResponseTest {
+
+
+  @Test
+  public void testURLResponse() throws Exception {
+    final URLResponse urlResponse = mock(URLResponse.class);
+    doNothing().when(urlResponse).refreshContents();
+    when(urlResponse.indicatesSuccess()).thenCallRealMethod();
+    when(urlResponse.indicatesError()).thenCallRealMethod();
+    doCallRealMethod().when(urlResponse).setURLContents(anyString());
+
+    urlResponse.setURLContents("Success");
+    assertThat(urlResponse.indicatesSuccess()).isTrue();
+    assertThat(urlResponse.indicatesError()).isFalse();
+
+    urlResponse.setURLContents("Error");
+    assertThat(urlResponse.indicatesError()).isTrue();
+    assertThat(urlResponse.indicatesSuccess()).isFalse();
+  }
+
+}