Details
diff --git a/az-core/src/main/java/azkaban/utils/ExecutorServiceUtils.java b/az-core/src/main/java/azkaban/utils/ExecutorServiceUtils.java
new file mode 100644
index 0000000..2cf46d4
--- /dev/null
+++ b/az-core/src/main/java/azkaban/utils/ExecutorServiceUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Executor service related utilities.
+ */
+public class ExecutorServiceUtils {
+
+ private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceUtils.class);
+ private static final TimeUnit MILLI_SECONDS_TIME_UNIT = TimeUnit.MILLISECONDS;
+
+ /**
+ * Gracefully shuts down the given executor service.
+ *
+ * <p>Adopted from
+ * <a href="https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html">
+ * the Oracle JAVA Documentation.
+ * </a>
+ *
+ * @param service the service to shutdown
+ * @param timeout max wait time for the tasks to shutdown. Note that the max wait time is 2
+ * times this value due to the two stages shutdown strategy.
+ * @throws InterruptedException if the current thread is interrupted
+ */
+ public void gracefulShutdown(final ExecutorService service, final Duration timeout)
+ throws InterruptedException {
+ service.shutdown(); // Disable new tasks from being submitted
+ final long timeout_in_unit_of_miliseconds = timeout.toMillis();
+ // Wait a while for existing tasks to terminate
+ if (!service.awaitTermination(timeout_in_unit_of_miliseconds, MILLI_SECONDS_TIME_UNIT)) {
+ service.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!service.awaitTermination(timeout_in_unit_of_miliseconds, MILLI_SECONDS_TIME_UNIT)) {
+ logger.error("The executor service did not terminate.");
+ }
+ }
+ }
+}
diff --git a/az-core/src/test/java/azkaban/utils/ExecutorServiceUtilsTest.java b/az-core/src/test/java/azkaban/utils/ExecutorServiceUtilsTest.java
new file mode 100644
index 0000000..f95e536
--- /dev/null
+++ b/az-core/src/test/java/azkaban/utils/ExecutorServiceUtilsTest.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import org.junit.Test;
+
+@SuppressWarnings("FutureReturnValueIgnored")
+public class ExecutorServiceUtilsTest {
+
+ private final ExecutorServiceUtils executorServiceUtils = new ExecutorServiceUtils();
+
+ @Test
+ public void gracefulShutdown() throws InterruptedException {
+ // given
+ final ExecutorService service = Executors.newSingleThreadExecutor();
+
+ // when
+ this.executorServiceUtils.gracefulShutdown(service, Duration.ofMillis(1));
+
+ // then
+ assertThat(service.isShutdown()).isTrue();
+ }
+
+ @Test
+ public void force_shutdown_after_timeout() throws InterruptedException {
+ // given
+ final ExecutorService service = Executors.newSingleThreadExecutor();
+
+ // when
+ service.submit(this::sleep);
+ final long beginShutdownTime = System.currentTimeMillis();
+ this.executorServiceUtils.gracefulShutdown(service, Duration.ofMillis(1));
+ final long endShutdownTime = System.currentTimeMillis();
+
+ // then
+ assertThat(service.isShutdown()).isTrue();
+ final long shutdownDuration = endShutdownTime - beginShutdownTime;
+ // Give some buffer for overhead to reduce false positives.
+ assertThat(shutdownDuration).isLessThan(10);
+ }
+
+ @Test
+ public void can_not_submit_tasks_after_shutdown() throws
+ InterruptedException {
+ // given
+ final ExecutorService service = Executors.newSingleThreadExecutor();
+ service.submit(this::sleep);
+
+ // when
+ this.executorServiceUtils.gracefulShutdown(service, Duration.ofMillis(1));
+ final Throwable thrown = catchThrowable(() -> {
+ service.submit(this::sleep);
+ });
+
+ // then
+ assertThat(thrown).isInstanceOf(RejectedExecutionException.class);
+ }
+
+ private void sleep() {
+ try {
+ Thread.sleep(1000);
+ } catch (final InterruptedException ex) {
+ }
+ }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/DagService.java b/azkaban-exec-server/src/main/java/azkaban/dag/DagService.java
index be2fda1..0088c36 100644
--- a/azkaban-exec-server/src/main/java/azkaban/dag/DagService.java
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/DagService.java
@@ -16,11 +16,14 @@
package azkaban.dag;
+import azkaban.utils.ExecutorServiceUtils;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
+import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,13 +38,16 @@ import org.slf4j.LoggerFactory;
@Singleton
class DagService {
- private static final long SHUTDOWN_WAIT_TIMEOUT = 60;
+ private static final Duration SHUTDOWN_WAIT_TIMEOUT = Duration.ofSeconds(10);
private static final Logger logger = LoggerFactory.getLogger(DagService.class);
+ private final ExecutorServiceUtils executorServiceUtils;
private final ExecutorService executorService;
- DagService() {
+ @Inject
+ DagService(final ExecutorServiceUtils executorServiceUtils) {
// Give the thread a name to make debugging easier.
+ this.executorServiceUtils = executorServiceUtils;
final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("Dag-service").build();
this.executorService = Executors.newSingleThreadExecutor(namedThreadFactory);
@@ -81,28 +87,14 @@ class DagService {
/**
* Shuts down the service and waits for the tasks to finish.
- *
- * <p>Adopted from
- * <a href="https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html">
- * the Oracle JAVA Documentation.
- * </a>
*/
- void shutdownAndAwaitTermination() {
- this.executorService.shutdown(); // Disable new tasks from being submitted
- try {
- // Wait a while for existing tasks to terminate
- if (!this.executorService.awaitTermination(SHUTDOWN_WAIT_TIMEOUT, TimeUnit.SECONDS)) {
- this.executorService.shutdownNow(); // Cancel currently executing tasks
- // Wait a while for tasks to respond to being cancelled
- if (!this.executorService.awaitTermination(SHUTDOWN_WAIT_TIMEOUT, TimeUnit.SECONDS)) {
- logger.error("The DagService did not terminate.");
- }
- }
- } catch (final InterruptedException ie) {
- // (Re-)Cancel if current thread also interrupted
- this.executorService.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
+ void shutdownAndAwaitTermination() throws InterruptedException {
+ logger.info("DagService is shutting down.");
+ this.executorServiceUtils.gracefulShutdown(this.executorService, SHUTDOWN_WAIT_TIMEOUT);
+ }
+
+ @VisibleForTesting
+ ExecutorService getExecutorService() {
+ return this.executorService;
}
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java
index 474e4c7..33cbcb7 100644
--- a/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java
@@ -17,12 +17,17 @@
package azkaban.dag;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import azkaban.utils.ExecutorServiceUtils;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javafx.util.Pair;
import org.junit.After;
@@ -37,7 +42,7 @@ import org.junit.Test;
*/
public class DagServiceTest {
- private final DagService dagService = new DagService();
+ private final DagService dagService = new DagService(new ExecutorServiceUtils());
private final StatusChangeRecorder statusChangeRecorder = new StatusChangeRecorder();
// The names of the nodes that are supposed to fail.
@@ -52,10 +57,24 @@ public class DagServiceTest {
@After
- public void tearDown() {
+ public void tearDown() throws InterruptedException {
this.dagService.shutdownAndAwaitTermination();
}
+ @Test
+ public void shutdown_calls_service_util_graceful_shutdown() throws InterruptedException {
+ // given
+ final ExecutorServiceUtils serviceUtils = mock(ExecutorServiceUtils.class);
+ final DagService testDagService = new DagService(serviceUtils);
+
+ // when
+ testDagService.shutdownAndAwaitTermination();
+
+ // then
+ final ExecutorService exService = testDagService.getExecutorService();
+ verify(serviceUtils).gracefulShutdown(exService, Duration.ofSeconds(10));
+ }
+
/**
* Tests a DAG with one node which will run successfully.
*/