diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
index 0136a93..536f89c 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
@@ -47,6 +47,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
public static final String JSON_MIME_TYPE = "application/json";
private static final Logger logger = Logger.getLogger(ExecutorServlet.class
.getName());
+ private static final long serialVersionUID = -3528600004096666451L;
private AzkabanExecutorServer application;
private FlowRunnerManager flowRunnerManager;
@@ -96,55 +97,55 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
throws IOException {
final HashMap<String, Object> respMap = new HashMap<>();
try {
- if (!hasParam(req, ACTION_PARAM)) {
+ if (!hasParam(req, ConnectorParams.ACTION_PARAM)) {
logger.error("Parameter action not set");
respMap.put("error", "Parameter action not set");
} else {
- final String action = getParam(req, ACTION_PARAM);
- if (action.equals(UPDATE_ACTION)) {
+ final String action = getParam(req, ConnectorParams.ACTION_PARAM);
+ if (action.equals(ConnectorParams.UPDATE_ACTION)) {
handleAjaxUpdateRequest(req, respMap);
- } else if (action.equals(PING_ACTION)) {
- respMap.put(STATUS_PARAM, RESPONSE_ALIVE);
- } else if (action.equals(RELOAD_JOBTYPE_PLUGINS_ACTION)) {
+ } else if (action.equals(ConnectorParams.PING_ACTION)) {
+ respMap.put(ConnectorParams.STATUS_PARAM, ConnectorParams.RESPONSE_ALIVE);
+ } else if (action.equals(ConnectorParams.RELOAD_JOBTYPE_PLUGINS_ACTION)) {
logger.info("Reloading Jobtype plugins");
handleReloadJobTypePlugins(respMap);
- } else if (action.equals(ACTIVATE)) {
+ } else if (action.equals(ConnectorParams.ACTIVATE)) {
logger.warn("Setting ACTIVE flag to true");
setActive(true, respMap);
- } else if (action.equals(GET_STATUS)) {
+ } else if (action.equals(ConnectorParams.GET_STATUS)) {
logger.debug("Get Executor Status: ");
getStatus(respMap);
- } else if (action.equals(DEACTIVATE)) {
+ } else if (action.equals(ConnectorParams.DEACTIVATE)) {
logger.warn("Setting ACTIVE flag to false");
setActive(false, respMap);
- } else if (action.equals(SHUTDOWN)) {
+ } else if (action.equals(ConnectorParams.SHUTDOWN)) {
shutdown(respMap);
} else {
- final int execid = Integer.parseInt(getParam(req, EXECID_PARAM));
- final String user = getParam(req, USER_PARAM, null);
+ final int execid = Integer.parseInt(getParam(req, ConnectorParams.EXECID_PARAM));
+ final String user = getParam(req, ConnectorParams.USER_PARAM, null);
logger.info("User " + user + " has called action " + action + " on "
+ execid);
- if (action.equals(METADATA_ACTION)) {
+ if (action.equals(ConnectorParams.METADATA_ACTION)) {
handleFetchMetaDataEvent(execid, req, resp, respMap);
- } else if (action.equals(LOG_ACTION)) {
+ } else if (action.equals(ConnectorParams.LOG_ACTION)) {
handleFetchLogEvent(execid, req, resp, respMap);
- } else if (action.equals(ATTACHMENTS_ACTION)) {
+ } else if (action.equals(ConnectorParams.ATTACHMENTS_ACTION)) {
handleFetchAttachmentsEvent(execid, req, resp, respMap);
- } else if (action.equals(EXECUTE_ACTION)) {
+ } else if (action.equals(ConnectorParams.EXECUTE_ACTION)) {
handleAjaxExecute(req, respMap, execid);
- } else if (action.equals(STATUS_ACTION)) {
+ } else if (action.equals(ConnectorParams.STATUS_ACTION)) {
handleAjaxFlowStatus(respMap, execid);
- } else if (action.equals(CANCEL_ACTION)) {
+ } else if (action.equals(ConnectorParams.CANCEL_ACTION)) {
logger.info("Cancel called.");
handleAjaxCancel(respMap, execid, user);
- } else if (action.equals(PAUSE_ACTION)) {
+ } else if (action.equals(ConnectorParams.PAUSE_ACTION)) {
logger.info("Paused called.");
handleAjaxPause(respMap, execid, user);
- } else if (action.equals(RESUME_ACTION)) {
+ } else if (action.equals(ConnectorParams.RESUME_ACTION)) {
logger.info("Resume called.");
handleAjaxResume(respMap, execid, user);
- } else if (action.equals(MODIFY_EXECUTION_ACTION)) {
+ } else if (action.equals(ConnectorParams.MODIFY_EXECUTION_ACTION)) {
logger.info("Modify Execution Action");
handleModifyExecutionRequest(respMap, execid, user, req);
} else {
@@ -155,7 +156,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
}
} catch (final Exception e) {
logger.error(e.getMessage(), e);
- respMap.put(RESPONSE_ERROR, e.getMessage());
+ respMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
}
writeJSON(resp, respMap);
resp.flushBuffer();
@@ -163,13 +164,13 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
private void handleModifyExecutionRequest(final Map<String, Object> respMap,
final int execId, final String user, final HttpServletRequest req) throws ServletException {
- if (!hasParam(req, MODIFY_EXECUTION_ACTION_TYPE)) {
- respMap.put(RESPONSE_ERROR, "Modification type not set.");
+ if (!hasParam(req, ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE)) {
+ respMap.put(ConnectorParams.RESPONSE_ERROR, "Modification type not set.");
}
- final String modificationType = getParam(req, MODIFY_EXECUTION_ACTION_TYPE);
+ final String modificationType = getParam(req, ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE);
try {
- if (MODIFY_RETRY_FAILURES.equals(modificationType)) {
+ if (ConnectorParams.MODIFY_RETRY_FAILURES.equals(modificationType)) {
this.flowRunnerManager.retryFailures(execId, user);
}
} catch (final ExecutorManagerException e) {
@@ -195,7 +196,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
respMap.putAll(result.toObject());
} catch (final Exception e) {
logger.error(e.getMessage(), e);
- respMap.put(RESPONSE_ERROR, e.getMessage());
+ respMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
}
} else {
final int attempt = getIntParam(req, "attempt", 0);
@@ -254,10 +255,10 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
final Map<String, Object> respMap) throws ServletException, IOException {
final ArrayList<Object> updateTimesList =
(ArrayList<Object>) JSONUtils.parseJSONFromString(getParam(req,
- UPDATE_TIME_LIST_PARAM));
+ ConnectorParams.UPDATE_TIME_LIST_PARAM));
final ArrayList<Object> execIDList =
(ArrayList<Object>) JSONUtils.parseJSONFromString(getParam(req,
- EXEC_ID_LIST_PARAM));
+ ConnectorParams.EXEC_ID_LIST_PARAM));
final ArrayList<Object> updateList = new ArrayList<>();
for (int i = 0; i < execIDList.size(); ++i) {
@@ -267,8 +268,8 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
final ExecutableFlowBase flow = this.flowRunnerManager.getExecutableFlow(execId);
if (flow == null) {
final Map<String, Object> errorResponse = new HashMap<>();
- errorResponse.put(RESPONSE_ERROR, "Flow does not exist");
- errorResponse.put(UPDATE_MAP_EXEC_ID, execId);
+ errorResponse.put(ConnectorParams.RESPONSE_ERROR, "Flow does not exist");
+ errorResponse.put(ConnectorParams.UPDATE_MAP_EXEC_ID, execId);
updateList.add(errorResponse);
continue;
}
@@ -278,7 +279,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
}
}
- respMap.put(RESPONSE_UPDATED_FLOWS, updateList);
+ respMap.put(ConnectorParams.RESPONSE_UPDATED_FLOWS, updateList);
}
private void handleAjaxExecute(final HttpServletRequest req,
@@ -287,90 +288,90 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
this.flowRunnerManager.submitFlow(execId);
} catch (final ExecutorManagerException e) {
logger.error(e.getMessage(), e);
- respMap.put(RESPONSE_ERROR, e.getMessage());
+ respMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
}
}
private void handleAjaxFlowStatus(final Map<String, Object> respMap, final int execid) {
final ExecutableFlowBase flow = this.flowRunnerManager.getExecutableFlow(execid);
if (flow == null) {
- respMap.put(STATUS_PARAM, RESPONSE_NOTFOUND);
+ respMap.put(ConnectorParams.STATUS_PARAM, ConnectorParams.RESPONSE_NOTFOUND);
} else {
- respMap.put(STATUS_PARAM, flow.getStatus().toString());
- respMap.put(RESPONSE_UPDATETIME, flow.getUpdateTime());
+ respMap.put(ConnectorParams.STATUS_PARAM, flow.getStatus().toString());
+ respMap.put(ConnectorParams.RESPONSE_UPDATETIME, flow.getUpdateTime());
}
}
private void handleAjaxPause(final Map<String, Object> respMap, final int execid,
final String user) {
if (user == null) {
- respMap.put(RESPONSE_ERROR, "user has not been set");
+ respMap.put(ConnectorParams.RESPONSE_ERROR, "user has not been set");
return;
}
try {
this.flowRunnerManager.pauseFlow(execid, user);
- respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
+ respMap.put(ConnectorParams.STATUS_PARAM, ConnectorParams.RESPONSE_SUCCESS);
} catch (final ExecutorManagerException e) {
logger.error(e.getMessage(), e);
- respMap.put(RESPONSE_ERROR, e.getMessage());
+ respMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
}
}
private void handleAjaxResume(final Map<String, Object> respMap, final int execid,
final String user) throws ServletException {
if (user == null) {
- respMap.put(RESPONSE_ERROR, "user has not been set");
+ respMap.put(ConnectorParams.RESPONSE_ERROR, "user has not been set");
return;
}
try {
this.flowRunnerManager.resumeFlow(execid, user);
- respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
+ respMap.put(ConnectorParams.STATUS_PARAM, ConnectorParams.RESPONSE_SUCCESS);
} catch (final ExecutorManagerException e) {
logger.error(e.getMessage(), e);
- respMap.put(RESPONSE_ERROR, e.getMessage());
+ respMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
}
}
private void handleAjaxCancel(final Map<String, Object> respMap, final int execid,
final String user) {
if (user == null) {
- respMap.put(RESPONSE_ERROR, "user has not been set");
+ respMap.put(ConnectorParams.RESPONSE_ERROR, "user has not been set");
return;
}
try {
this.flowRunnerManager.cancelFlow(execid, user);
- respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
+ respMap.put(ConnectorParams.STATUS_PARAM, ConnectorParams.RESPONSE_SUCCESS);
} catch (final ExecutorManagerException e) {
logger.error(e.getMessage(), e);
- respMap.put(RESPONSE_ERROR, e.getMessage());
+ respMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
}
}
private void handleReloadJobTypePlugins(final Map<String, Object> respMap) {
try {
this.flowRunnerManager.reloadJobTypePlugins();
- respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
+ respMap.put(ConnectorParams.STATUS_PARAM, ConnectorParams.RESPONSE_SUCCESS);
} catch (final Exception e) {
logger.error(e.getMessage(), e);
- respMap.put(RESPONSE_ERROR, e.getMessage());
+ respMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
}
}
private void setActive(final boolean value, final Map<String, Object> respMap) {
try {
setActiveInternal(value);
- respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
+ respMap.put(ConnectorParams.STATUS_PARAM, ConnectorParams.RESPONSE_SUCCESS);
} catch (final Exception e) {
logger.error(e.getMessage(), e);
- respMap.put(RESPONSE_ERROR, e.getMessage());
+ respMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
}
}
private void setActiveInternal(final boolean value)
- throws ExecutorManagerException {
+ throws ExecutorManagerException, InterruptedException {
this.flowRunnerManager.setExecutorActive(value,
this.application.getHost(), this.application.getPort());
}
@@ -387,10 +388,10 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
// Set the executor to inactive. Will receive no new flows.
setActiveInternal(false);
this.application.shutdown();
- respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
+ respMap.put(ConnectorParams.STATUS_PARAM, ConnectorParams.RESPONSE_SUCCESS);
} catch (final Exception e) {
logger.error(e.getMessage(), e);
- respMap.put(RESPONSE_ERROR, e.getMessage());
+ respMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
}
}
@@ -403,10 +404,10 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
respMap.put("executor_id", Integer.toString(executor.getId()));
respMap.put("isActive", String.valueOf(executor.isActive()));
- respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
+ respMap.put(ConnectorParams.STATUS_PARAM, ConnectorParams.RESPONSE_SUCCESS);
} catch (final Exception e) {
logger.error(e.getMessage(), e);
- respMap.put(RESPONSE_ERROR, e.getMessage());
+ respMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
}
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index b9c6bf1..096d4cb 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -58,6 +58,7 @@ import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.lang.Thread.State;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -72,6 +73,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Singleton;
@@ -119,6 +121,8 @@ public class FlowRunnerManager implements EventListener,
// in the queue waiting to be executed or in executing state.
private final Map<Future<?>, Integer> submittedFlows = new ConcurrentHashMap<>();
private final Map<Integer, FlowRunner> runningFlows = new ConcurrentHashMap<>();
+ // keep track of the number of flow being setup({@link createFlowRunner()})
+ private final AtomicInteger preparingFlowCount = new AtomicInteger(0);
private final Map<Integer, ExecutableFlow> recentlyFinishedFlows = new ConcurrentHashMap<>();
private final TrackingThreadPool executorService;
private final CleanerThread cleanerThread;
@@ -150,7 +154,7 @@ public class FlowRunnerManager implements EventListener,
// date time of the the last flow submitted.
private long lastFlowSubmittedDate = 0;
// Indicate if the executor is set to active.
- private boolean active;
+ private volatile boolean active;
@Inject
public FlowRunnerManager(final Props props,
@@ -277,7 +281,7 @@ public class FlowRunnerManager implements EventListener,
}
public void setExecutorActive(final boolean isActive, final String host, final int port)
- throws ExecutorManagerException {
+ throws ExecutorManagerException, InterruptedException {
final Executor executor = this.executorLoader.fetchExecutor(host, port);
Preconditions.checkState(executor != null, "Unable to obtain self entry in DB");
if (executor.isActive() != isActive) {
@@ -288,6 +292,28 @@ public class FlowRunnerManager implements EventListener,
"Set active action ignored. Executor is already " + (isActive ? "active" : "inactive"));
}
this.active = isActive;
+ if (!this.active) {
+ // When deactivating this executor, this call will wait to return until every thread in {@link
+ // #createFlowRunner} has finished. When deploying new executor, old running executor will be
+ // deactivated before new one is activated and only one executor is allowed to
+ // delete/hard-linking project dirs to avoid race condition described in {@link
+ // FlowPreparer#setup}. So to make deactivation process block until flow preparation work
+ // finishes guarantees the old executor won't access {@link FlowPreparer#setup} after
+ // deactivation.
+ waitUntilFlowPreparationFinish();
+ }
+ }
+
+ /**
+ * Wait until ongoing flow preparation work finishes.
+ */
+ private void waitUntilFlowPreparationFinish() throws InterruptedException {
+ final Duration SLEEP_INTERVAL = Duration.ofSeconds(5);
+ while (this.preparingFlowCount.intValue() != 0) {
+ logger.info(this.preparingFlowCount + " flow(s) is/are still being setup before complete "
+ + "deactivation.");
+ Thread.sleep(SLEEP_INTERVAL.toMillis());
+ }
}
public long getLastFlowSubmittedTime() {
@@ -309,7 +335,9 @@ public class FlowRunnerManager implements EventListener,
if (isAlreadyRunning(execId)) {
return;
}
+
final FlowRunner runner = createFlowRunner(execId);
+
// Check again.
if (isAlreadyRunning(execId)) {
return;
@@ -333,6 +361,15 @@ public class FlowRunnerManager implements EventListener,
return false;
}
+ /**
+ * return whether this execution has useExecutor defined. useExecutor is for running test
+ * executions on inactive executor.
+ */
+ private boolean isExecutorSpecified(final ExecutableFlow flow) {
+ return flow.getExecutionOptions().getFlowParameters()
+ .containsKey(ExecutionOptions.USE_EXECUTOR);
+ }
+
private FlowRunner createFlowRunner(final int execId) throws ExecutorManagerException {
final ExecutableFlow flow;
flow = this.executorLoader.fetchExecutableFlow(execId);
@@ -341,6 +378,8 @@ public class FlowRunnerManager implements EventListener,
+ execId);
}
+ // Sets up the project files and execution directory.
+ this.preparingFlowCount.incrementAndGet();
// Record the time between submission, and when the flow preparation/execution starts.
// Note that since submit time is recorded on the web server, while flow preparation is on
// the executor, there could be some inaccuracies due to clock skew.
@@ -348,10 +387,18 @@ public class FlowRunnerManager implements EventListener,
flow.getExecutableFlow().getSubmitTime());
final Timer.Context flowPrepTimerContext = commonMetrics.getFlowSetupTimerContext();
+
try {
- // Sets up the project files and execution directory.
- this.flowPreparer.setup(flow);
+ if (this.active || isExecutorSpecified(flow)) {
+ this.flowPreparer.setup(flow);
+ } else {
+ // Unset the executor.
+ this.executorLoader.unsetExecutorIdForExecution(execId);
+ throw new ExecutorManagerException("executor became inactive before setting up the "
+ + "flow " + execId);
+ }
} finally {
+ this.preparingFlowCount.decrementAndGet();
flowPrepTimerContext.stop();
}