azkaban-aplcache

Refactor DagService shutdown method and add tests (#1765) *

5/22/2018 12:29:58 PM

Details

diff --git a/az-core/src/main/java/azkaban/utils/ExecutorServiceUtils.java b/az-core/src/main/java/azkaban/utils/ExecutorServiceUtils.java
new file mode 100644
index 0000000..2cf46d4
--- /dev/null
+++ b/az-core/src/main/java/azkaban/utils/ExecutorServiceUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Executor service related utilities.
+ */
+public class ExecutorServiceUtils {
+
+  private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceUtils.class);
+  private static final TimeUnit MILLI_SECONDS_TIME_UNIT = TimeUnit.MILLISECONDS;
+
+  /**
+   * Gracefully shuts down the given executor service.
+   *
+   * <p>Adopted from
+   * <a href="https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html">
+   * the Oracle JAVA Documentation.
+   * </a>
+   *
+   * @param service the service to shutdown
+   * @param timeout max wait time for the tasks to shutdown. Note that the max wait time is 2
+   *     times this value due to the two stages shutdown strategy.
+   * @throws InterruptedException if the current thread is interrupted
+   */
+  public void gracefulShutdown(final ExecutorService service, final Duration timeout)
+      throws InterruptedException {
+    service.shutdown(); // Disable new tasks from being submitted
+    final long timeout_in_unit_of_miliseconds = timeout.toMillis();
+    // Wait a while for existing tasks to terminate
+    if (!service.awaitTermination(timeout_in_unit_of_miliseconds, MILLI_SECONDS_TIME_UNIT)) {
+      service.shutdownNow(); // Cancel currently executing tasks
+      // Wait a while for tasks to respond to being cancelled
+      if (!service.awaitTermination(timeout_in_unit_of_miliseconds, MILLI_SECONDS_TIME_UNIT)) {
+        logger.error("The executor service did not terminate.");
+      }
+    }
+  }
+}
diff --git a/az-core/src/test/java/azkaban/utils/ExecutorServiceUtilsTest.java b/az-core/src/test/java/azkaban/utils/ExecutorServiceUtilsTest.java
new file mode 100644
index 0000000..f95e536
--- /dev/null
+++ b/az-core/src/test/java/azkaban/utils/ExecutorServiceUtilsTest.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import org.junit.Test;
+
+@SuppressWarnings("FutureReturnValueIgnored")
+public class ExecutorServiceUtilsTest {
+
+  private final ExecutorServiceUtils executorServiceUtils = new ExecutorServiceUtils();
+
+  @Test
+  public void gracefulShutdown() throws InterruptedException {
+    // given
+    final ExecutorService service = Executors.newSingleThreadExecutor();
+
+    // when
+    this.executorServiceUtils.gracefulShutdown(service, Duration.ofMillis(1));
+
+    // then
+    assertThat(service.isShutdown()).isTrue();
+  }
+
+  @Test
+  public void force_shutdown_after_timeout() throws InterruptedException {
+    // given
+    final ExecutorService service = Executors.newSingleThreadExecutor();
+
+    // when
+    service.submit(this::sleep);
+    final long beginShutdownTime = System.currentTimeMillis();
+    this.executorServiceUtils.gracefulShutdown(service, Duration.ofMillis(1));
+    final long endShutdownTime = System.currentTimeMillis();
+
+    // then
+    assertThat(service.isShutdown()).isTrue();
+    final long shutdownDuration = endShutdownTime - beginShutdownTime;
+    // Give some buffer for overhead to reduce false positives.
+    assertThat(shutdownDuration).isLessThan(10);
+  }
+
+  @Test
+  public void can_not_submit_tasks_after_shutdown() throws
+      InterruptedException {
+    // given
+    final ExecutorService service = Executors.newSingleThreadExecutor();
+    service.submit(this::sleep);
+
+    // when
+    this.executorServiceUtils.gracefulShutdown(service, Duration.ofMillis(1));
+    final Throwable thrown = catchThrowable(() -> {
+      service.submit(this::sleep);
+    });
+
+    // then
+    assertThat(thrown).isInstanceOf(RejectedExecutionException.class);
+  }
+
+  private void sleep() {
+    try {
+      Thread.sleep(1000);
+    } catch (final InterruptedException ex) {
+    }
+  }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/DagService.java b/azkaban-exec-server/src/main/java/azkaban/dag/DagService.java
index be2fda1..0088c36 100644
--- a/azkaban-exec-server/src/main/java/azkaban/dag/DagService.java
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/DagService.java
@@ -16,11 +16,14 @@
 
 package azkaban.dag;
 
+import azkaban.utils.ExecutorServiceUtils;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.time.Duration;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
+import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,13 +38,16 @@ import org.slf4j.LoggerFactory;
 @Singleton
 class DagService {
 
-  private static final long SHUTDOWN_WAIT_TIMEOUT = 60;
+  private static final Duration SHUTDOWN_WAIT_TIMEOUT = Duration.ofSeconds(10);
   private static final Logger logger = LoggerFactory.getLogger(DagService.class);
 
+  private final ExecutorServiceUtils executorServiceUtils;
   private final ExecutorService executorService;
 
-  DagService() {
+  @Inject
+  DagService(final ExecutorServiceUtils executorServiceUtils) {
     // Give the thread a name to make debugging easier.
+    this.executorServiceUtils = executorServiceUtils;
     final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
         .setNameFormat("Dag-service").build();
     this.executorService = Executors.newSingleThreadExecutor(namedThreadFactory);
@@ -81,28 +87,14 @@ class DagService {
 
   /**
    * Shuts down the service and waits for the tasks to finish.
-   *
-   * <p>Adopted from
-   * <a href="https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html">
-   *   the Oracle JAVA Documentation.
-   * </a>
    */
-  void shutdownAndAwaitTermination() {
-    this.executorService.shutdown(); // Disable new tasks from being submitted
-    try {
-      // Wait a while for existing tasks to terminate
-      if (!this.executorService.awaitTermination(SHUTDOWN_WAIT_TIMEOUT, TimeUnit.SECONDS)) {
-        this.executorService.shutdownNow(); // Cancel currently executing tasks
-        // Wait a while for tasks to respond to being cancelled
-        if (!this.executorService.awaitTermination(SHUTDOWN_WAIT_TIMEOUT, TimeUnit.SECONDS)) {
-          logger.error("The DagService did not terminate.");
-        }
-      }
-    } catch (final InterruptedException ie) {
-      // (Re-)Cancel if current thread also interrupted
-      this.executorService.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
+  void shutdownAndAwaitTermination() throws InterruptedException {
+    logger.info("DagService is shutting down.");
+    this.executorServiceUtils.gracefulShutdown(this.executorService, SHUTDOWN_WAIT_TIMEOUT);
+  }
+
+  @VisibleForTesting
+  ExecutorService getExecutorService() {
+    return this.executorService;
   }
 }
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java
index 474e4c7..33cbcb7 100644
--- a/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java
@@ -17,12 +17,17 @@
 package azkaban.dag;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
+import azkaban.utils.ExecutorServiceUtils;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import javafx.util.Pair;
 import org.junit.After;
@@ -37,7 +42,7 @@ import org.junit.Test;
  */
 public class DagServiceTest {
 
-  private final DagService dagService = new DagService();
+  private final DagService dagService = new DagService(new ExecutorServiceUtils());
   private final StatusChangeRecorder statusChangeRecorder = new StatusChangeRecorder();
 
   // The names of the nodes that are supposed to fail.
@@ -52,10 +57,24 @@ public class DagServiceTest {
 
 
   @After
-  public void tearDown() {
+  public void tearDown() throws InterruptedException {
     this.dagService.shutdownAndAwaitTermination();
   }
 
+  @Test
+  public void shutdown_calls_service_util_graceful_shutdown() throws InterruptedException {
+    // given
+    final ExecutorServiceUtils serviceUtils = mock(ExecutorServiceUtils.class);
+    final DagService testDagService = new DagService(serviceUtils);
+
+    // when
+    testDagService.shutdownAndAwaitTermination();
+
+    // then
+    final ExecutorService exService = testDagService.getExecutorService();
+    verify(serviceUtils).gracefulShutdown(exService, Duration.ofSeconds(10));
+  }
+
   /**
    * Tests a DAG with one node which will run successfully.
    */