Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
index fee1499..55da336 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
@@ -25,13 +25,14 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.inject.Inject;
import javax.inject.Singleton;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,8 +90,13 @@ public class ExecutionController extends EventHandler implements ExecutorManager
@Override
public Collection<Executor> getAllActiveExecutors() {
- // Todo: get the executor info from DB
- return Collections.emptyList();
+ List<Executor> executors = new ArrayList<>();
+ try {
+ executors = this.executorLoader.fetchActiveExecutors();
+ } catch (final ExecutorManagerException e) {
+ this.logger.error("Failed to get all active executors.", e);
+ }
+ return executors;
}
@Override
@@ -100,31 +106,88 @@ public class ExecutionController extends EventHandler implements ExecutorManager
@Override
public Set<String> getPrimaryServerHosts() {
- // Todo: get the primary executor host info from DB
- return Collections.emptySet();
+ final HashSet<String> ports = new HashSet<>();
+ try {
+ for (final Executor executor : this.executorLoader.fetchActiveExecutors()) {
+ ports.add(executor.getHost() + ":" + executor.getPort());
+ }
+ } catch (final ExecutorManagerException e) {
+ this.logger.error("Failed to get primary server hosts.", e);
+ }
+ return ports;
}
@Override
public Set<String> getAllActiveExecutorServerHosts() {
- // Todo: get all executor host info from DB
- return Collections.emptySet();
+ final Set<String> ports = getPrimaryServerHosts();
+ // include executor which were initially active and still has flows running
+ try {
+ for (final Pair<ExecutionReference, ExecutableFlow> running : this.executorLoader
+ .fetchActiveFlows().values()) {
+ final ExecutionReference ref = running.getFirst();
+ if (ref.getExecutor().isPresent()) {
+ final Executor executor = ref.getExecutor().get();
+ ports.add(executor.getHost() + ":" + executor.getPort());
+ }
+ }
+ } catch (final ExecutorManagerException e) {
+ this.logger.error("Failed to get all active executor server hosts.", e);
+ }
+ return ports;
}
/**
- * Gets a list of all the active (running flows and non-dispatched flows) executions for a given
- * project and flow from database. {@inheritDoc}
+ * Gets a list of all the unfinished (both dispatched and non-dispatched) executions for a
+ * given project and flow {@inheritDoc}.
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows(int, java.lang.String)
*/
@Override
public List<Integer> getRunningFlows(final int projectId, final String flowId) {
- // Todo: get running flows from DB
- return Collections.emptyList();
+ final List<Integer> executionIds = new ArrayList<>();
+ try {
+ executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
+ this.executorLoader.fetchUnfinishedFlows().values()));
+ } catch (final ExecutorManagerException e) {
+ this.logger.error("Failed to get running flows for project " + projectId + ", flow "
+ + flowId, e);
+ }
+ return executionIds;
+ }
+
+ /* Helper method for getRunningFlows */
+ private List<Integer> getRunningFlowsHelper(final int projectId, final String flowId,
+ final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ final List<Integer> executionIds = new ArrayList<>();
+ for (final Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+ if (ref.getSecond().getFlowId().equals(flowId)
+ && ref.getSecond().getProjectId() == projectId) {
+ executionIds.add(ref.getFirst().getExecId());
+ }
+ }
+ return executionIds;
}
@Override
public List<Pair<ExecutableFlow, Optional<Executor>>> getActiveFlowsWithExecutor()
throws IOException {
- // Todo: get active flows with executor from DB
- return Collections.emptyList();
+ final List<Pair<ExecutableFlow, Optional<Executor>>> flows = new ArrayList<>();
+ try {
+ getActiveFlowsWithExecutorHelper(flows, this.executorLoader.fetchUnfinishedFlows().values());
+ } catch (final ExecutorManagerException e) {
+ this.logger.error("Failed to get active flows with executor.", e);
+ }
+ return flows;
+ }
+
+ /* Helper method for getActiveFlowsWithExecutor */
+ private void getActiveFlowsWithExecutorHelper(
+ final List<Pair<ExecutableFlow, Optional<Executor>>> flows,
+ final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ for (final Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+ flows.add(new Pair<>(ref.getSecond(), ref
+ .getFirst().getExecutor()));
+ }
}
/**
@@ -133,8 +196,29 @@ public class ExecutionController extends EventHandler implements ExecutorManager
*/
@Override
public boolean isFlowRunning(final int projectId, final String flowId) {
- // Todo: check DB to see if flow is running
- return true;
+ boolean isRunning = false;
+ try {
+ isRunning = isFlowRunningHelper(projectId, flowId,
+ this.executorLoader.fetchUnfinishedFlows().values());
+
+ } catch (final ExecutorManagerException e) {
+ this.logger.error(
+ "Failed to check if the flow is running for project " + projectId + ", flow " + flowId,
+ e);
+ }
+ return isRunning;
+ }
+
+ /* Search a running flow in a collection */
+ private boolean isFlowRunningHelper(final int projectId, final String flowId,
+ final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ for (final Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+ if (ref.getSecond().getProjectId() == projectId
+ && ref.getSecond().getFlowId().equals(flowId)) {
+ return true;
+ }
+ }
+ return false;
}
/**
@@ -151,8 +235,24 @@ public class ExecutionController extends EventHandler implements ExecutorManager
*/
@Override
public List<ExecutableFlow> getRunningFlows() {
- // Todo: get running flows from DB
- return Collections.emptyList();
+ final ArrayList<ExecutableFlow> flows = new ArrayList<>();
+ try {
+ getActiveFlowHelper(flows, this.executorLoader.fetchUnfinishedFlows().values());
+ } catch (final ExecutorManagerException e) {
+ this.logger.error("Failed to get running flows.", e);
+ }
+ return flows;
+ }
+
+ /**
+ * Helper method to get all running flows from a Pair<ExecutionReference,
+ * ExecutableFlow collection
+ */
+ private void getActiveFlowHelper(final ArrayList<ExecutableFlow> flows,
+ final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ for (final Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+ flows.add(ref.getSecond());
+ }
}
@Override
@@ -211,22 +311,77 @@ public class ExecutionController extends EventHandler implements ExecutorManager
@Override
public LogData getExecutableFlowLog(final ExecutableFlow exFlow, final int offset,
final int length) throws ExecutorManagerException {
- // Todo: get the flow log from executor if the flow is running, else get it from DB.
- return new LogData(0, 0, "dummy");
+ final Pair<ExecutionReference, ExecutableFlow> pair = this.executorLoader
+ .fetchActiveFlowByExecId(exFlow.getExecutionId());
+ if (pair != null) {
+ final Pair<String, String> typeParam = new Pair<>("type", "flow");
+ final Pair<String, String> offsetParam =
+ new Pair<>("offset", String.valueOf(offset));
+ final Pair<String, String> lengthParam =
+ new Pair<>("length", String.valueOf(length));
+
+ @SuppressWarnings("unchecked") final Map<String, Object> result =
+ this.apiGateway.callWithReference(pair.getFirst(), ConnectorParams.LOG_ACTION,
+ typeParam, offsetParam, lengthParam);
+ return LogData.createLogDataFromObject(result);
+ } else {
+ final LogData value =
+ this.executorLoader.fetchLogs(exFlow.getExecutionId(), "", 0, offset,
+ length);
+ return value;
+ }
}
@Override
public LogData getExecutionJobLog(final ExecutableFlow exFlow, final String jobId,
final int offset, final int length, final int attempt) throws ExecutorManagerException {
- // Todo: get the job log from executor if the flow is running, else get it from DB.
- return new LogData(0, 0, "dummy");
+ final Pair<ExecutionReference, ExecutableFlow> pair = this.executorLoader
+ .fetchActiveFlowByExecId(exFlow.getExecutionId());
+ if (pair != null) {
+ final Pair<String, String> typeParam = new Pair<>("type", "job");
+ final Pair<String, String> jobIdParam =
+ new Pair<>("jobId", jobId);
+ final Pair<String, String> offsetParam =
+ new Pair<>("offset", String.valueOf(offset));
+ final Pair<String, String> lengthParam =
+ new Pair<>("length", String.valueOf(length));
+ final Pair<String, String> attemptParam =
+ new Pair<>("attempt", String.valueOf(attempt));
+
+ @SuppressWarnings("unchecked") final Map<String, Object> result =
+ this.apiGateway.callWithReference(pair.getFirst(), ConnectorParams.LOG_ACTION,
+ typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
+ return LogData.createLogDataFromObject(result);
+ } else {
+ final LogData value =
+ this.executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt,
+ offset, length);
+ return value;
+ }
}
@Override
public List<Object> getExecutionJobStats(final ExecutableFlow exFlow, final String jobId,
final int attempt) throws ExecutorManagerException {
- // Todo: get execution job status from executor if the flow is running, else get if from DB.
- return Collections.emptyList();
+ final Pair<ExecutionReference, ExecutableFlow> pair =
+ this.executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+ if (pair == null) {
+ return this.executorLoader.fetchAttachments(exFlow.getExecutionId(), jobId,
+ attempt);
+ }
+
+ final Pair<String, String> jobIdParam = new Pair<>("jobId", jobId);
+ final Pair<String, String> attemptParam =
+ new Pair<>("attempt", String.valueOf(attempt));
+
+ @SuppressWarnings("unchecked") final Map<String, Object> result =
+ this.apiGateway.callWithReference(pair.getFirst(), ConnectorParams.ATTACHMENTS_ACTION,
+ jobIdParam, attemptParam);
+
+ @SuppressWarnings("unchecked") final List<Object> jobStats = (List<Object>) result
+ .get("attachments");
+
+ return jobStats;
}
@Override
@@ -249,19 +404,83 @@ public class ExecutionController extends EventHandler implements ExecutorManager
@Override
public void resumeFlow(final ExecutableFlow exFlow, final String userId)
throws ExecutorManagerException {
- // Todo: call executor to resume the flow
+ synchronized (exFlow) {
+ final Pair<ExecutionReference, ExecutableFlow> pair =
+ this.executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+ if (pair == null) {
+ throw new ExecutorManagerException("Execution "
+ + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+ + " isn't running.");
+ }
+ this.apiGateway
+ .callWithReferenceByUser(pair.getFirst(), ConnectorParams.RESUME_ACTION, userId);
+ }
}
@Override
public void pauseFlow(final ExecutableFlow exFlow, final String userId)
throws ExecutorManagerException {
- // Todo: call executor to pause the flow
+ synchronized (exFlow) {
+ final Pair<ExecutionReference, ExecutableFlow> pair =
+ this.executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+ if (pair == null) {
+ throw new ExecutorManagerException("Execution "
+ + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+ + " isn't running.");
+ }
+ this.apiGateway
+ .callWithReferenceByUser(pair.getFirst(), ConnectorParams.PAUSE_ACTION, userId);
+ }
}
@Override
public void retryFailures(final ExecutableFlow exFlow, final String userId)
throws ExecutorManagerException {
- // Todo: call executor to retry failed flows
+ modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_FAILURES, userId);
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map<String, Object> modifyExecutingJobs(final ExecutableFlow exFlow,
+ final String command, final String userId, final String... jobIds)
+ throws ExecutorManagerException {
+ synchronized (exFlow) {
+ final Pair<ExecutionReference, ExecutableFlow> pair =
+ this.executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+ if (pair == null) {
+ throw new ExecutorManagerException("Execution "
+ + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+ + " isn't running.");
+ }
+
+ final Map<String, Object> response;
+ if (jobIds != null && jobIds.length > 0) {
+ for (final String jobId : jobIds) {
+ if (!jobId.isEmpty()) {
+ final ExecutableNode node = exFlow.getExecutableNode(jobId);
+ if (node == null) {
+ throw new ExecutorManagerException("Job " + jobId
+ + " doesn't exist in execution " + exFlow.getExecutionId()
+ + ".");
+ }
+ }
+ }
+ final String ids = StringUtils.join(jobIds, ',');
+ response =
+ this.apiGateway.callWithReferenceByUser(pair.getFirst(),
+ ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
+ new Pair<>(
+ ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command),
+ new Pair<>(ConnectorParams.MODIFY_JOBS_LIST, ids));
+ } else {
+ response =
+ this.apiGateway.callWithReferenceByUser(pair.getFirst(),
+ ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
+ new Pair<>(
+ ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command));
+ }
+
+ return response;
+ }
}
/**
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 87ff41a..8827344 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -39,6 +39,12 @@ public interface ExecutorLoader {
Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
throws ExecutorManagerException;
+ Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchUnfinishedFlows()
+ throws ExecutorManagerException;
+
+ Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(int execId)
+ throws ExecutorManagerException;
+
List<ExecutableFlow> fetchFlowHistory(int skip, int num)
throws ExecutorManagerException;
diff --git a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
index 11ff709..628c369 100644
--- a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
@@ -44,22 +44,111 @@ public class FetchActiveFlowDao {
this.dbOperator = dbOperator;
}
+ private static Pair<ExecutionReference, ExecutableFlow> getExecutableFlowHelper(
+ final ResultSet rs) throws SQLException {
+ final int id = rs.getInt(1);
+ final int encodingType = rs.getInt(2);
+ final byte[] data = rs.getBytes(3);
+ final String host = rs.getString(4);
+ final int port = rs.getInt(5);
+ final int executorId = rs.getInt(6);
+ final boolean executorStatus = rs.getBoolean(7);
+
+ if (data == null) {
+ logger.warn("Execution id " + id + " has flow_data = null. To clean up, update status to "
+ + "FAILED manually, eg. "
+ + "SET status = " + Status.FAILED.getNumVal() + " WHERE id = " + id);
+ } else {
+ final EncodingType encType = EncodingType.fromInteger(encodingType);
+ try {
+ final ExecutableFlow exFlow =
+ ExecutableFlow.createExecutableFlowFromObject(
+ GZIPUtils.transformBytesToObject(data, encType));
+ final Executor executor;
+ if (host == null) {
+ logger.warn("Executor id " + executorId + " (on execution " +
+ id + ") wasn't found");
+ executor = null;
+ } else {
+ executor = new Executor(executorId, host, port, executorStatus);
+ }
+ final ExecutionReference ref = new ExecutionReference(id, executor);
+ return new Pair<>(ref, exFlow);
+ } catch (final IOException e) {
+ throw new SQLException("Error retrieving flow data " + id, e);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Fetch flows that are not in finished status, including both dispatched and non-dispatched
+ * flows.
+ *
+ * @return unfinished flows map
+ * @throws ExecutorManagerException the executor manager exception
+ */
+ Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchUnfinishedFlows()
+ throws ExecutorManagerException {
+ try {
+ return this.dbOperator.query(FetchActiveExecutableFlows.FETCH_UNFINISHED_EXECUTABLE_FLOWS,
+ new FetchActiveExecutableFlows());
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error fetching unfinished flows", e);
+ }
+ }
+
+ /**
+ * Fetch flows that are dispatched and not yet finished.
+ *
+ * @return active flows map
+ * @throws ExecutorManagerException the executor manager exception
+ */
Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
throws ExecutorManagerException {
try {
- return this.dbOperator.query(FetchActiveExecutableFlows.FETCH_ACTIVE_EXECUTABLE_FLOW,
+ return this.dbOperator.query(FetchActiveExecutableFlows.FETCH_ACTIVE_EXECUTABLE_FLOWS,
new FetchActiveExecutableFlows());
} catch (final SQLException e) {
throw new ExecutorManagerException("Error fetching active flows", e);
}
}
+ /**
+ * Fetch the flow that is dispatched and not yet finished by execution id.
+ *
+ * @return active flow pair
+ * @throws ExecutorManagerException the executor manager exception
+ */
+ Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(final int execId)
+ throws ExecutorManagerException {
+ try {
+ return this.dbOperator.query(FetchActiveExecutableFlow
+ .FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXEC_ID,
+ new FetchActiveExecutableFlow(), execId);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error fetching active flow by exec id" + execId, e);
+ }
+ }
+
@VisibleForTesting
static class FetchActiveExecutableFlows implements
ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
- // Select running and executor assigned flows
- private static final String FETCH_ACTIVE_EXECUTABLE_FLOW =
+ // Select flows that are not in finished status
+ private static final String FETCH_UNFINISHED_EXECUTABLE_FLOWS =
+ "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
+ + "et.port port, ex.executor_id executorId, et.active executorStatus"
+ + " FROM execution_flows ex"
+ + " LEFT JOIN "
+ + " executors et ON ex.executor_id = et.id"
+ + " Where ex.status NOT IN ("
+ + Status.SUCCEEDED.getNumVal() + ", "
+ + Status.KILLED.getNumVal() + ", "
+ + Status.FAILED.getNumVal() + ")";
+
+ // Select flows that are dispatched and not in finished status
+ private static final String FETCH_ACTIVE_EXECUTABLE_FLOWS =
"SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
+ "et.port port, ex.executor_id executorId, et.active executorStatus"
+ " FROM execution_flows ex"
@@ -86,36 +175,9 @@ public class FetchActiveFlowDao {
final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> execFlows =
new HashMap<>();
do {
- final int id = rs.getInt(1);
- final int encodingType = rs.getInt(2);
- final byte[] data = rs.getBytes(3);
- final String host = rs.getString(4);
- final int port = rs.getInt(5);
- final int executorId = rs.getInt(6);
- final boolean executorStatus = rs.getBoolean(7);
-
- if (data == null) {
- logger.warn("Execution id " + id + " has flow_data=null. To clean up, update status to "
- + "FAILED manually, eg. "
- + "SET status = " + Status.FAILED.getNumVal() + " WHERE id = " + id);
- } else {
- final EncodingType encType = EncodingType.fromInteger(encodingType);
- try {
- final ExecutableFlow exFlow =
- ExecutableFlow.createExecutableFlowFromObject(
- GZIPUtils.transformBytesToObject(data, encType));
- final Executor executor;
- if (host == null) {
- logger.warn("Executor id " + executorId + " (on execution " + id + ") wasn't found");
- executor = null;
- } else {
- executor = new Executor(executorId, host, port, executorStatus);
- }
- final ExecutionReference ref = new ExecutionReference(id, executor);
- execFlows.put(id, new Pair<>(ref, exFlow));
- } catch (final IOException e) {
- throw new SQLException("Error retrieving flow data " + id, e);
- }
+ final Pair<ExecutionReference, ExecutableFlow> exFlow = getExecutableFlowHelper(rs);
+ if (exFlow != null) {
+ execFlows.put(rs.getInt(1), exFlow);
}
} while (rs.next());
@@ -123,4 +185,35 @@ public class FetchActiveFlowDao {
}
}
+ private static class FetchActiveExecutableFlow implements
+ ResultSetHandler<Pair<ExecutionReference, ExecutableFlow>> {
+
+ // Select the flow that is dispatched and not in finished status by execution id
+ private static final String FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXEC_ID =
+ "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
+ + "et.port port, ex.executor_id executorId, et.active executorStatus"
+ + " FROM execution_flows ex"
+ + " LEFT JOIN "
+ + " executors et ON ex.executor_id = et.id"
+ + " Where ex.exec_id = ? AND ex.status NOT IN ("
+ + Status.SUCCEEDED.getNumVal() + ", "
+ + Status.KILLED.getNumVal() + ", "
+ + Status.FAILED.getNumVal() + ")"
+ // exclude queued flows that haven't been assigned yet -- this is the opposite of
+ // the condition in ExecutionFlowDao#FETCH_QUEUED_EXECUTABLE_FLOW
+ + " AND NOT ("
+ + " ex.executor_id IS NULL"
+ + " AND ex.status = " + Status.PREPARING.getNumVal()
+ + " )";
+
+ @Override
+ public Pair<ExecutionReference, ExecutableFlow> handle(
+ final ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return null;
+ }
+ return getExecutableFlowHelper(rs);
+ }
+ }
+
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 683cc65..13217e4 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -97,11 +97,22 @@ public class JdbcExecutorLoader implements ExecutorLoader {
@Override
public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
throws ExecutorManagerException {
-
return this.fetchActiveFlowDao.fetchActiveFlows();
}
@Override
+ public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchUnfinishedFlows()
+ throws ExecutorManagerException {
+ return this.fetchActiveFlowDao.fetchUnfinishedFlows();
+ }
+
+ @Override
+ public Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(final int execId)
+ throws ExecutorManagerException {
+ return this.fetchActiveFlowDao.fetchActiveFlowByExecId(execId);
+ }
+
+ @Override
public int fetchNumExecutableFlows() throws ExecutorManagerException {
return this.numExecutionsDao.fetchNumExecutableFlows();
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
new file mode 100644
index 0000000..a41e2db
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
@@ -0,0 +1,126 @@
+/*
+* Copyright 2018 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+package azkaban.executor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import azkaban.utils.Pair;
+import azkaban.utils.TestUtils;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ExecutionControllerTest {
+
+ private Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = new HashMap<>();
+ private Map<Integer, Pair<ExecutionReference, ExecutableFlow>> unfinishedFlows = new
+ HashMap<>();
+ private List<Executor> activeExecutors = new ArrayList<>();
+ private List<Executor> allExecutors = new ArrayList<>();
+ private ExecutionController controller;
+ private ExecutorLoader loader;
+ private ExecutorApiGateway apiGateway;
+
+ @Before
+ public void setup() throws Exception {
+ this.loader = mock(ExecutorLoader.class);
+ this.apiGateway = mock(ExecutorApiGateway.class);
+ this.controller = new ExecutionController(this.loader, this.apiGateway);
+
+ final Executor executor1 = new Executor(1, "localhost", 12345, true);
+ final Executor executor2 = new Executor(2, "localhost", 12346, true);
+ final Executor executor3 = new Executor(3, "localhost", 12347, false);
+ this.activeExecutors = ImmutableList.of(executor1, executor2);
+ this.allExecutors = ImmutableList.of(executor1, executor2, executor3);
+ when(this.loader.fetchActiveExecutors()).thenReturn(this.activeExecutors);
+
+ final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
+ final ExecutableFlow flow2 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
+ final ExecutableFlow flow3 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
+ flow1.setExecutionId(1);
+ flow2.setExecutionId(2);
+ flow3.setExecutionId(3);
+ final ExecutionReference ref1 =
+ new ExecutionReference(flow1.getExecutionId(), null);
+ final ExecutionReference ref2 =
+ new ExecutionReference(flow2.getExecutionId(), executor2);
+ final ExecutionReference ref3 =
+ new ExecutionReference(flow3.getExecutionId(), executor3);
+
+ this.activeFlows = ImmutableMap
+ .of(flow2.getExecutionId(), new Pair<>(ref2, flow2), flow3.getExecutionId(),
+ new Pair<>(ref3, flow3));
+ when(this.loader.fetchActiveFlows()).thenReturn(this.activeFlows);
+
+ this.unfinishedFlows = ImmutableMap.of(flow1.getExecutionId(), new Pair<>(ref1, flow1),
+ flow2.getExecutionId(), new Pair<>(ref2, flow2), flow3.getExecutionId(), new Pair<>(ref3,
+ flow3));
+ when(this.loader.fetchUnfinishedFlows()).thenReturn(this.unfinishedFlows);
+ }
+
+ @After
+ public void tearDown() {
+ if (this.controller != null) {
+ this.controller.shutdown();
+ }
+ }
+
+ @Test
+ public void testFetchAllActiveFlows() throws Exception {
+ final List<ExecutableFlow> flows = this.controller.getRunningFlows();
+ this.unfinishedFlows.values()
+ .forEach(pair -> assertThat(flows.contains(pair.getSecond())).isTrue());
+ }
+
+ @Test
+ public void testFetchActiveFlowByProject() throws Exception {
+ final ExecutableFlow flow2 = this.unfinishedFlows.get(2).getSecond();
+ final ExecutableFlow flow3 = this.unfinishedFlows.get(3).getSecond();
+ final List<Integer> executions = this.controller.getRunningFlows(flow2.getProjectId(), flow2
+ .getFlowId());
+ assertThat(executions.contains(flow2.getExecutionId())).isTrue();
+ assertThat(executions.contains(flow3.getExecutionId())).isTrue();
+ assertThat(this.controller.isFlowRunning(flow2.getProjectId(), flow2.getFlowId())).isTrue();
+ assertThat(this.controller.isFlowRunning(flow3.getProjectId(), flow3.getFlowId())).isTrue();
+ }
+
+ @Test
+ public void testFetchActiveFlowWithExecutor() throws Exception {
+ final List<Pair<ExecutableFlow, Optional<Executor>>> activeFlowsWithExecutor =
+ this.controller.getActiveFlowsWithExecutor();
+ this.unfinishedFlows.values().forEach(pair -> assertThat(activeFlowsWithExecutor
+ .contains(new Pair<>(pair.getSecond(), pair.getFirst().getExecutor()))).isTrue());
+ }
+
+ @Test
+ public void testFetchAllActiveExecutorServerHosts() throws Exception {
+ final Set<String> activeExecutorServerHosts = this.controller.getAllActiveExecutorServerHosts();
+ assertThat(activeExecutorServerHosts.size()).isEqualTo(3);
+ this.allExecutors.forEach(executor -> assertThat(
+ activeExecutorServerHosts.contains(executor.getHost() + ":" + executor.getPort()))
+ .isTrue());
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index 412907a..35ad37f 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -29,6 +29,7 @@ import azkaban.user.User;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.TestUtils;
+import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
import java.sql.SQLException;
@@ -311,9 +312,50 @@ public class ExecutionFlowDaoTest {
@Test
public void testFetchActiveFlowsExecutorAssigned() throws Exception {
+ final List<ExecutableFlow> flows = createExecutions();
+ final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = this.fetchActiveFlowDao
+ .fetchActiveFlows();
+ assertFound(activeFlows, flows.get(0), true);
+ assertNotFound(activeFlows, flows.get(1), "Returned a queued execution");
+ assertFound(activeFlows, flows.get(2), true);
+ assertNotFound(activeFlows, flows.get(3), "Returned an execution with a finished status");
+ assertFound(activeFlows, flows.get(4), false);
+ assertTwoFlowSame(activeFlows.get(flows.get(0).getExecutionId()).getSecond(), flows.get(0));
+ }
- final Executor executor = this.executorDao.addExecutor("test", 1);
+ @Test
+ public void testFetchUnfinishedFlows() throws Exception {
+ final List<ExecutableFlow> flows = createExecutions();
+ final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> unfinishedFlows =
+ this.fetchActiveFlowDao.fetchUnfinishedFlows();
+ assertFound(unfinishedFlows, flows.get(0), true);
+ assertFound(unfinishedFlows, flows.get(1), false);
+ assertFound(unfinishedFlows, flows.get(2), true);
+ assertNotFound(unfinishedFlows, flows.get(3), "Returned an execution with a finished status");
+ assertFound(unfinishedFlows, flows.get(4), false);
+ assertTwoFlowSame(unfinishedFlows.get(flows.get(0).getExecutionId()).getSecond(), flows.get(0));
+ }
+
+ @Test
+ public void testFetchActiveFlowByExecId() throws Exception {
+ final List<ExecutableFlow> flows = createExecutions();
+ assertTwoFlowSame(
+ this.fetchActiveFlowDao.fetchActiveFlowByExecId(flows.get(0).getExecutionId()).getSecond(),
+ flows.get(0));
+ assertThat(this.fetchActiveFlowDao.fetchActiveFlowByExecId(flows.get(1).getExecutionId()))
+ .isNull();
+ assertTwoFlowSame(
+ this.fetchActiveFlowDao.fetchActiveFlowByExecId(flows.get(2).getExecutionId()).getSecond(),
+ flows.get(2));
+ assertThat(this.fetchActiveFlowDao.fetchActiveFlowByExecId(flows.get(3).getExecutionId()))
+ .isNull();
+ assertTwoFlowSame(
+ this.fetchActiveFlowDao.fetchActiveFlowByExecId(flows.get(4).getExecutionId()).getSecond(),
+ flows.get(4));
+ }
+ private List<ExecutableFlow> createExecutions() throws Exception {
+ final Executor executor = this.executorDao.addExecutor("test", 1);
final ExecutableFlow flow1 = createExecutionAndAssign(Status.PREPARING, executor);
// flow2 is not assigned
final ExecutableFlow flow2 = createExecution(Status.PREPARING);
@@ -324,19 +366,7 @@ public class ExecutionFlowDaoTest {
// flow5 is assigned to an executor that is then removed
final ExecutableFlow flow5 = createExecutionAndAssign(Status.RUNNING, executor2);
this.executorDao.removeExecutor(executor2.getHost(), executor2.getPort());
-
- final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows =
- this.fetchActiveFlowDao.fetchActiveFlows();
-
- assertFound(activeFlows, flow1, true);
- assertNotFound(activeFlows, flow2, "Returned a queued execution");
- assertFound(activeFlows, flow3, true);
- assertNotFound(activeFlows, flow4, "Returned an execution with a finished status");
- assertFound(activeFlows, flow5, false);
-
- final ExecutableFlow flow1Result =
- activeFlows.get(flow1.getExecutionId()).getSecond();
- assertTwoFlowSame(flow1Result, flow1);
+ return ImmutableList.of(flow1, flow2, flow3, flow4, flow5);
}
private void assertNotFound(
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 3973d0b..8033629 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -76,6 +76,17 @@ public class MockExecutorLoader implements ExecutorLoader {
}
@Override
+ public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchUnfinishedFlows()
+ throws ExecutorManagerException {
+ return new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(final int execId) {
+ return new Pair<>(null, null);
+ }
+
+ @Override
public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
final int skip, final int num) throws ExecutorManagerException {
return null;
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/StatsServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/StatsServlet.java
index d42da55..0be6cad 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/StatsServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/StatsServlet.java
@@ -184,6 +184,10 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
final Collection<Executor> executors = this.execManagerAdapter.getAllActiveExecutors();
page.add("executorList", executors);
+ if (executors.isEmpty()) {
+ throw new ExecutorManagerException("Executor list is empty.");
+ }
+
final Map<String, Object> result =
this.execManagerAdapter.callExecutorStats(executors.iterator().next().getId(),
ConnectorParams.STATS_GET_ALLMETRICSNAME,