Details
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index e05090e..9f8409f 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -90,6 +90,9 @@ public class Constants {
public static class ConfigurationKeys {
+ // Configures Azkaban to use new polling model for dispatching
+ public static final String AZKABAN_POLL_MODEL = "azkaban.poll.model";
+
// Configures Azkaban Flow Version in project YAML file
public static final String AZKABAN_FLOW_VERSION = "azkaban-flow-version";
diff --git a/az-jobsummary/src/main/java/azkaban/viewer/jobsummary/JobSummaryServlet.java b/az-jobsummary/src/main/java/azkaban/viewer/jobsummary/JobSummaryServlet.java
index 94e19d8..c9ddcde 100644
--- a/az-jobsummary/src/main/java/azkaban/viewer/jobsummary/JobSummaryServlet.java
+++ b/az-jobsummary/src/main/java/azkaban/viewer/jobsummary/JobSummaryServlet.java
@@ -54,7 +54,7 @@ public class JobSummaryServlet extends LoginAbstractAzkabanServlet {
private final String viewerName;
private final String viewerPath;
- private ExecutorManagerAdapter executorManager;
+ private ExecutorManagerAdapter executorManagerAdapter;
private ProjectManager projectManager;
private String outputDir;
@@ -87,7 +87,7 @@ public class JobSummaryServlet extends LoginAbstractAzkabanServlet {
public void init(final ServletConfig config) throws ServletException {
super.init(config);
final AzkabanWebServer server = (AzkabanWebServer) getApplication();
- this.executorManager = server.getExecutorManager();
+ this.executorManagerAdapter = server.getExecutorManagerAdapter();
this.projectManager = server.getProjectManager();
}
@@ -112,7 +112,7 @@ public class JobSummaryServlet extends LoginAbstractAzkabanServlet {
ExecutableFlow flow = null;
ExecutableNode node = null;
try {
- flow = this.executorManager.getExecutableFlow(execId);
+ flow = this.executorManagerAdapter.getExecutableFlow(execId);
if (flow == null) {
page.add("errorMsg", "Error loading executing flow " + execId
+ ": not found.");
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
new file mode 100644
index 0000000..fee1499
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
@@ -0,0 +1,332 @@
+/*
+* Copyright 2018 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+package azkaban.executor;
+
+import azkaban.event.EventHandler;
+import azkaban.project.Project;
+import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.Pair;
+import java.io.IOException;
+import java.lang.Thread.State;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Controls flow executions on web server. This module implements the polling model
+ * in the new AZ dispatching design. It's injected only when azkaban.poll.model is configured to
+ * true. It will ultimately replace ExecutorManager in the future.
+ */
+@Singleton
+public class ExecutionController extends EventHandler implements ExecutorManagerAdapter {
+
+ private static final Logger logger = LoggerFactory.getLogger(ExecutionController.class);
+ private static final Duration RECENTLY_FINISHED_LIFETIME = Duration.ofMinutes(10);
+ private final ExecutorLoader executorLoader;
+ private final ExecutorApiGateway apiGateway;
+
+ @Inject
+ ExecutionController(final ExecutorLoader executorLoader, final ExecutorApiGateway apiGateway) {
+ this.executorLoader = executorLoader;
+ this.apiGateway = apiGateway;
+ }
+
+ @Override
+ public void setupExecutors() throws ExecutorManagerException {
+ // Todo: deprecate this method
+ }
+
+ @Override
+ public void disableQueueProcessorThread() {
+ // Todo: deprecate this method
+ }
+
+ @Override
+ public void enableQueueProcessorThread() {
+ // Todo: deprecate this method
+ }
+
+ @Override
+ public State getExecutorManagerThreadState() {
+ // Todo: deprecate this method
+ return State.RUNNABLE;
+ }
+
+ @Override
+ public boolean isExecutorManagerThreadActive() {
+ // Todo: deprecate this method
+ return true;
+ }
+
+ @Override
+ public long getLastExecutorManagerThreadCheckTime() {
+ // Todo: deprecate this method
+ return 1L;
+ }
+
+ @Override
+ public Collection<Executor> getAllActiveExecutors() {
+ // Todo: get the executor info from DB
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Executor fetchExecutor(final int executorId) throws ExecutorManagerException {
+ return this.executorLoader.fetchExecutor(executorId);
+ }
+
+ @Override
+ public Set<String> getPrimaryServerHosts() {
+ // Todo: get the primary executor host info from DB
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<String> getAllActiveExecutorServerHosts() {
+ // Todo: get all executor host info from DB
+ return Collections.emptySet();
+ }
+
+ /**
+ * Gets a list of all the active (running flows and non-dispatched flows) executions for a given
+ * project and flow from database. {@inheritDoc}
+ */
+ @Override
+ public List<Integer> getRunningFlows(final int projectId, final String flowId) {
+ // Todo: get running flows from DB
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<Pair<ExecutableFlow, Optional<Executor>>> getActiveFlowsWithExecutor()
+ throws IOException {
+ // Todo: get active flows with executor from DB
+ return Collections.emptyList();
+ }
+
+ /**
+ * Checks whether the given flow has an active (running, non-dispatched) execution from
+ * database. {@inheritDoc}
+ */
+ @Override
+ public boolean isFlowRunning(final int projectId, final String flowId) {
+ // Todo: check DB to see if flow is running
+ return true;
+ }
+
+ /**
+ * Fetch ExecutableFlow from database. {@inheritDoc}
+ */
+ @Override
+ public ExecutableFlow getExecutableFlow(final int execId)
+ throws ExecutorManagerException {
+ return this.executorLoader.fetchExecutableFlow(execId);
+ }
+
+ /**
+ * Get all active (running, non-dispatched) flows from database. {@inheritDoc}
+ */
+ @Override
+ public List<ExecutableFlow> getRunningFlows() {
+ // Todo: get running flows from DB
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<ExecutableFlow> getRecentlyFinishedFlows() {
+ List<ExecutableFlow> flows = new ArrayList<>();
+ try {
+ flows = this.executorLoader.fetchRecentlyFinishedFlows(
+ RECENTLY_FINISHED_LIFETIME);
+ } catch (final ExecutorManagerException e) {
+ logger.error("Failed to fetch recently finished flows.", e);
+ }
+ return flows;
+ }
+
+ @Override
+ public List<ExecutableFlow> getExecutableFlows(final int skip, final int size)
+ throws ExecutorManagerException {
+ final List<ExecutableFlow> flows = this.executorLoader.fetchFlowHistory(skip, size);
+ return flows;
+ }
+
+ @Override
+ public List<ExecutableFlow> getExecutableFlows(final String flowIdContains,
+ final int skip, final int size) throws ExecutorManagerException {
+ final List<ExecutableFlow> flows =
+ this.executorLoader.fetchFlowHistory(null, '%' + flowIdContains + '%', null,
+ 0, -1, -1, skip, size);
+ return flows;
+ }
+
+ @Override
+ public List<ExecutableFlow> getExecutableFlows(final String projContain,
+ final String flowContain, final String userContain, final int status, final long begin,
+ final long end,
+ final int skip, final int size) throws ExecutorManagerException {
+ final List<ExecutableFlow> flows =
+ this.executorLoader.fetchFlowHistory(projContain, flowContain, userContain,
+ status, begin, end, skip, size);
+ return flows;
+ }
+
+ @Override
+ public List<ExecutableJobInfo> getExecutableJobs(final Project project,
+ final String jobId, final int skip, final int size) throws ExecutorManagerException {
+ final List<ExecutableJobInfo> nodes =
+ this.executorLoader.fetchJobHistory(project.getId(), jobId, skip, size);
+ return nodes;
+ }
+
+ @Override
+ public int getNumberOfJobExecutions(final Project project, final String jobId)
+ throws ExecutorManagerException {
+ return this.executorLoader.fetchNumExecutableNodes(project.getId(), jobId);
+ }
+
+ @Override
+ public LogData getExecutableFlowLog(final ExecutableFlow exFlow, final int offset,
+ final int length) throws ExecutorManagerException {
+ // Todo: get the flow log from executor if the flow is running, else get it from DB.
+ return new LogData(0, 0, "dummy");
+ }
+
+ @Override
+ public LogData getExecutionJobLog(final ExecutableFlow exFlow, final String jobId,
+ final int offset, final int length, final int attempt) throws ExecutorManagerException {
+ // Todo: get the job log from executor if the flow is running, else get it from DB.
+ return new LogData(0, 0, "dummy");
+ }
+
+ @Override
+ public List<Object> getExecutionJobStats(final ExecutableFlow exFlow, final String jobId,
+ final int attempt) throws ExecutorManagerException {
+ // Todo: get execution job status from executor if the flow is running, else get if from DB.
+ return Collections.emptyList();
+ }
+
+ @Override
+ public String getJobLinkUrl(final ExecutableFlow exFlow, final String jobId, final int attempt) {
+ // Todo: deprecate this method
+ return null;
+ }
+
+ /**
+ * If a flow was dispatched to an executor, cancel by calling Executor. Else if it's still
+ * queued in database, remove it from database queue and finalize. {@inheritDoc}
+ */
+ @Override
+ public void cancelFlow(final ExecutableFlow exFlow, final String userId)
+ throws ExecutorManagerException {
+ // Todo: call executor to cancel the flow if it's running or remove from DB queue if it
+ // hasn't started
+ }
+
+ @Override
+ public void resumeFlow(final ExecutableFlow exFlow, final String userId)
+ throws ExecutorManagerException {
+ // Todo: call executor to resume the flow
+ }
+
+ @Override
+ public void pauseFlow(final ExecutableFlow exFlow, final String userId)
+ throws ExecutorManagerException {
+ // Todo: call executor to pause the flow
+ }
+
+ @Override
+ public void retryFailures(final ExecutableFlow exFlow, final String userId)
+ throws ExecutorManagerException {
+ // Todo: call executor to retry failed flows
+ }
+
+ /**
+ * When a flow is submitted, insert a new execution into the database queue. {@inheritDoc}
+ */
+ @Override
+ public String submitExecutableFlow(final ExecutableFlow exflow, final String userId)
+ throws ExecutorManagerException {
+ // Todo: insert the execution to DB queue
+ return null;
+ }
+
+ @Override
+ public Map<String, Object> callExecutorStats(final int executorId, final String action,
+ final Pair<String, String>... params) throws IOException, ExecutorManagerException {
+ final Executor executor = fetchExecutor(executorId);
+ final List<Pair<String, String>> paramList = new ArrayList<>();
+
+ if (params != null) {
+ paramList.addAll(Arrays.asList(params));
+ }
+
+ paramList.add(new Pair<>(ConnectorParams.ACTION_PARAM, action));
+
+ return this.apiGateway.callForJsonObjectMap(executor.getHost(), executor.getPort(),
+ "/stats", paramList);
+ }
+
+ @Override
+ public Map<String, Object> callExecutorJMX(final String hostPort, final String action,
+ final String mBean) throws IOException {
+ final List<Pair<String, String>> paramList =
+ new ArrayList<>();
+
+ paramList.add(new Pair<>(action, ""));
+ if (mBean != null) {
+ paramList.add(new Pair<>(ConnectorParams.JMX_MBEAN, mBean));
+ }
+
+ final String[] hostPortSplit = hostPort.split(":");
+ return this.apiGateway.callForJsonObjectMap(hostPortSplit[0],
+ Integer.valueOf(hostPortSplit[1]), "/jmx", paramList);
+ }
+
+
+ @Override
+ public void shutdown() {
+ //Todo: shutdown any thread that is running
+ }
+
+ @Override
+ public int getExecutableFlows(final int projectId, final String flowId, final int from,
+ final int length, final List<ExecutableFlow> outputList)
+ throws ExecutorManagerException {
+ final List<ExecutableFlow> flows =
+ this.executorLoader.fetchFlowHistory(projectId, flowId, from, length);
+ outputList.addAll(flows);
+ return this.executorLoader.fetchNumExecutableFlows(projectId, flowId);
+ }
+
+ @Override
+ public List<ExecutableFlow> getExecutableFlows(final int projectId, final String flowId,
+ final int from, final int length, final Status status) throws ExecutorManagerException {
+ return this.executorLoader.fetchFlowHistory(projectId, flowId, from, length,
+ status);
+ }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
index 6d00500..ebf06e3 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
@@ -38,7 +38,7 @@ public class ExecuteFlowAction implements TriggerAction {
public static final String EXEC_ID = "ExecuteFlowAction.execid";
- private static ExecutorManagerAdapter executorManager;
+ private static ExecutorManagerAdapter executorManagerAdapter;
private static TriggerManager triggerManager;
private static ProjectManager projectManager;
private static Logger logger = Logger.getLogger(ExecuteFlowAction.class);
@@ -66,12 +66,13 @@ public class ExecuteFlowAction implements TriggerAction {
ExecuteFlowAction.logger = logger;
}
- public static ExecutorManagerAdapter getExecutorManager() {
- return executorManager;
+ public static ExecutorManagerAdapter getExecutorManagerAdapter() {
+ return executorManagerAdapter;
}
- public static void setExecutorManager(final ExecutorManagerAdapter executorManager) {
- ExecuteFlowAction.executorManager = executorManager;
+ public static void setExecutorManagerAdapter(
+ final ExecutorManagerAdapter executorManagerAdapter) {
+ ExecuteFlowAction.executorManagerAdapter = executorManagerAdapter;
}
public static TriggerManager getTriggerManager() {
@@ -197,7 +198,7 @@ public class ExecuteFlowAction implements TriggerAction {
@Override
public void doAction() throws Exception {
- if (projectManager == null || executorManager == null) {
+ if (projectManager == null || executorManagerAdapter == null) {
throw new Exception("ExecuteFlowAction not properly initialized!");
}
@@ -224,7 +225,7 @@ public class ExecuteFlowAction implements TriggerAction {
}
logger.info("Invoking flow " + project.getName() + "." + this.flowName);
- executorManager.submitExecutableFlow(exflow, this.submitUser);
+ executorManagerAdapter.submitExecutableFlow(exflow, this.submitUser);
logger.info("Invoked flow " + project.getName() + "." + this.flowName);
}
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecutionChecker.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecutionChecker.java
index 30a021c..24fd2b6 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecutionChecker.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecutionChecker.java
@@ -28,7 +28,7 @@ import java.util.Map;
public class ExecutionChecker implements ConditionChecker {
public static final String type = "ExecutionChecker";
- public static ExecutorManagerAdapter executorManager;
+ public static ExecutorManagerAdapter executorManagerAdapter;
private final String checkerId;
private final int execId;
@@ -43,8 +43,8 @@ public class ExecutionChecker implements ConditionChecker {
this.wantedStatus = wantedStatus;
}
- public static void setExecutorManager(final ExecutorManagerAdapter em) {
- executorManager = em;
+ public static void setExecutorManagerAdapter(final ExecutorManagerAdapter em) {
+ executorManagerAdapter = em;
}
public static ExecutionChecker createFromJson(final HashMap<String, Object> jsonObj)
@@ -68,7 +68,7 @@ public class ExecutionChecker implements ConditionChecker {
public Object eval() {
final ExecutableFlow exflow;
try {
- exflow = executorManager.getExecutableFlow(this.execId);
+ exflow = executorManagerAdapter.getExecutableFlow(this.execId);
} catch (final ExecutorManagerException e) {
e.printStackTrace();
return Boolean.FALSE;
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/KillExecutionAction.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/KillExecutionAction.java
index 919ecc4..c378c90 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/KillExecutionAction.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/KillExecutionAction.java
@@ -38,7 +38,7 @@ public class KillExecutionAction implements TriggerAction {
private static final Logger logger = Logger
.getLogger(KillExecutionAction.class);
- private static ExecutorManagerAdapter executorManager;
+ private static ExecutorManagerAdapter executorManagerAdapter;
private final String actionId;
private final int execId;
@@ -48,8 +48,8 @@ public class KillExecutionAction implements TriggerAction {
this.actionId = actionId;
}
- public static void setExecutorManager(final ExecutorManagerAdapter em) {
- executorManager = em;
+ public static void setExecutorManagerAdapter(final ExecutorManagerAdapter em) {
+ executorManagerAdapter = em;
}
public static KillExecutionAction createFromJson(final Object obj) {
@@ -94,11 +94,11 @@ public class KillExecutionAction implements TriggerAction {
@Override
public void doAction() throws Exception {
- final ExecutableFlow exFlow = executorManager.getExecutableFlow(this.execId);
+ final ExecutableFlow exFlow = executorManagerAdapter.getExecutableFlow(this.execId);
logger.info("ready to kill execution " + this.execId);
if (!Status.isStatusFinished(exFlow.getStatus())) {
logger.info("Killing execution " + this.execId);
- executorManager.cancelFlow(exFlow, Constants.AZKABAN_SLA_CHECKER_USERNAME);
+ executorManagerAdapter.cancelFlow(exFlow, Constants.AZKABAN_SLA_CHECKER_USERNAME);
}
}
diff --git a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
index 69438b8..a2b4032 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
@@ -19,11 +19,9 @@ package azkaban.trigger;
import static java.util.Objects.requireNonNull;
import azkaban.event.EventHandler;
-import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
import azkaban.utils.Props;
-import javax.inject.Inject;
-import javax.inject.Singleton;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
@@ -34,6 +32,8 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
+import javax.inject.Inject;
+import javax.inject.Singleton;
import org.apache.log4j.Logger;
@Singleton
@@ -57,10 +57,10 @@ public class TriggerManager extends EventHandler implements
@Inject
public TriggerManager(final Props props, final TriggerLoader triggerLoader,
- final ExecutorManager executorManager) throws TriggerManagerException {
+ final ExecutorManagerAdapter executorManagerAdapter) throws TriggerManagerException {
requireNonNull(props);
- requireNonNull(executorManager);
+ requireNonNull(executorManagerAdapter);
this.triggerLoader = requireNonNull(triggerLoader);
final long scannerInterval =
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
index 2f98175..064d238 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
@@ -18,13 +18,11 @@ package azkaban.trigger;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.anyObject;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import azkaban.executor.ExecutionOptions;
-import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
import azkaban.project.Project;
@@ -51,16 +49,15 @@ import org.junit.Test;
public class TriggerManagerTest {
private static TriggerLoader triggerLoader;
- private static ExecutorManager executorManager;
+ private static ExecutorManagerAdapter executorManagerAdapter;
private static ProjectManager projectManager;
private TriggerManager triggerManager;
@BeforeClass
public static void prepare() {
triggerLoader = new MockTriggerLoader();
- executorManager = mock(ExecutorManager.class);
+ executorManagerAdapter = mock(ExecutorManagerAdapter.class);
projectManager = mock(ProjectManager.class);
- doNothing().when(executorManager).addListener(anyObject());
}
@Before
@@ -68,15 +65,15 @@ public class TriggerManagerTest {
final Project project = new Project(1, "test-project");
project.setFlows(ImmutableMap.of("test-flow", new Flow("test-flow")));
when(projectManager.getProject(1)).thenReturn(project);
- when(executorManager.submitExecutableFlow(any(), any()))
+ when(executorManagerAdapter.submitExecutableFlow(any(), any()))
.thenThrow(new ExecutorManagerException("Flow is already running. Skipping execution.",
ExecutorManagerException.Reason.SkippedExecution));
- ExecuteFlowAction.setExecutorManager(this.executorManager);
+ ExecuteFlowAction.setExecutorManagerAdapter(this.executorManagerAdapter);
ExecuteFlowAction.setProjectManager(this.projectManager);
ExecuteFlowAction.setTriggerManager(this.triggerManager);
final Props props = new Props();
props.put("trigger.scan.interval", 300);
- this.triggerManager = new TriggerManager(props, triggerLoader, executorManager);
+ this.triggerManager = new TriggerManager(props, triggerLoader, executorManagerAdapter);
this.triggerManager.registerCheckerType(ThresholdChecker.type,
ThresholdChecker.class);
this.triggerManager.registerActionType(DummyTriggerAction.type,
diff --git a/azkaban-solo-server/src/main/java/azkaban/soloserver/AzkabanSingleServer.java b/azkaban-solo-server/src/main/java/azkaban/soloserver/AzkabanSingleServer.java
index 124fe51..eb16d67 100644
--- a/azkaban-solo-server/src/main/java/azkaban/soloserver/AzkabanSingleServer.java
+++ b/azkaban-solo-server/src/main/java/azkaban/soloserver/AzkabanSingleServer.java
@@ -53,10 +53,10 @@ public class AzkabanSingleServer {
this.executor = executor;
}
- public static void main(String[] args) {
+ public static void main(final String[] args) {
try {
start(args);
- } catch (Exception e) {
+ } catch (final Exception e) {
log.error("Failed to start single server. Shutting down.", e);
System.exit(1);
}
@@ -96,7 +96,7 @@ public class AzkabanSingleServer {
/* Initialize Guice Injector */
final Injector injector = Guice.createInjector(
new AzkabanCommonModule(props),
- new AzkabanWebServerModule(),
+ new AzkabanWebServerModule(props),
new AzkabanExecServerModule()
);
SERVICE_PROVIDER.setInjector(injector);
diff --git a/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java b/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java
index cc2903c..f328b5b 100644
--- a/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java
+++ b/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java
@@ -105,7 +105,7 @@ public class AzkabanSingleServerTest {
final Injector injector = Guice.createInjector(
new AzkabanCommonModule(props),
new AzkabanExecServerModule(),
- new AzkabanWebServerModule()
+ new AzkabanWebServerModule(props)
);
SERVICE_PROVIDER.setInjector(injector);
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 35b74e1..1ed6666 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -24,6 +24,7 @@ import azkaban.Constants;
import azkaban.Constants.ConfigurationKeys;
import azkaban.database.AzkabanDatabaseSetup;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
import azkaban.flowtrigger.FlowTriggerService;
import azkaban.flowtrigger.quartz.FlowTriggerScheduler;
import azkaban.jmx.JmxExecutorManager;
@@ -137,7 +138,7 @@ public class AzkabanWebServer extends AzkabanServer {
private final Server server;
private final UserManager userManager;
private final ProjectManager projectManager;
- private final ExecutorManager executorManager;
+ private final ExecutorManagerAdapter executorManagerAdapter;
private final ScheduleManager scheduleManager;
private final TriggerManager triggerManager;
private final MetricsManager metricsManager;
@@ -152,7 +153,7 @@ public class AzkabanWebServer extends AzkabanServer {
@Inject
public AzkabanWebServer(final Props props,
final Server server,
- final ExecutorManager executorManager,
+ final ExecutorManagerAdapter executorManagerAdapter,
final ProjectManager projectManager,
final TriggerManager triggerManager,
final MetricsManager metricsManager,
@@ -165,7 +166,8 @@ public class AzkabanWebServer extends AzkabanServer {
final StatusService statusService) {
this.props = requireNonNull(props, "props is null.");
this.server = requireNonNull(server, "server is null.");
- this.executorManager = requireNonNull(executorManager, "executorManager is null.");
+ this.executorManagerAdapter = requireNonNull(executorManagerAdapter,
+ "executorManagerAdapter is null.");
this.projectManager = requireNonNull(projectManager, "projectManager is null.");
this.triggerManager = requireNonNull(triggerManager, "triggerManager is null.");
this.metricsManager = requireNonNull(metricsManager, "metricsManager is null.");
@@ -217,7 +219,7 @@ public class AzkabanWebServer extends AzkabanServer {
/* Initialize Guice Injector */
final Injector injector = Guice.createInjector(
new AzkabanCommonModule(props),
- new AzkabanWebServerModule()
+ new AzkabanWebServerModule(props)
);
SERVICE_PROVIDER.setInjector(injector);
@@ -228,7 +230,10 @@ public class AzkabanWebServer extends AzkabanServer {
/* This creates the Web Server instance */
app = webServer;
- webServer.executorManager.start();
+ // Todo jamiesjc: also start the threads in ExecutionController if needed.
+ if (webServer.executorManagerAdapter instanceof ExecutorManager) {
+ ((ExecutorManager) webServer.executorManagerAdapter).start();
+ }
// TODO refactor code into ServerProvider
webServer.prepareAndStartServer();
@@ -544,7 +549,9 @@ public class AzkabanWebServer extends AzkabanServer {
createThreadPool();
configureRoutes();
- if (this.props.getBoolean(Constants.ConfigurationKeys.IS_METRICS_ENABLED, false)) {
+ // Todo jamiesjc: enable web metrics for azkaban poll model later
+ if (this.props.getBoolean(Constants.ConfigurationKeys.IS_METRICS_ENABLED, false)
+ && !this.props.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)) {
startWebMetrics();
}
@@ -589,7 +596,9 @@ public class AzkabanWebServer extends AzkabanServer {
private void startWebMetrics() throws Exception {
- this.metricsManager.addGauge("WEB-NumQueuedFlows", this.executorManager::getQueuedFlowSize);
+ this.metricsManager
+ .addGauge("WEB-NumQueuedFlows", ((ExecutorManager) this
+ .executorManagerAdapter)::getQueuedFlowSize);
/*
* TODO: Currently {@link ExecutorManager#getRunningFlows()} includes both running and non-dispatched flows.
* Originally we would like to do a subtraction between getRunningFlows and {@link ExecutorManager#getQueuedFlowSize()},
@@ -598,7 +607,8 @@ public class AzkabanWebServer extends AzkabanServer {
* a thread safe subtraction. We need to fix this in the future.
*/
this.metricsManager
- .addGauge("WEB-NumRunningFlows", () -> this.executorManager.getRunningFlows().size());
+ .addGauge("WEB-NumRunningFlows",
+ () -> ((ExecutorManager) this.executorManagerAdapter).getRunningFlows().size());
logger.info("starting reporting Web Server Metrics");
this.metricsManager.startReporting("AZ-WEB", this.props);
@@ -606,12 +616,12 @@ public class AzkabanWebServer extends AzkabanServer {
private void loadBuiltinCheckersAndActions() {
logger.info("Loading built-in checker and action types");
- ExecuteFlowAction.setExecutorManager(this.executorManager);
+ ExecuteFlowAction.setExecutorManagerAdapter(this.executorManagerAdapter);
ExecuteFlowAction.setProjectManager(this.projectManager);
ExecuteFlowAction.setTriggerManager(this.triggerManager);
- KillExecutionAction.setExecutorManager(this.executorManager);
+ KillExecutionAction.setExecutorManagerAdapter(this.executorManagerAdapter);
CreateTriggerAction.setTriggerManager(this.triggerManager);
- ExecutionChecker.setExecutorManager(this.executorManager);
+ ExecutionChecker.setExecutorManagerAdapter(this.executorManagerAdapter);
this.triggerManager.registerCheckerType(BasicTimeChecker.type, BasicTimeChecker.class);
this.triggerManager.registerCheckerType(SlaChecker.type, SlaChecker.class);
@@ -647,8 +657,8 @@ public class AzkabanWebServer extends AzkabanServer {
return this.projectManager;
}
- public ExecutorManager getExecutorManager() {
- return this.executorManager;
+ public ExecutorManagerAdapter getExecutorManagerAdapter() {
+ return this.executorManagerAdapter;
}
public ScheduleManager getScheduleManager() {
@@ -681,8 +691,11 @@ public class AzkabanWebServer extends AzkabanServer {
registerMbean("jetty", new JmxJettyServer(this.server));
registerMbean("triggerManager", new JmxTriggerManager(this.triggerManager));
- if (this.executorManager != null) {
- registerMbean("executorManager", new JmxExecutorManager(this.executorManager));
+ // Todo jamiesjc: enable Jmx for executionController later
+ if (this.executorManagerAdapter != null
+ && this.executorManagerAdapter instanceof ExecutorManager) {
+ registerMbean("executorManager",
+ new JmxExecutorManager((ExecutorManager) this.executorManagerAdapter));
}
// Register Log4J loggers as JMX beans so the log level can be
@@ -712,7 +725,7 @@ public class AzkabanWebServer extends AzkabanServer {
logger.error("Failed to cleanup MBeanServer", e);
}
this.scheduleManager.shutdown();
- this.executorManager.shutdown();
+ this.executorManagerAdapter.shutdown();
try {
this.server.stop();
} catch (final Exception e) {
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java
index 039e864..344b81e 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java
@@ -18,11 +18,13 @@
package azkaban.webapp;
import azkaban.Constants.ConfigurationKeys;
+import azkaban.executor.ExecutionController;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
import azkaban.flowtrigger.database.JdbcFlowTriggerInstanceLoaderImpl;
import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginException;
import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginManager;
-import azkaban.flowtrigger.database.JdbcFlowTriggerInstanceLoaderImpl;
import azkaban.scheduler.ScheduleLoader;
import azkaban.scheduler.TriggerBasedScheduleLoader;
import azkaban.user.UserManager;
@@ -50,6 +52,11 @@ public class AzkabanWebServerModule extends AbstractModule {
private static final Logger log = Logger.getLogger(AzkabanWebServerModule.class);
private static final String USER_MANAGER_CLASS_PARAM = "user.manager.class";
private static final String VELOCITY_DEV_MODE_PARAM = "velocity.dev.mode";
+ private final Props props;
+
+ public AzkabanWebServerModule(final Props props) {
+ this.props = props;
+ }
@Provides
@Singleton
@@ -71,6 +78,12 @@ public class AzkabanWebServerModule extends AbstractModule {
bind(Server.class).toProvider(WebServerProvider.class);
bind(ScheduleLoader.class).to(TriggerBasedScheduleLoader.class);
bind(FlowTriggerInstanceLoader.class).to(JdbcFlowTriggerInstanceLoaderImpl.class);
+ bind(ExecutorManagerAdapter.class).to(resolveExecutorManagerAdaptorClassType());
+ }
+
+ private Class<? extends ExecutorManagerAdapter> resolveExecutorManagerAdaptorClassType() {
+ return this.props.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)
+ ? ExecutionController.class : ExecutorManager.class;
}
@Inject
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index beeab4a..85d74c6 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -74,7 +74,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
private WebMetrics webMetrics;
private ProjectManager projectManager;
private FlowTriggerService flowTriggerService;
- private ExecutorManagerAdapter executorManager;
+ private ExecutorManagerAdapter executorManagerAdapter;
private ScheduleManager scheduleManager;
private UserManager userManager;
@@ -84,7 +84,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
final AzkabanWebServer server = (AzkabanWebServer) getApplication();
this.userManager = server.getUserManager();
this.projectManager = server.getProjectManager();
- this.executorManager = server.getExecutorManager();
+ this.executorManagerAdapter = server.getExecutorManagerAdapter();
this.scheduleManager = server.getScheduleManager();
this.flowTriggerService = server.getFlowTriggerService();
// TODO: reallocf fully guicify
@@ -120,7 +120,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ExecutableFlow exFlow = null;
try {
- exFlow = this.executorManager.getExecutableFlow(execid);
+ exFlow = this.executorManagerAdapter.getExecutableFlow(execid);
} catch (final ExecutorManagerException e) {
ret.put("error",
"Error fetching execution '" + execid + "': " + e.getMessage());
@@ -198,9 +198,9 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
if (HttpRequestUtils.hasPermission(this.userManager, user, Type.ADMIN)) {
try {
if (enableQueue) {
- this.executorManager.enableQueueProcessorThread();
+ this.executorManagerAdapter.enableQueueProcessorThread();
} else {
- this.executorManager.disableQueueProcessorThread();
+ this.executorManagerAdapter.disableQueueProcessorThread();
}
returnMap.put(ConnectorParams.STATUS_PARAM,
ConnectorParams.RESPONSE_SUCCESS);
@@ -256,7 +256,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
boolean wasSuccess = false;
if (HttpRequestUtils.hasPermission(this.userManager, user, Type.ADMIN)) {
try {
- this.executorManager.setupExecutors();
+ this.executorManagerAdapter.setupExecutors();
returnMap.put(ConnectorParams.STATUS_PARAM,
ConnectorParams.RESPONSE_SUCCESS);
wasSuccess = true;
@@ -300,7 +300,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ExecutableNode node = null;
final String jobLinkUrl;
try {
- flow = this.executorManager.getExecutableFlow(execId);
+ flow = this.executorManagerAdapter.getExecutableFlow(execId);
if (flow == null) {
page.add("errorMsg", "Error loading executing flow " + execId
+ ": not found.");
@@ -315,7 +315,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
return;
}
- jobLinkUrl = this.executorManager.getJobLinkUrl(flow, jobId, attempt);
+ jobLinkUrl = this.executorManagerAdapter.getJobLinkUrl(flow, jobId, attempt);
final List<ViewerPlugin> jobViewerPlugins =
PluginRegistry.getRegistry().getViewerPluginsForJobType(
@@ -359,11 +359,11 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
"azkaban/webapp/servlet/velocity/executionspage.vm");
final List<Pair<ExecutableFlow, Optional<Executor>>> runningFlows =
- this.executorManager.getActiveFlowsWithExecutor();
+ this.executorManagerAdapter.getActiveFlowsWithExecutor();
page.add("runningFlows", runningFlows.isEmpty() ? null : runningFlows);
final List<ExecutableFlow> finishedFlows =
- this.executorManager.getRecentlyFinishedFlows();
+ this.executorManagerAdapter.getRecentlyFinishedFlows();
page.add("recentlyFinished", finishedFlows.isEmpty() ? null : finishedFlows);
page.add("vmutils", new VelocityUtil(this.projectManager));
page.render();
@@ -439,7 +439,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ExecutableFlow flow = null;
try {
- flow = this.executorManager.getExecutableFlow(execId);
+ flow = this.executorManagerAdapter.getExecutableFlow(execId);
if (flow == null) {
page.add("errorMsg", "Error loading executing flow " + execId
+ " not found.");
@@ -537,7 +537,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
try {
- this.executorManager.retryFailures(exFlow, user.getUserId());
+ this.executorManagerAdapter.retryFailures(exFlow, user.getUserId());
} catch (final ExecutorManagerException e) {
ret.put("error", e.getMessage());
}
@@ -563,7 +563,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
try {
final LogData data =
- this.executorManager.getExecutableFlowLog(exFlow, offset, length);
+ this.executorManagerAdapter.getExecutableFlowLog(exFlow, offset, length);
if (data == null) {
ret.put("length", 0);
ret.put("offset", offset);
@@ -614,7 +614,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
final int attempt = this.getIntParam(req, "attempt", node.getAttempt());
final LogData data =
- this.executorManager.getExecutionJobLog(exFlow, jobId, offset, length,
+ this.executorManagerAdapter.getExecutionJobLog(exFlow, jobId, offset, length,
attempt);
if (data == null) {
ret.put("length", 0);
@@ -651,7 +651,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
final List<Object> jsonObj =
- this.executorManager
+ this.executorManagerAdapter
.getExecutionJobStats(exFlow, jobId, node.getAttempt());
ret.put("jobStats", jsonObj);
} catch (final ExecutorManagerException e) {
@@ -765,7 +765,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
try {
- this.executorManager.cancelFlow(exFlow, user.getUserId());
+ this.executorManagerAdapter.cancelFlow(exFlow, user.getUserId());
} catch (final ExecutorManagerException e) {
ret.put("error", e.getMessage());
}
@@ -781,7 +781,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
final List<Integer> refs =
- this.executorManager.getRunningFlows(project.getId(), flowId);
+ this.executorManagerAdapter.getRunningFlows(project.getId(), flowId);
if (!refs.isEmpty()) {
ret.put("execIds", refs);
}
@@ -798,7 +798,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
try {
- this.executorManager.pauseFlow(exFlow, user.getUserId());
+ this.executorManagerAdapter.pauseFlow(exFlow, user.getUserId());
} catch (final ExecutorManagerException e) {
ret.put("error", e.getMessage());
}
@@ -815,7 +815,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
try {
- this.executorManager.resumeFlow(exFlow, user.getUserId());
+ this.executorManagerAdapter.resumeFlow(exFlow, user.getUserId());
} catch (final ExecutorManagerException e) {
ret.put("resume", e.getMessage());
}
@@ -1004,7 +1004,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
try {
HttpRequestUtils.filterAdminOnlyFlowParams(this.userManager, options, user);
final String message =
- this.executorManager.submitExecutableFlow(exflow, user.getUserId());
+ this.executorManagerAdapter.submitExecutableFlow(exflow, user.getUserId());
ret.put("message", message);
} catch (final Exception e) {
e.printStackTrace();
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/HistoryServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/HistoryServlet.java
index 6ab39e2..b2b02ad 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/HistoryServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/HistoryServlet.java
@@ -35,14 +35,14 @@ public class HistoryServlet extends LoginAbstractAzkabanServlet {
private static final String FILTER_BY_DATE_PATTERN = "MM/dd/yyyy hh:mm aa";
private static final long serialVersionUID = 1L;
- private ExecutorManagerAdapter executorManager;
+ private ExecutorManagerAdapter executorManagerAdapter;
private ProjectManager projectManager;
@Override
public void init(final ServletConfig config) throws ServletException {
super.init(config);
final AzkabanWebServer server = (AzkabanWebServer) getApplication();
- this.executorManager = server.getExecutorManager();
+ this.executorManagerAdapter = server.getExecutorManagerAdapter();
this.projectManager = server.getProjectManager();
}
@@ -111,7 +111,7 @@ public class HistoryServlet extends LoginAbstractAzkabanServlet {
.parseDateTime(end).getMillis();
try {
history =
- this.executorManager.getExecutableFlows(projContain, flowContain,
+ this.executorManagerAdapter.getExecutableFlows(projContain, flowContain,
userContain, status, beginTime, endTime, (pageNum - 1)
* pageSize, pageSize);
} catch (final ExecutorManagerException e) {
@@ -121,7 +121,7 @@ public class HistoryServlet extends LoginAbstractAzkabanServlet {
final String searchTerm = getParam(req, "searchterm");
try {
history =
- this.executorManager.getExecutableFlows(searchTerm, (pageNum - 1)
+ this.executorManagerAdapter.getExecutableFlows(searchTerm, (pageNum - 1)
* pageSize, pageSize);
} catch (final ExecutorManagerException e) {
page.add("error", e.getMessage());
@@ -129,7 +129,7 @@ public class HistoryServlet extends LoginAbstractAzkabanServlet {
} else {
try {
history =
- this.executorManager.getExecutableFlows((pageNum - 1) * pageSize,
+ this.executorManagerAdapter.getExecutableFlows((pageNum - 1) * pageSize,
pageSize);
} catch (final ExecutorManagerException e) {
e.printStackTrace();
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/JMXHttpServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/JMXHttpServlet.java
index 1ca04a2..d389062 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/JMXHttpServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/JMXHttpServlet.java
@@ -51,7 +51,7 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements
private UserManager userManager;
private AzkabanWebServer server;
- private ExecutorManagerAdapter executorManager;
+ private ExecutorManagerAdapter executorManagerAdapter;
private TriggerManager triggerManager;
@Override
@@ -60,7 +60,7 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements
this.server = (AzkabanWebServer) getApplication();
this.userManager = this.server.getUserManager();
- this.executorManager = this.server.getExecutorManager();
+ this.executorManagerAdapter = this.server.getExecutorManagerAdapter();
this.triggerManager = this.server.getTriggerManager();
}
@@ -83,7 +83,7 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements
final String hostPort = getParam(req, JMX_HOSTPORT);
final String mbean = getParam(req, JMX_MBEAN);
final Map<String, Object> result =
- this.executorManager.callExecutorJMX(hostPort,
+ this.executorManagerAdapter.callExecutorJMX(hostPort,
JMX_GET_ALL_MBEAN_ATTRIBUTES, mbean);
// order the attribute by name
for (final Map.Entry<String, Object> entry : result.entrySet()) {
@@ -171,10 +171,10 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements
page.add("mbeans", this.server.getMbeanNames());
final Map<String, Object> executorMBeans = new HashMap<>();
- for (final String hostPort : this.executorManager.getAllActiveExecutorServerHosts()) {
+ for (final String hostPort : this.executorManagerAdapter.getAllActiveExecutorServerHosts()) {
try {
final Map<String, Object> mbeans =
- this.executorManager.callExecutorJMX(hostPort, JMX_GET_MBEANS, null);
+ this.executorManagerAdapter.callExecutorJMX(hostPort, JMX_GET_MBEANS, null);
executorMBeans.put(hostPort, mbeans.get("mbeans"));
} catch (final IOException e) {
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 6a6ae41..ac2dcaf 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -103,7 +103,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
}
};
private ProjectManager projectManager;
- private ExecutorManagerAdapter executorManager;
+ private ExecutorManagerAdapter executorManagerAdapter;
private ScheduleManager scheduleManager;
private UserManager userManager;
private FlowTriggerScheduler scheduler;
@@ -118,7 +118,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
final AzkabanWebServer server = (AzkabanWebServer) getApplication();
this.projectManager = server.getProjectManager();
- this.executorManager = server.getExecutorManager();
+ this.executorManagerAdapter = server.getExecutorManagerAdapter();
this.scheduleManager = server.getScheduleManager();
this.userManager = server.getUserManager();
this.scheduler = server.getScheduler();
@@ -388,7 +388,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
List<ExecutableFlow> exFlows = null;
try {
exFlows =
- this.executorManager.getExecutableFlows(project.getId(), flowId, 0, 1,
+ this.executorManagerAdapter.getExecutableFlows(project.getId(), flowId, 0, 1,
Status.SUCCEEDED);
} catch (final ExecutorManagerException e) {
ret.put("error", "Error retrieving executable flows");
@@ -417,7 +417,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
int total = 0;
try {
total =
- this.executorManager.getExecutableFlows(project.getId(), flowId, from,
+ this.executorManagerAdapter.getExecutableFlows(project.getId(), flowId, from,
length, exFlows);
} catch (final ExecutorManagerException e) {
ret.put("error", "Error retrieving executable flows");
@@ -1179,10 +1179,10 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
int numResults = 0;
try {
- numResults = this.executorManager.getNumberOfJobExecutions(project, jobId);
+ numResults = this.executorManagerAdapter.getNumberOfJobExecutions(project, jobId);
final int maxPage = (numResults / pageSize) + 1;
List<ExecutableJobInfo> jobInfo =
- this.executorManager.getExecutableJobs(project, jobId, skipPage, pageSize);
+ this.executorManagerAdapter.getExecutableJobs(project, jobId, skipPage, pageSize);
if (jobInfo == null || jobInfo.isEmpty()) {
jobInfo = null;
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/StatsServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/StatsServlet.java
index e6eac84..daf289f 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/StatsServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/StatsServlet.java
@@ -18,7 +18,7 @@ package azkaban.webapp.servlet;
import azkaban.executor.ConnectorParams;
import azkaban.executor.Executor;
-import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
import azkaban.server.session.Session;
import azkaban.user.User;
@@ -47,14 +47,14 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
private static final long serialVersionUID = 1L;
private UserManager userManager;
- private ExecutorManager execManager;
+ private ExecutorManagerAdapter execManagerAdapter;
@Override
public void init(final ServletConfig config) throws ServletException {
super.init(config);
final AzkabanWebServer server = (AzkabanWebServer) getApplication();
this.userManager = server.getUserManager();
- this.execManager = server.getExecutorManager();
+ this.execManagerAdapter = server.getExecutorManagerAdapter();
}
@Override
@@ -108,7 +108,7 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
final Map<String, Object> result;
try {
result =
- this.execManager.callExecutorStats(executorId,
+ this.execManagerAdapter.callExecutorStats(executorId,
ConnectorParams.STATS_GET_ALLMETRICSNAME,
(Pair<String, String>[]) null);
@@ -134,7 +134,7 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
throws ServletException, IOException {
try {
final Map<String, Object> result =
- this.execManager
+ this.execManagerAdapter
.callExecutorStats(executorId, actionName, getAllParams(req));
if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
ret.put(ConnectorParams.RESPONSE_ERROR,
@@ -157,7 +157,7 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
ServletException {
try {
final Map<String, Object> result =
- this.execManager.callExecutorStats(executorId,
+ this.execManagerAdapter.callExecutorStats(executorId,
ConnectorParams.STATS_GET_METRICHISTORY, getAllParams(req));
if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
ret.put(ConnectorParams.RESPONSE_ERROR,
@@ -181,11 +181,11 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
final Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/statsPage.vm");
try {
- final Collection<Executor> executors = this.execManager.getAllActiveExecutors();
+ final Collection<Executor> executors = this.execManagerAdapter.getAllActiveExecutors();
page.add("executorList", executors);
final Map<String, Object> result =
- this.execManager.callExecutorStats(executors.iterator().next().getId(),
+ this.execManagerAdapter.callExecutorStats(executors.iterator().next().getId(),
ConnectorParams.STATS_GET_ALLMETRICSNAME,
(Pair<String, String>[]) null);
if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
diff --git a/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzSchedulerTest.java b/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzSchedulerTest.java
index fd27ec6..0ef2c35 100644
--- a/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzSchedulerTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzSchedulerTest.java
@@ -60,7 +60,7 @@ public class QuartzSchedulerTest {
props.put("h2.path", "./h2");
final Injector injector = Guice.createInjector(
new AzkabanCommonModule(props),
- new AzkabanWebServerModule()
+ new AzkabanWebServerModule(props)
);
SERVICE_PROVIDER.unsetInjector();
diff --git a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
index 7a11e4e..f1b2207 100644
--- a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
@@ -38,7 +38,7 @@ import azkaban.executor.Executor;
import azkaban.executor.ExecutorDao;
import azkaban.executor.ExecutorEventsDao;
import azkaban.executor.ExecutorLoader;
-import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.FetchActiveFlowDao;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManager;
@@ -120,7 +120,7 @@ public class AzkabanWebServerTest {
public void testInjection() throws Exception {
final Injector injector = Guice.createInjector(
new AzkabanCommonModule(props),
- new AzkabanWebServerModule()
+ new AzkabanWebServerModule(props)
);
SERVICE_PROVIDER.unsetInjector();
SERVICE_PROVIDER.setInjector(injector);
@@ -137,7 +137,7 @@ public class AzkabanWebServerTest {
//Test if triggermanager is singletonly guiced. If not, the below test will fail.
assertSingleton(ExecutorLoader.class, injector);
- assertSingleton(ExecutorManager.class, injector);
+ assertSingleton(ExecutorManagerAdapter.class, injector);
assertSingleton(ProjectLoader.class, injector);
assertSingleton(ProjectManager.class, injector);
assertSingleton(Storage.class, injector);
diff --git a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalServlet.java b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalServlet.java
index b01e9d4..c300aa1 100644
--- a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalServlet.java
+++ b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalServlet.java
@@ -288,9 +288,9 @@ public class ReportalServlet extends LoginAbstractAzkabanServlet {
final int offset = getIntParam(req, "offset");
final int length = getIntParam(req, "length");
final ExecutableFlow exec;
- final ExecutorManagerAdapter executorManager = this.server.getExecutorManager();
+ final ExecutorManagerAdapter executorManagerAdapter = this.server.getExecutorManagerAdapter();
try {
- exec = executorManager.getExecutableFlow(execId);
+ exec = executorManagerAdapter.getExecutableFlow(execId);
} catch (final Exception e) {
ret.put("error", "Log does not exist or isn't created yet.");
return;
@@ -299,7 +299,7 @@ public class ReportalServlet extends LoginAbstractAzkabanServlet {
final LogData data;
try {
data =
- executorManager.getExecutionJobLog(exec, jobId, offset, length,
+ executorManagerAdapter.getExecutionJobLog(exec, jobId, offset, length,
exec.getExecutableNode(jobId).getAttempt());
} catch (final Exception e) {
e.printStackTrace();
@@ -364,7 +364,7 @@ public class ReportalServlet extends LoginAbstractAzkabanServlet {
preparePage(page, session);
final ProjectManager projectManager = this.server.getProjectManager();
- final ExecutorManagerAdapter executorManager = this.server.getExecutorManager();
+ final ExecutorManagerAdapter executorManagerAdapter = this.server.getExecutorManagerAdapter();
final Project project = projectManager.getProject(id);
final Reportal reportal = Reportal.loadFromProject(project);
@@ -392,7 +392,7 @@ public class ReportalServlet extends LoginAbstractAzkabanServlet {
if (hasParam(req, "logs")) {
final ExecutableFlow exec;
try {
- exec = executorManager.getExecutableFlow(execId);
+ exec = executorManagerAdapter.getExecutableFlow(execId);
} catch (final ExecutorManagerException e) {
e.printStackTrace();
page.add("errorMsg", "ExecutableFlow not found. " + e.getMessage());
@@ -501,10 +501,10 @@ public class ReportalServlet extends LoginAbstractAzkabanServlet {
}
try {
final Flow flow = project.getFlows().get(0);
- executorManager.getExecutableFlows(project.getId(), flow.getId(),
+ executorManagerAdapter.getExecutableFlows(project.getId(), flow.getId(),
pageNumber * this.itemsPerPage, this.itemsPerPage, exFlows);
final ArrayList<ExecutableFlow> tmp = new ArrayList<>();
- executorManager.getExecutableFlows(project.getId(), flow.getId(),
+ executorManagerAdapter.getExecutableFlows(project.getId(), flow.getId(),
(pageNumber + 1) * this.itemsPerPage, 1, tmp);
if (!tmp.isEmpty()) {
hasNextPage = true;
@@ -1218,7 +1218,7 @@ public class ReportalServlet extends LoginAbstractAzkabanServlet {
try {
final String message =
- this.server.getExecutorManager().submitExecutableFlow(exflow,
+ this.server.getExecutorManagerAdapter().submitExecutableFlow(exflow,
session.getUser().getUserId())
+ ".";
ret.put("message", message);