diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 3dfc4c3..87925c8 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -80,15 +80,12 @@ import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
public class AzkabanExecutorServer {
- private static final String CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY =
- "jmx.attribute.processor.class";
- private static final Logger logger = Logger
- .getLogger(AzkabanExecutorServer.class);
+ private static final String CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY = "jmx.attribute.processor.class";
+ private static final Logger logger = Logger.getLogger(AzkabanExecutorServer.class);
private static final int MAX_FORM_CONTENT_SIZE = 10 * 1024 * 1024;
public static final String JOBTYPE_PLUGIN_DIR = "azkaban.jobtype.plugin.dir";
- public static final String METRIC_INTERVAL =
- "executor.metric.milisecinterval.";
+ public static final String METRIC_INTERVAL = "executor.metric.milisecinterval.";
public static final int DEFAULT_HEADER_BUFFER_SIZE = 4096;
private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
@@ -319,10 +316,6 @@ public class AzkabanExecutorServer {
}
}
- public void stopServer() throws Exception {
- server.stop();
- server.destroy();
- }
public ProjectLoader getProjectLoader() {
return projectLoader;
@@ -391,13 +384,12 @@ public class AzkabanExecutorServer {
logger.info(("Exception when logging top memory consumers"), e);
}
- logger.info("Shutting down http server...");
+ logger.info("Shutting down...");
try {
- app.stopServer();
+ app.shutdownNow();
} catch (Exception e) {
logger.error("Error while shutting down http server.", e);
}
- logger.info("kk thx bye.");
}
public void logTopMemoryConsumers() throws Exception, IOException {
@@ -588,4 +580,51 @@ public class AzkabanExecutorServer {
public String getExecutorHostPort() {
return getHost() + ":" + getPort();
}
+
+ /**
+ * Shutdown the server.
+ * - performs a safe shutdown. Waits for completion of current tasks
+ * - spawns a shutdown thread and returns immediately.
+ */
+ public void shutdown() {
+ logger.warn("Shutting down AzkabanExecutorServer...");
+ new Thread(() -> {
+ try {
+ // Hack: Sleep for a little time to allow API calls to complete
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ logger.error(e);
+ }
+ shutdownInternal();
+ }, "shutdown").start();
+ }
+
+ /**
+ * (internal API)
+ * Note: This should be run in a separate thread.
+ *
+ * Shutdown the server. (blocking call)
+ * - waits for jobs to finish
+ * - doesn't accept any new jobs
+ */
+ private void shutdownInternal() {
+ getFlowRunnerManager().shutdown();
+ try {
+ shutdownNow();
+ logger.warn("Shutdown AzkabanExecutorServer complete");
+ } catch (Exception e) {
+ logger.error(e);
+ }
+ }
+
+ /**
+ * Shutdown the server now! (unsafe)
+ * @throws Exception
+ */
+ public void shutdownNow() throws Exception {
+ server.stop();
+ server.destroy();
+ SystemMemoryInfo.shutdown();
+ getFlowRunnerManager().shutdownNow();
+ }
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
index 154cf12..0f87d72 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
@@ -111,6 +111,8 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
} else if (action.equals(DEACTIVATE)) {
logger.warn("Setting ACTIVE flag to false");
setActive(false, respMap);
+ } else if (action.equals(SHUTDOWN)) {
+ shutdown(respMap);
} else {
int execid = Integer.parseInt(getParam(req, EXECID_PARAM));
String user = getParam(req, USER_PARAM, null);
@@ -357,16 +359,42 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
private void setActive(boolean value, Map<String, Object> respMap)
throws ServletException {
try {
- ExecutorLoader executorLoader = application.getExecutorLoader();
- Executor executor = executorLoader.fetchExecutor(application.getHost(), application.getPort());
- Preconditions.checkState(executor != null, "Unable to obtain self entry in DB");
- if (executor.isActive() != value) {
- executor.setActive(value);
- executorLoader.updateExecutor(executor);
- flowRunnerManager.setActive(value);
- } else {
- logger.warn("Set active action ignored. Executor is already " + (value? "active" : "inactive"));
- }
+ setActiveInternal(value);
+ respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
+ } catch (Exception e) {
+ logger.error(e);
+ respMap.put(RESPONSE_ERROR, e.getMessage());
+ }
+ }
+
+ private void setActiveInternal(boolean value)
+ throws ExecutorManagerException {
+ ExecutorLoader executorLoader = application.getExecutorLoader();
+ Executor executor = executorLoader.fetchExecutor(application.getHost(), application.getPort());
+ Preconditions.checkState(executor != null, "Unable to obtain self entry in DB");
+ if (executor.isActive() != value) {
+ executor.setActive(value);
+ executorLoader.updateExecutor(executor);
+ flowRunnerManager.setActive(value);
+ } else {
+ logger.warn("Set active action ignored. Executor is already " + (value? "active" : "inactive"));
+ }
+ }
+
+ /**
+ * Prepare the executor for shutdown.
+ *
+ * @param respMap json response object
+ * @throws ServletException
+ */
+ private void shutdown(Map<String, Object> respMap)
+ throws ServletException {
+ try {
+ logger.warn("Shutting down executor...");
+
+ // Set the executor to inactive. Will receive no new flows.
+ setActiveInternal(false);
+ application.shutdown();
respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
} catch (Exception e) {
logger.error(e);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 074d1b1..af64bc7 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -45,7 +45,6 @@ import azkaban.execapp.event.RemoteFlowWatcher;
import azkaban.execapp.metric.NumFailedFlowMetric;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
-import azkaban.executor.Executor;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.jobtype.JobTypeManager;
@@ -88,53 +87,47 @@ import azkaban.utils.TrackingThreadPool;
*/
public class FlowRunnerManager implements EventListener,
ThreadPoolExecutingListener {
- private static final String EXECUTOR_USE_BOUNDED_THREADPOOL_QUEUE =
- "executor.use.bounded.threadpool.queue";
- private static final String EXECUTOR_THREADPOOL_WORKQUEUE_SIZE =
- "executor.threadpool.workqueue.size";
+ private static final Logger logger = Logger.getLogger(FlowRunnerManager.class);
+
+ private static final String EXECUTOR_USE_BOUNDED_THREADPOOL_QUEUE = "executor.use.bounded.threadpool.queue";
+ private static final String EXECUTOR_THREADPOOL_WORKQUEUE_SIZE = "executor.threadpool.workqueue.size";
private static final String EXECUTOR_FLOW_THREADS = "executor.flow.threads";
private static final String FLOW_NUM_JOB_THREADS = "flow.num.job.threads";
- private static Logger logger = Logger.getLogger(FlowRunnerManager.class);
- private File executionDirectory;
- private File projectDirectory;
// recently finished secs to clean up. 1 minute
- private static final long RECENTLY_FINISHED_TIME_TO_LIVE = 60 * 1000;
+ private static final int RECENTLY_FINISHED_TIME_TO_LIVE = 60 * 1000;
private static final int DEFAULT_NUM_EXECUTING_FLOWS = 30;
private static final int DEFAULT_FLOW_NUM_JOB_TREADS = 10;
- private Map<Pair<Integer, Integer>, ProjectVersion> installedProjects =
- new ConcurrentHashMap<Pair<Integer, Integer>, ProjectVersion>();
// this map is used to store the flows that have been submitted to
// the executor service. Once a flow has been submitted, it is either
// in the queue waiting to be executed or in executing state.
- private Map<Future<?>, Integer> submittedFlows =
- new ConcurrentHashMap<Future<?>, Integer>();
- private Map<Integer, FlowRunner> runningFlows =
- new ConcurrentHashMap<Integer, FlowRunner>();
- private Map<Integer, ExecutableFlow> recentlyFinishedFlows =
- new ConcurrentHashMap<Integer, ExecutableFlow>();
-
- private int numThreads = DEFAULT_NUM_EXECUTING_FLOWS;
- private int threadPoolQueueSize = -1;
+ private final Map<Future<?>, Integer> submittedFlows = new ConcurrentHashMap<>();
+ private final Map<Integer, FlowRunner> runningFlows = new ConcurrentHashMap<>();
+ private final Map<Integer, ExecutableFlow> recentlyFinishedFlows = new ConcurrentHashMap<>();
+ private final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects;
- private TrackingThreadPool executorService;
+ private final TrackingThreadPool executorService;
+ private final CleanerThread cleanerThread;
+ private final ExecutorLoader executorLoader;
+ private final ProjectLoader projectLoader;
+ private final JobTypeManager jobtypeManager;
- private CleanerThread cleanerThread;
- private int numJobThreadPerFlow = DEFAULT_FLOW_NUM_JOB_TREADS;
-
- private ExecutorLoader executorLoader;
- private ProjectLoader projectLoader;
+ private final Props azkabanProps;
+ private final File executionDirectory;
+ private final File projectDirectory;
- private JobTypeManager jobtypeManager;
+ private final Object executionDirDeletionSync = new Object();
- private Props globalProps = null;
+ private int numThreads = DEFAULT_NUM_EXECUTING_FLOWS;
+ private int threadPoolQueueSize = -1;
+ private int numJobThreadPerFlow = DEFAULT_FLOW_NUM_JOB_TREADS;
- private final Props azkabanProps;
+ private Props globalProps;
private long lastCleanerThreadCheckTime = -1;
- private long executionDirRetention = 1 * 24 * 60 * 60 * 1000;
+ private long executionDirRetention = 1 * 24 * 60 * 60 * 1000; // 1 Day
// We want to limit the log sizes to about 20 megs
private String jobLogChunkSize = "5MB";
@@ -143,8 +136,6 @@ public class FlowRunnerManager implements EventListener,
// If true, jobs will validate proxy user against a list of valid proxy users.
private boolean validateProxyUser = false;
- private Object executionDirDeletionSync = new Object();
-
// date time of the the last flow submitted.
private long lastFlowSubmittedDate = 0;
@@ -152,24 +143,18 @@ public class FlowRunnerManager implements EventListener,
private volatile boolean isActive = false;
public FlowRunnerManager(Props props, ExecutorLoader executorLoader,
- ProjectLoader projectLoader, ClassLoader parentClassLoader)
- throws IOException {
- executionDirectory =
- new File(props.getString("azkaban.execution.dir", "executions"));
- projectDirectory =
- new File(props.getString("azkaban.project.dir", "projects"));
-
+ ProjectLoader projectLoader, ClassLoader parentClassLoader) throws IOException {
azkabanProps = props;
// JobWrappingFactory.init(props, getClass().getClassLoader());
- executionDirRetention =
- props.getLong("execution.dir.retention", executionDirRetention);
- logger.info("Execution dir retention set to " + executionDirRetention
- + " ms");
+ executionDirRetention = props.getLong("execution.dir.retention", executionDirRetention);
+ logger.info("Execution dir retention set to " + executionDirRetention + " ms");
+ executionDirectory = new File(props.getString("azkaban.execution.dir", "executions"));
if (!executionDirectory.exists()) {
executionDirectory.mkdirs();
}
+ projectDirectory = new File(props.getString("azkaban.project.dir", "projects"));
if (!projectDirectory.exists()) {
projectDirectory.mkdirs();
}
@@ -177,10 +162,8 @@ public class FlowRunnerManager implements EventListener,
installedProjects = loadExistingProjects();
// azkaban.temp.dir
- numThreads =
- props.getInt(EXECUTOR_FLOW_THREADS, DEFAULT_NUM_EXECUTING_FLOWS);
- numJobThreadPerFlow =
- props.getInt(FLOW_NUM_JOB_THREADS, DEFAULT_FLOW_NUM_JOB_TREADS);
+ numThreads = props.getInt(EXECUTOR_FLOW_THREADS, DEFAULT_NUM_EXECUTING_FLOWS);
+ numJobThreadPerFlow = props.getInt(FLOW_NUM_JOB_THREADS, DEFAULT_FLOW_NUM_JOB_TREADS);
executorService = createExecutorService(numThreads);
this.executorLoader = executorLoader;
@@ -189,16 +172,14 @@ public class FlowRunnerManager implements EventListener,
this.jobLogChunkSize = azkabanProps.getString("job.log.chunk.size", "5MB");
this.jobLogNumFiles = azkabanProps.getInt("job.log.backup.index", 4);
- this.validateProxyUser =
- azkabanProps.getBoolean("proxy.user.lock.down", false);
+ this.validateProxyUser = azkabanProps.getBoolean("proxy.user.lock.down", false);
setActive(true);
cleanerThread = new CleanerThread();
cleanerThread.start();
- String globalPropsPath =
- props.getString("executor.global.properties", null);
+ String globalPropsPath = props.getString("executor.global.properties", null);
if (globalPropsPath != null) {
globalProps = new Props(null, globalPropsPath);
}
@@ -299,6 +280,7 @@ public class FlowRunnerManager implements EventListener,
public CleanerThread() {
this.setName("FlowRunnerManager-Cleaner-Thread");
+ setDaemon(true);
}
@SuppressWarnings("unused")
@@ -312,6 +294,7 @@ public class FlowRunnerManager implements EventListener,
synchronized (this) {
try {
lastCleanerThreadCheckTime = System.currentTimeMillis();
+ logger.info("# of executing flows: " + getNumRunningFlows());
// Cleanup old stuff.
long currentTime = System.currentTimeMillis();
@@ -349,15 +332,7 @@ public class FlowRunnerManager implements EventListener,
final long pastTimeThreshold =
System.currentTimeMillis() - executionDirRetention;
- File[] executionDirs = dir.listFiles(new FileFilter() {
- @Override
- public boolean accept(File path) {
- if (path.isDirectory() && path.lastModified() < pastTimeThreshold) {
- return true;
- }
- return false;
- }
- });
+ File[] executionDirs = dir.listFiles(path -> path.isDirectory() && path.lastModified() < pastTimeThreshold);
for (File exDir : executionDirs) {
try {
@@ -890,4 +865,31 @@ public class FlowRunnerManager implements EventListener,
submittedFlows.remove(r);
}
+ /**
+ * This shuts down the flow runner. The call is blocking and awaits execution of all jobs.
+ */
+ public void shutdown() {
+ logger.warn("Shutting down FlowRunnerManager...");
+ executorService.shutdown();
+ boolean result = false;
+ while (!result) {
+ logger.info("Awaiting Shutdown. # of executing flows: " + getNumRunningFlows());
+ try {
+ result = executorService.awaitTermination(1, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ logger.error(e);
+ }
+ }
+ logger.warn("Shutdown FlowRunnerManager complete.");
+ }
+
+ /**
+ * This attempts shuts down the flow runner immediately (unsafe).
+ * This doesn't wait for jobs to finish but interrupts all threads.
+ */
+ public void shutdownNow() {
+ logger.warn("Shutting down FlowRunnerManager now...");
+ executorService.shutdownNow();
+ }
+
}