azkaban-aplcache

Executor REST API for safe shutdown (#849) * Added Shutdown

12/15/2016 11:22:00 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/database/DataSourceUtils.java b/azkaban-common/src/main/java/azkaban/database/DataSourceUtils.java
index 8267104..74fd38e 100644
--- a/azkaban-common/src/main/java/azkaban/database/DataSourceUtils.java
+++ b/azkaban-common/src/main/java/azkaban/database/DataSourceUtils.java
@@ -169,6 +169,7 @@ public class DataSourceUtils {
 
       public MonitorThread(MySQLBasicDataSource mysqlSource) {
         this.setName("MySQL-DB-Monitor-Thread");
+        setDaemon(true);
         dataSource = mysqlSource;
       }
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
index 5ac1a44..d13a0f1 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
@@ -37,6 +37,7 @@ public interface ConnectorParams {
   public static final String ACTIVATE = "activate";
   public static final String DEACTIVATE = "deactivate";
   public static final String GET_STATUS = "getStatus";
+  public static final String SHUTDOWN = "shutdown";
 
   public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
   public static final String MODIFY_EXECUTION_ACTION_TYPE = "modifyType";
diff --git a/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java b/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
index fc474db..7b633e9 100644
--- a/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
+++ b/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
@@ -170,4 +170,11 @@ public class SystemMemoryInfo {
       }
     }
   }
-}
\ No newline at end of file
+
+  public static void shutdown() {
+    logger.warn("Shutting down SystemMemoryInfo...");
+    if (scheduledExecutorService != null) {
+      scheduledExecutorService.shutdown();
+    }
+  }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 3dfc4c3..87925c8 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -80,15 +80,12 @@ import static com.google.common.base.Preconditions.checkState;
 import static java.util.Objects.requireNonNull;
 
 public class AzkabanExecutorServer {
-  private static final String CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY =
-      "jmx.attribute.processor.class";
-  private static final Logger logger = Logger
-      .getLogger(AzkabanExecutorServer.class);
+  private static final String CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY = "jmx.attribute.processor.class";
+  private static final Logger logger = Logger.getLogger(AzkabanExecutorServer.class);
   private static final int MAX_FORM_CONTENT_SIZE = 10 * 1024 * 1024;
 
   public static final String JOBTYPE_PLUGIN_DIR = "azkaban.jobtype.plugin.dir";
-  public static final String METRIC_INTERVAL =
-      "executor.metric.milisecinterval.";
+  public static final String METRIC_INTERVAL = "executor.metric.milisecinterval.";
   public static final int DEFAULT_HEADER_BUFFER_SIZE = 4096;
 
   private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
@@ -319,10 +316,6 @@ public class AzkabanExecutorServer {
     }
   }
 
-  public void stopServer() throws Exception {
-    server.stop();
-    server.destroy();
-  }
 
   public ProjectLoader getProjectLoader() {
     return projectLoader;
@@ -391,13 +384,12 @@ public class AzkabanExecutorServer {
           logger.info(("Exception when logging top memory consumers"), e);
         }
 
-        logger.info("Shutting down http server...");
+        logger.info("Shutting down...");
         try {
-          app.stopServer();
+          app.shutdownNow();
         } catch (Exception e) {
           logger.error("Error while shutting down http server.", e);
         }
-        logger.info("kk thx bye.");
       }
 
       public void logTopMemoryConsumers() throws Exception, IOException {
@@ -588,4 +580,51 @@ public class AzkabanExecutorServer {
   public String getExecutorHostPort() {
     return getHost() + ":" + getPort();
   }
+
+  /**
+   * Shutdown the server.
+   *  - performs a safe shutdown. Waits for completion of current tasks
+   *  - spawns a shutdown thread and returns immediately.
+   */
+  public void shutdown() {
+    logger.warn("Shutting down AzkabanExecutorServer...");
+    new Thread(() -> {
+      try {
+        // Hack: Sleep for a little time to allow API calls to complete
+        Thread.sleep(2000);
+      } catch (InterruptedException e) {
+        logger.error(e);
+      }
+      shutdownInternal();
+    }, "shutdown").start();
+  }
+
+  /**
+   * (internal API)
+   * Note: This should be run in a separate thread.
+   *
+   * Shutdown the server. (blocking call)
+   *  - waits for jobs to finish
+   *  - doesn't accept any new jobs
+   */
+  private void shutdownInternal() {
+    getFlowRunnerManager().shutdown();
+    try {
+      shutdownNow();
+      logger.warn("Shutdown AzkabanExecutorServer complete");
+    } catch (Exception e) {
+      logger.error(e);
+    }
+  }
+
+  /**
+   * Shutdown the server now! (unsafe)
+   * @throws Exception
+   */
+  public void shutdownNow() throws Exception {
+    server.stop();
+    server.destroy();
+    SystemMemoryInfo.shutdown();
+    getFlowRunnerManager().shutdownNow();
+  }
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
index 154cf12..0f87d72 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
@@ -111,6 +111,8 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
         } else if (action.equals(DEACTIVATE)) {
           logger.warn("Setting ACTIVE flag to false");
           setActive(false, respMap);
+        } else if (action.equals(SHUTDOWN)) {
+          shutdown(respMap);
         } else {
           int execid = Integer.parseInt(getParam(req, EXECID_PARAM));
           String user = getParam(req, USER_PARAM, null);
@@ -357,16 +359,42 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
   private void setActive(boolean value, Map<String, Object> respMap)
       throws ServletException {
     try {
-      ExecutorLoader executorLoader = application.getExecutorLoader();
-      Executor executor = executorLoader.fetchExecutor(application.getHost(), application.getPort());
-      Preconditions.checkState(executor != null, "Unable to obtain self entry in DB");
-      if (executor.isActive() != value) {
-        executor.setActive(value);
-        executorLoader.updateExecutor(executor);
-        flowRunnerManager.setActive(value);
-      } else {
-        logger.warn("Set active action ignored. Executor is already " + (value? "active" : "inactive"));
-      }
+      setActiveInternal(value);
+      respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
+    } catch (Exception e) {
+      logger.error(e);
+      respMap.put(RESPONSE_ERROR, e.getMessage());
+    }
+  }
+
+  private void setActiveInternal(boolean value)
+      throws ExecutorManagerException {
+    ExecutorLoader executorLoader = application.getExecutorLoader();
+    Executor executor = executorLoader.fetchExecutor(application.getHost(), application.getPort());
+    Preconditions.checkState(executor != null, "Unable to obtain self entry in DB");
+    if (executor.isActive() != value) {
+      executor.setActive(value);
+      executorLoader.updateExecutor(executor);
+      flowRunnerManager.setActive(value);
+    } else {
+      logger.warn("Set active action ignored. Executor is already " + (value? "active" : "inactive"));
+    }
+  }
+
+  /**
+   * Prepare the executor for shutdown.
+   *
+   * @param respMap json response object
+   * @throws ServletException
+   */
+  private void shutdown(Map<String, Object> respMap)
+      throws ServletException {
+    try {
+      logger.warn("Shutting down executor...");
+
+      // Set the executor to inactive. Will receive no new flows.
+      setActiveInternal(false);
+      application.shutdown();
       respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
     } catch (Exception e) {
       logger.error(e);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 074d1b1..af64bc7 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -45,7 +45,6 @@ import azkaban.execapp.event.RemoteFlowWatcher;
 import azkaban.execapp.metric.NumFailedFlowMetric;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
-import azkaban.executor.Executor;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.jobtype.JobTypeManager;
@@ -88,53 +87,47 @@ import azkaban.utils.TrackingThreadPool;
  */
 public class FlowRunnerManager implements EventListener,
     ThreadPoolExecutingListener {
-  private static final String EXECUTOR_USE_BOUNDED_THREADPOOL_QUEUE =
-      "executor.use.bounded.threadpool.queue";
-  private static final String EXECUTOR_THREADPOOL_WORKQUEUE_SIZE =
-      "executor.threadpool.workqueue.size";
+  private static final Logger logger = Logger.getLogger(FlowRunnerManager.class);
+
+  private static final String EXECUTOR_USE_BOUNDED_THREADPOOL_QUEUE = "executor.use.bounded.threadpool.queue";
+  private static final String EXECUTOR_THREADPOOL_WORKQUEUE_SIZE = "executor.threadpool.workqueue.size";
   private static final String EXECUTOR_FLOW_THREADS = "executor.flow.threads";
   private static final String FLOW_NUM_JOB_THREADS = "flow.num.job.threads";
-  private static Logger logger = Logger.getLogger(FlowRunnerManager.class);
-  private File executionDirectory;
-  private File projectDirectory;
 
   // recently finished secs to clean up. 1 minute
-  private static final long RECENTLY_FINISHED_TIME_TO_LIVE = 60 * 1000;
+  private static final int RECENTLY_FINISHED_TIME_TO_LIVE = 60 * 1000;
 
   private static final int DEFAULT_NUM_EXECUTING_FLOWS = 30;
   private static final int DEFAULT_FLOW_NUM_JOB_TREADS = 10;
 
-  private Map<Pair<Integer, Integer>, ProjectVersion> installedProjects =
-      new ConcurrentHashMap<Pair<Integer, Integer>, ProjectVersion>();
   // this map is used to store the flows that have been submitted to
   // the executor service. Once a flow has been submitted, it is either
   // in the queue waiting to be executed or in executing state.
-  private Map<Future<?>, Integer> submittedFlows =
-      new ConcurrentHashMap<Future<?>, Integer>();
-  private Map<Integer, FlowRunner> runningFlows =
-      new ConcurrentHashMap<Integer, FlowRunner>();
-  private Map<Integer, ExecutableFlow> recentlyFinishedFlows =
-      new ConcurrentHashMap<Integer, ExecutableFlow>();
-
-  private int numThreads = DEFAULT_NUM_EXECUTING_FLOWS;
-  private int threadPoolQueueSize = -1;
+  private final Map<Future<?>, Integer> submittedFlows = new ConcurrentHashMap<>();
+  private final Map<Integer, FlowRunner> runningFlows = new ConcurrentHashMap<>();
+  private final Map<Integer, ExecutableFlow> recentlyFinishedFlows = new ConcurrentHashMap<>();
+  private final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects;
 
-  private TrackingThreadPool executorService;
+  private final TrackingThreadPool executorService;
+  private final CleanerThread cleanerThread;
+  private final ExecutorLoader executorLoader;
+  private final ProjectLoader projectLoader;
+  private final JobTypeManager jobtypeManager;
 
-  private CleanerThread cleanerThread;
-  private int numJobThreadPerFlow = DEFAULT_FLOW_NUM_JOB_TREADS;
-
-  private ExecutorLoader executorLoader;
-  private ProjectLoader projectLoader;
+  private final Props azkabanProps;
+  private final File executionDirectory;
+  private final File projectDirectory;
 
-  private JobTypeManager jobtypeManager;
+  private final Object executionDirDeletionSync = new Object();
 
-  private Props globalProps = null;
+  private int numThreads = DEFAULT_NUM_EXECUTING_FLOWS;
+  private int threadPoolQueueSize = -1;
+  private int numJobThreadPerFlow = DEFAULT_FLOW_NUM_JOB_TREADS;
 
-  private final Props azkabanProps;
+  private Props globalProps;
 
   private long lastCleanerThreadCheckTime = -1;
-  private long executionDirRetention = 1 * 24 * 60 * 60 * 1000;
+  private long executionDirRetention = 1 * 24 * 60 * 60 * 1000; // 1 Day
 
   // We want to limit the log sizes to about 20 megs
   private String jobLogChunkSize = "5MB";
@@ -143,8 +136,6 @@ public class FlowRunnerManager implements EventListener,
   // If true, jobs will validate proxy user against a list of valid proxy users.
   private boolean validateProxyUser = false;
 
-  private Object executionDirDeletionSync = new Object();
-
   // date time of the the last flow submitted.
   private long lastFlowSubmittedDate = 0;
 
@@ -152,24 +143,18 @@ public class FlowRunnerManager implements EventListener,
   private volatile boolean isActive = false;
 
   public FlowRunnerManager(Props props, ExecutorLoader executorLoader,
-      ProjectLoader projectLoader, ClassLoader parentClassLoader)
-      throws IOException {
-    executionDirectory =
-        new File(props.getString("azkaban.execution.dir", "executions"));
-    projectDirectory =
-        new File(props.getString("azkaban.project.dir", "projects"));
-
+      ProjectLoader projectLoader, ClassLoader parentClassLoader) throws IOException {
     azkabanProps = props;
 
     // JobWrappingFactory.init(props, getClass().getClassLoader());
-    executionDirRetention =
-        props.getLong("execution.dir.retention", executionDirRetention);
-    logger.info("Execution dir retention set to " + executionDirRetention
-        + " ms");
+    executionDirRetention = props.getLong("execution.dir.retention", executionDirRetention);
+    logger.info("Execution dir retention set to " + executionDirRetention + " ms");
 
+    executionDirectory = new File(props.getString("azkaban.execution.dir", "executions"));
     if (!executionDirectory.exists()) {
       executionDirectory.mkdirs();
     }
+    projectDirectory = new File(props.getString("azkaban.project.dir", "projects"));
     if (!projectDirectory.exists()) {
       projectDirectory.mkdirs();
     }
@@ -177,10 +162,8 @@ public class FlowRunnerManager implements EventListener,
     installedProjects = loadExistingProjects();
 
     // azkaban.temp.dir
-    numThreads =
-        props.getInt(EXECUTOR_FLOW_THREADS, DEFAULT_NUM_EXECUTING_FLOWS);
-    numJobThreadPerFlow =
-        props.getInt(FLOW_NUM_JOB_THREADS, DEFAULT_FLOW_NUM_JOB_TREADS);
+    numThreads = props.getInt(EXECUTOR_FLOW_THREADS, DEFAULT_NUM_EXECUTING_FLOWS);
+    numJobThreadPerFlow = props.getInt(FLOW_NUM_JOB_THREADS, DEFAULT_FLOW_NUM_JOB_TREADS);
     executorService = createExecutorService(numThreads);
 
     this.executorLoader = executorLoader;
@@ -189,16 +172,14 @@ public class FlowRunnerManager implements EventListener,
     this.jobLogChunkSize = azkabanProps.getString("job.log.chunk.size", "5MB");
     this.jobLogNumFiles = azkabanProps.getInt("job.log.backup.index", 4);
 
-    this.validateProxyUser =
-        azkabanProps.getBoolean("proxy.user.lock.down", false);
+    this.validateProxyUser = azkabanProps.getBoolean("proxy.user.lock.down", false);
 
     setActive(true);
 
     cleanerThread = new CleanerThread();
     cleanerThread.start();
 
-    String globalPropsPath =
-        props.getString("executor.global.properties", null);
+    String globalPropsPath = props.getString("executor.global.properties", null);
     if (globalPropsPath != null) {
       globalProps = new Props(null, globalPropsPath);
     }
@@ -299,6 +280,7 @@ public class FlowRunnerManager implements EventListener,
 
     public CleanerThread() {
       this.setName("FlowRunnerManager-Cleaner-Thread");
+      setDaemon(true);
     }
 
     @SuppressWarnings("unused")
@@ -312,6 +294,7 @@ public class FlowRunnerManager implements EventListener,
         synchronized (this) {
           try {
             lastCleanerThreadCheckTime = System.currentTimeMillis();
+            logger.info("# of executing flows: " + getNumRunningFlows());
 
             // Cleanup old stuff.
             long currentTime = System.currentTimeMillis();
@@ -349,15 +332,7 @@ public class FlowRunnerManager implements EventListener,
 
       final long pastTimeThreshold =
           System.currentTimeMillis() - executionDirRetention;
-      File[] executionDirs = dir.listFiles(new FileFilter() {
-        @Override
-        public boolean accept(File path) {
-          if (path.isDirectory() && path.lastModified() < pastTimeThreshold) {
-            return true;
-          }
-          return false;
-        }
-      });
+      File[] executionDirs = dir.listFiles(path -> path.isDirectory() && path.lastModified() < pastTimeThreshold);
 
       for (File exDir : executionDirs) {
         try {
@@ -890,4 +865,31 @@ public class FlowRunnerManager implements EventListener,
     submittedFlows.remove(r);
   }
 
+  /**
+   * This shuts down the flow runner. The call is blocking and awaits execution of all jobs.
+   */
+  public void shutdown() {
+    logger.warn("Shutting down FlowRunnerManager...");
+    executorService.shutdown();
+    boolean result = false;
+    while (!result) {
+      logger.info("Awaiting Shutdown. # of executing flows: " + getNumRunningFlows());
+      try {
+        result = executorService.awaitTermination(1, TimeUnit.MINUTES);
+      } catch (InterruptedException e) {
+        logger.error(e);
+      }
+    }
+    logger.warn("Shutdown FlowRunnerManager complete.");
+  }
+
+  /**
+   * This attempts shuts down the flow runner immediately (unsafe).
+   * This doesn't wait for jobs to finish but interrupts all threads.
+   */
+  public void shutdownNow() {
+    logger.warn("Shutting down FlowRunnerManager now...");
+    executorService.shutdownNow();
+  }
+
 }