diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index d041209..2bf7baf 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -35,6 +35,9 @@ public interface ExecutorLoader {
Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
throws ExecutorManagerException;
+ Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(int execId)
+ throws ExecutorManagerException;
+
List<ExecutableFlow> fetchFlowHistory(int skip, int num)
throws ExecutorManagerException;
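
For context, here is a minimal caller-side sketch of the new `fetchActiveFlowByExecId` contract (the wrapper class and method names are hypothetical, not part of the patch). The JDBC implementation below returns null when the execution id has no row in active_executing_flows, so callers must handle that case:

```java
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionReference;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.utils.Pair;

// Hypothetical helper, shown only to illustrate the new interface method.
public class ActiveFlowLookup {
  private final ExecutorLoader executorLoader;

  public ActiveFlowLookup(ExecutorLoader executorLoader) {
    this.executorLoader = executorLoader;
  }

  /** Returns "host:port" of the executor running execId, or null if the execution is not active. */
  public String hostOfActiveExecution(int execId) throws ExecutorManagerException {
    Pair<ExecutionReference, ExecutableFlow> active =
        executorLoader.fetchActiveFlowByExecId(execId);
    if (active == null) {
      // No row in active_executing_flows: the flow finished or was never dispatched.
      return null;
    }
    ExecutionReference ref = active.getFirst();
    return ref.getHost() + ":" + ref.getPort();
  }
}
```
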
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 9ab3504..65834a7 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -93,14 +93,12 @@ public class ExecutorManager extends EventHandler implements
private CleanerThread cleanerThread;
- private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
- new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
- new ConcurrentHashMap<Integer, ExecutableFlow>();
+ new ConcurrentHashMap<>();
QueuedExecutions queuedFlows;
- final private Set<Executor> activeExecutors = new HashSet<Executor>();
+ final private Set<Executor> activeExecutors = new HashSet<>();
private QueueProcessorThread queueProcessor;
private volatile Pair<ExecutionReference, ExecutableFlow> runningCandidate = null;
@@ -129,7 +127,6 @@ public class ExecutorManager extends EventHandler implements
this.azkProps = azkProps;
this.executorLoader = loader;
this.setupExecutors();
- this.loadRunningFlows();
queuedFlows =
new QueuedExecutions(azkProps.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
@@ -420,23 +417,23 @@ public class ExecutorManager extends EventHandler implements
@Override
public Set<String> getAllActiveExecutorServerHosts() {
// Includes non primary server/hosts
- HashSet<String> ports = new HashSet<String>();
+ HashSet<String> ports = new HashSet<>();
for (Executor executor : activeExecutors) {
ports.add(executor.getHost() + ":" + executor.getPort());
}
- // include executor which were initially active and still has flows running
- for (Pair<ExecutionReference, ExecutableFlow> running : runningFlows
- .values()) {
- ExecutionReference ref = running.getFirst();
- ports.add(ref.getHost() + ":" + ref.getPort());
+ try {
+ // include executor which were initially active and still has flows running
+ for (Pair<ExecutionReference, ExecutableFlow> running :
+ executorLoader.fetchActiveFlows().values()) {
+ ExecutionReference ref = running.getFirst();
+ ports.add(ref.getHost() + ":" + ref.getPort());
+ }
+ } catch(ExecutorManagerException e) {
+ logger.error(e);
}
return ports;
}
- private void loadRunningFlows() throws ExecutorManagerException {
- runningFlows.putAll(executorLoader.fetchActiveFlows());
- }
-
/*
* load queued flows i.e with active_execution_reference and not assigned to
* any executor
@@ -469,8 +466,12 @@ public class ExecutorManager extends EventHandler implements
if (runningCandidate != null) {
executionIds.addAll(getRunningFlowsHelper(projectId, flowId, Lists.newArrayList(runningCandidate)));
}
- executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
- runningFlows.values()));
+ try {
+ executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
+ executorLoader.fetchActiveFlows().values()));
+ } catch(ExecutorManagerException e) {
+ logger.error(e);
+ }
Collections.sort(executionIds);
return executionIds;
}
@@ -497,10 +498,13 @@ public class ExecutorManager extends EventHandler implements
@Override
public List<Pair<ExecutableFlow, Executor>> getActiveFlowsWithExecutor()
throws IOException {
- List<Pair<ExecutableFlow, Executor>> flows =
- new ArrayList<Pair<ExecutableFlow, Executor>>();
+ List<Pair<ExecutableFlow, Executor>> flows = new ArrayList<>();
getActiveFlowsWithExecutorHelper(flows, queuedFlows.getAllEntries());
- getActiveFlowsWithExecutorHelper(flows, runningFlows.values());
+ try {
+ getActiveFlowsWithExecutorHelper(flows, executorLoader.fetchActiveFlows().values());
+ } catch(ExecutorManagerException e) {
+ logger.error(e);
+ }
return flows;
}
@@ -527,9 +531,12 @@ public class ExecutorManager extends EventHandler implements
isRunning =
isRunning
|| isFlowRunningHelper(projectId, flowId, queuedFlows.getAllEntries());
- isRunning =
- isRunning
- || isFlowRunningHelper(projectId, flowId, runningFlows.values());
+ try {
+ isRunning = isRunning || isFlowRunningHelper(projectId, flowId,
+ executorLoader.fetchActiveFlows().values());
+ } catch(ExecutorManagerException e) {
+ logger.error(e);
+ }
return isRunning;
}
@@ -565,9 +572,13 @@ public class ExecutorManager extends EventHandler implements
*/
@Override
public List<ExecutableFlow> getRunningFlows() {
- ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>();
+ ArrayList<ExecutableFlow> flows = new ArrayList<>();
getActiveFlowHelper(flows, queuedFlows.getAllEntries());
- getActiveFlowHelper(flows, runningFlows.values());
+ try {
+ getActiveFlowHelper(flows, executorLoader.fetchActiveFlows().values());
+ } catch(ExecutorManagerException e) {
+ logger.error(e);
+ }
return flows;
}
@@ -590,9 +601,13 @@ public class ExecutorManager extends EventHandler implements
* @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
*/
public String getRunningFlowIds() {
- List<Integer> allIds = new ArrayList<Integer>();
+ List<Integer> allIds = new ArrayList<>();
getRunningFlowsIdsHelper(allIds, queuedFlows.getAllEntries());
- getRunningFlowsIdsHelper(allIds, runningFlows.values());
+ try {
+ getRunningFlowsIdsHelper(allIds, executorLoader.fetchActiveFlows().values());
+ } catch(ExecutorManagerException e) {
+ logger.error(e);
+ }
Collections.sort(allIds);
return allIds.toString();
}
@@ -687,7 +702,7 @@ public class ExecutorManager extends EventHandler implements
public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset,
int length) throws ExecutorManagerException {
Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
if (pair != null) {
Pair<String, String> typeParam = new Pair<String, String>("type", "flow");
Pair<String, String> offsetParam =
@@ -712,7 +727,7 @@ public class ExecutorManager extends EventHandler implements
public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId,
int offset, int length, int attempt) throws ExecutorManagerException {
Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
if (pair != null) {
Pair<String, String> typeParam = new Pair<String, String>("type", "job");
Pair<String, String> jobIdParam =
@@ -741,7 +756,7 @@ public class ExecutorManager extends EventHandler implements
public List<Object> getExecutionJobStats(ExecutableFlow exFlow, String jobId,
int attempt) throws ExecutorManagerException {
Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
if (pair == null) {
return executorLoader.fetchAttachments(exFlow.getExecutionId(), jobId,
attempt);
@@ -766,9 +781,9 @@ public class ExecutorManager extends EventHandler implements
public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow,
String jobId, int offset, int length, int attempt)
throws ExecutorManagerException {
- Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
- if (pair != null) {
+ Pair<ExecutionReference, ExecutableFlow> activeFlow =
+ executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+ if (activeFlow != null) {
Pair<String, String> typeParam = new Pair<String, String>("type", "job");
Pair<String, String> jobIdParam =
@@ -782,7 +797,7 @@ public class ExecutorManager extends EventHandler implements
@SuppressWarnings("unchecked")
Map<String, Object> result =
- callExecutorServer(pair.getFirst(), ConnectorParams.METADATA_ACTION,
+ callExecutorServer(activeFlow.getFirst(), ConnectorParams.METADATA_ACTION,
typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
return JobMetaData.createJobMetaDataFromObject(result);
} else {
@@ -801,10 +816,10 @@ public class ExecutorManager extends EventHandler implements
public void cancelFlow(ExecutableFlow exFlow, String userId)
throws ExecutorManagerException {
synchronized (exFlow) {
- if (runningFlows.containsKey(exFlow.getExecutionId())) {
- Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
- callExecutorServer(pair.getFirst(), ConnectorParams.CANCEL_ACTION,
+ Pair<ExecutionReference, ExecutableFlow> activeFlow =
+ executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+ if(activeFlow != null) {
+ callExecutorServer(activeFlow.getFirst(), ConnectorParams.CANCEL_ACTION,
userId);
} else if (queuedFlows.hasExecution(exFlow.getExecutionId())) {
queuedFlows.dequeue(exFlow.getExecutionId());
@@ -821,14 +836,14 @@ public class ExecutorManager extends EventHandler implements
public void resumeFlow(ExecutableFlow exFlow, String userId)
throws ExecutorManagerException {
synchronized (exFlow) {
- Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
- if (pair == null) {
+ Pair<ExecutionReference, ExecutableFlow> activeFlow =
+ executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+ if (activeFlow == null) {
throw new ExecutorManagerException("Execution "
+ exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+ " isn't running.");
}
- callExecutorServer(pair.getFirst(), ConnectorParams.RESUME_ACTION, userId);
+ callExecutorServer(activeFlow.getFirst(), ConnectorParams.RESUME_ACTION, userId);
}
}
@@ -836,14 +851,14 @@ public class ExecutorManager extends EventHandler implements
public void pauseFlow(ExecutableFlow exFlow, String userId)
throws ExecutorManagerException {
synchronized (exFlow) {
- Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
- if (pair == null) {
+ Pair<ExecutionReference, ExecutableFlow> activeFlow =
+ executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+ if (activeFlow == null) {
throw new ExecutorManagerException("Execution "
+ exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+ " isn't running.");
}
- callExecutorServer(pair.getFirst(), ConnectorParams.PAUSE_ACTION, userId);
+ callExecutorServer(activeFlow.getFirst(), ConnectorParams.PAUSE_ACTION, userId);
}
}
@@ -901,7 +916,7 @@ public class ExecutorManager extends EventHandler implements
throws ExecutorManagerException {
synchronized (exFlow) {
Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
if (pair == null) {
throw new ExecutorManagerException("Execution "
+ exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
@@ -1142,6 +1157,7 @@ public class ExecutorManager extends EventHandler implements
paramList = new ArrayList<Pair<String, String>>();
}
+ // TODO: refactor using Guice, inject ExecutorApiClient in ExecutorManager
ExecutorApiClient apiclient = ExecutorApiClient.getInstance();
@SuppressWarnings("unchecked")
URI uri =
@@ -1240,6 +1256,8 @@ public class ExecutorManager extends EventHandler implements
new ArrayList<ExecutableFlow>();
ArrayList<ExecutableFlow> finalizeFlows =
new ArrayList<ExecutableFlow>();
+ Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows =
+ executorLoader.fetchActiveFlows();
if (exFlowMap.size() > 0) {
for (Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
@@ -1275,15 +1293,15 @@ public class ExecutorManager extends EventHandler implements
} catch (IOException e) {
logger.error(e);
for (ExecutableFlow flow : entry.getValue()) {
- Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(flow.getExecutionId());
+ Pair<ExecutionReference, ExecutableFlow> activeFlow =
+ activeFlows.get(flow.getExecutionId());
updaterStage =
"Failed to get update. Doing some clean up for flow "
- + pair.getSecond().getExecutionId();
+ + flow.getExecutionId();
- if (pair != null) {
- ExecutionReference ref = pair.getFirst();
+ if (activeFlow != null) {
+ ExecutionReference ref = activeFlow.getFirst();
int numErrors = ref.getNumErrors();
if (ref.getNumErrors() < this.numErrors) {
ref.setNextCheckTime(System.currentTimeMillis()
@@ -1293,7 +1311,7 @@ public class ExecutorManager extends EventHandler implements
logger.error("Evicting flow " + flow.getExecutionId()
+ ". The executor is unresponsive.");
// TODO should send out an unresponsive email here.
- finalizeFlows.add(pair.getSecond());
+ finalizeFlows.add(activeFlow.getSecond());
}
}
}
@@ -1354,7 +1372,7 @@ public class ExecutorManager extends EventHandler implements
synchronized (this) {
try {
- if (runningFlows.size() > 0) {
+ if (activeFlows.size() > 0) {
this.wait(waitTimeMs);
} else {
this.wait(waitTimeIdleMs);
@@ -1402,7 +1420,6 @@ public class ExecutorManager extends EventHandler implements
executorLoader.removeActiveExecutableReference(execId);
updaterStage = "finalizing flow " + execId + " cleaning from memory";
- runningFlows.remove(execId);
fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED, new EventData(dsFlow.getStatus())));
recentlyFinished.put(execId, dsFlow);
@@ -1530,15 +1547,15 @@ public class ExecutorManager extends EventHandler implements
"Response is malformed. Need exec id to update.");
}
- Pair<ExecutionReference, ExecutableFlow> refPair =
- this.runningFlows.get(execId);
- if (refPair == null) {
+ Pair<ExecutionReference, ExecutableFlow> activeFlow =
+ executorLoader.fetchActiveFlowByExecId(execId);
+ if (activeFlow == null) {
throw new ExecutorManagerException(
"No running flow found with the execution id. Removing " + execId);
}
- ExecutionReference ref = refPair.getFirst();
- ExecutableFlow flow = refPair.getSecond();
+ ExecutionReference ref = activeFlow.getFirst();
+ ExecutableFlow flow = activeFlow.getSecond();
if (updateData.containsKey("error")) {
// The flow should be finished here.
throw new ExecutorManagerException((String) updateData.get("error"), flow);
@@ -1610,27 +1627,31 @@ public class ExecutorManager extends EventHandler implements
/* Group Executable flow by Executors to reduce number of REST calls */
private Map<Executor, List<ExecutableFlow>> getFlowToExecutorMap() {
HashMap<Executor, List<ExecutableFlow>> exFlowMap =
- new HashMap<Executor, List<ExecutableFlow>>();
+ new HashMap<>();
- for (Pair<ExecutionReference, ExecutableFlow> runningFlow : runningFlows
- .values()) {
- ExecutionReference ref = runningFlow.getFirst();
- ExecutableFlow flow = runningFlow.getSecond();
- Executor executor = ref.getExecutor();
+ try {
+ for (Pair<ExecutionReference, ExecutableFlow> runningFlow :
+ executorLoader.fetchActiveFlows().values()) {
+ ExecutionReference ref = runningFlow.getFirst();
+ ExecutableFlow flow = runningFlow.getSecond();
+ Executor executor = ref.getExecutor();
+
+ // We can set the next check time to prevent the checking of certain
+ // flows.
+ if (ref.getNextCheckTime() >= System.currentTimeMillis()) {
+ continue;
+ }
- // We can set the next check time to prevent the checking of certain
- // flows.
- if (ref.getNextCheckTime() >= System.currentTimeMillis()) {
- continue;
- }
+ List<ExecutableFlow> flows = exFlowMap.get(executor);
+ if (flows == null) {
+ flows = new ArrayList<>();
+ exFlowMap.put(executor, flows);
+ }
- List<ExecutableFlow> flows = exFlowMap.get(executor);
- if (flows == null) {
- flows = new ArrayList<ExecutableFlow>();
- exFlowMap.put(executor, flows);
+ flows.add(flow);
}
-
- flows.add(flow);
+ } catch(ExecutorManagerException e) {
+ logger.error(e);
}
return exFlowMap;
@@ -1732,10 +1753,6 @@ public class ExecutorManager extends EventHandler implements
}
reference.setExecutor(choosenExecutor);
- // move from flow to running flows
- runningFlows.put(exflow.getExecutionId(),
- new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
-
logger.info(String.format(
"Successfully dispatched exec %d with error count %d",
exflow.getExecutionId(), reference.getNumErrors()));
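
The TODO added above in callExecutorServer points at Guice-based injection of ExecutorApiClient. A rough sketch of what that wiring might look like, assuming a hypothetical module placed alongside ExecutorManager in the azkaban.executor package (none of this is part of the patch):

```java
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;

// Hypothetical Guice module; ExecutorManager would then take ExecutorApiClient
// as a constructor parameter instead of calling ExecutorApiClient.getInstance()
// inside callExecutorServer.
public class ExecutorApiClientModule extends AbstractModule {
  @Override
  protected void configure() {
    // No eager bindings needed for this sketch.
  }

  @Provides
  @Singleton
  ExecutorApiClient provideExecutorApiClient() {
    // Reuse the existing singleton accessor until the client itself is made injectable.
    return ExecutorApiClient.getInstance();
  }
}
```
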
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index a8a4eaf..0b79bb7 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -219,6 +219,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
@Override
+ public Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(int execId)
+ throws ExecutorManagerException {
+ QueryRunner runner = createQueryRunner();
+ FetchActiveExecutableFlowByExecId flowHandler = new FetchActiveExecutableFlowByExecId();
+
+ try {
+ List<Pair<ExecutionReference, ExecutableFlow>> flows =
+ runner.query(FetchActiveExecutableFlowByExecId.FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID,
+ flowHandler, execId);
+ if(flows.isEmpty()) {
+ return null;
+ }
+ else {
+ return flows.get(0);
+ }
+ } catch (SQLException e) {
+ throw new ExecutorManagerException("Error fetching active flows by exec id", e);
+ }
+ }
+
+ @Override
public int fetchNumExecutableFlows() throws ExecutorManagerException {
QueryRunner runner = createQueryRunner();
@@ -1157,7 +1178,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
"SELECT exec_id, project_id, version, flow_id, job_id, "
+ "start_time, end_time, status, attempt "
+ "FROM execution_jobs WHERE exec_id=? "
- + "AND job_id=? AND attempt_id=?";
+ + "AND job_id=? AND attempt=?";
private static String FETCH_EXECUTABLE_NODE_ATTEMPTS =
"SELECT exec_id, project_id, version, flow_id, job_id, "
+ "start_time, end_time, status, attempt FROM execution_jobs "
@@ -1364,7 +1385,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
Map<Integer, Pair<ExecutionReference, ExecutableFlow>> execFlows =
- new HashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
+ new HashMap<>();
do {
int id = rs.getInt(1);
int encodingType = rs.getInt(2);
@@ -1410,6 +1431,68 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
}
+ private static class FetchActiveExecutableFlowByExecId implements
+ ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
+ private static String FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID =
+ "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
+ + "et.port port, ax.update_time axUpdateTime, et.id executorId, et.active executorStatus"
+ + " FROM execution_flows ex"
+ + " INNER JOIN "
+ + " active_executing_flows ax ON ex.exec_id = ax.exec_id"
+ + " INNER JOIN "
+ + " executors et ON ex.executor_id = et.id"
+ + " WHERE ax.exec_id = ?";
+
+ @Override
+ public List<Pair<ExecutionReference, ExecutableFlow>> handle(ResultSet rs)
+ throws SQLException {
+ if (!rs.next()) {
+ return Collections.emptyList();
+ }
+
+ List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
+ new ArrayList<>();
+ do {
+ int id = rs.getInt(1);
+ int encodingType = rs.getInt(2);
+ byte[] data = rs.getBytes(3);
+ String host = rs.getString(4);
+ int port = rs.getInt(5);
+ long updateTime = rs.getLong(6);
+ int executorId = rs.getInt(7);
+ boolean executorStatus = rs.getBoolean(8);
+
+ if (data == null) {
+ logger.error("Found a flow with empty data blob exec_id: " + id);
+ } else {
+ EncodingType encType = EncodingType.fromInteger(encodingType);
+ Object flowObj;
+ try {
+ if (encType == EncodingType.GZIP) {
+ String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+ flowObj = JSONUtils.parseJSONFromString(jsonString);
+ } else {
+ String jsonString = new String(data, "UTF-8");
+ flowObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+
+ ExecutableFlow exFlow =
+ ExecutableFlow.createExecutableFlowFromObject(flowObj);
+ Executor executor = new Executor(executorId, host, port, executorStatus);
+ ExecutionReference ref = new ExecutionReference(id, executor);
+ ref.setUpdateTime(updateTime);
+
+ execFlows.add(new Pair<>(ref, exFlow));
+ } catch (IOException e) {
+ throw new SQLException("Error retrieving flow data " + id, e);
+ }
+ }
+ } while (rs.next());
+
+ return execFlows;
+ }
+ }
+
private static class FetchExecutableFlows implements
ResultSetHandler<List<ExecutableFlow>> {
private static String FETCH_BASE_EXECUTABLE_FLOW_QUERY =
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 62d187b..8f84eca 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -16,30 +16,36 @@
package azkaban.executor;
-import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.junit.Assert;
import org.junit.Test;
import azkaban.alert.Alerter;
-import azkaban.flow.Flow;
-import azkaban.project.Project;
import azkaban.user.User;
-import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.TestUtils;
+import static org.mockito.Mockito.*;
/**
* Test class for executor manager
*/
public class ExecutorManagerTest {
+ private ExecutorManager manager;
+ private ExecutorLoader loader;
+ private Props props;
+ private User user;
+ private Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = new HashMap<>();
+ private ExecutableFlow flow1;
+ private ExecutableFlow flow2;
/* Helper method to create a ExecutorManager Instance */
private ExecutorManager createMultiExecutorManagerInstance()
@@ -255,4 +261,90 @@ public class ExecutorManagerTest {
Assert.assertFalse(manager.getRunningFlows().contains(flow1));
}
+
+ /*
+ * Added tests for runningFlows
+ * TODO: When removing queuedFlows cache, will refactor rest of the ExecutorManager test cases
+ */
+ @Test
+ public void testSubmitFlows() throws ExecutorManagerException, IOException {
+ testSetUpForRunningFlows();
+ ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ manager.submitExecutableFlow(flow1, user.getUserId());
+ verify(loader).uploadExecutableFlow(flow1);
+ verify(loader).addActiveExecutableReference(any());
+ }
+
+ @Test
+ public void testFetchAllActiveFlows() throws ExecutorManagerException, IOException {
+ testSetUpForRunningFlows();
+ List<ExecutableFlow> flows = manager.getRunningFlows();
+ for(Pair<ExecutionReference, ExecutableFlow> pair : activeFlows.values()) {
+ Assert.assertTrue(flows.contains(pair.getSecond()));
+ }
+ }
+
+ @Test
+ public void testFetchActiveFlowByProject() throws ExecutorManagerException, IOException {
+ testSetUpForRunningFlows();
+ List<Integer> executions = manager.getRunningFlows(flow1.getProjectId(), flow1.getFlowId());
+ Assert.assertTrue(executions.contains(flow1.getExecutionId()));
+ Assert.assertTrue(manager.isFlowRunning(flow1.getProjectId(), flow1.getFlowId()));
+ }
+
+ @Test
+ public void testFetchActiveFlowWithExecutor() throws ExecutorManagerException, IOException {
+ testSetUpForRunningFlows();
+ List<Pair<ExecutableFlow, Executor>> activeFlowsWithExecutor =
+ manager.getActiveFlowsWithExecutor();
+ Assert.assertTrue(activeFlowsWithExecutor.contains(new Pair<>(flow1,
+ manager.fetchExecutor(flow1.getExecutionId()))));
+ Assert.assertTrue(activeFlowsWithExecutor.contains(new Pair<>(flow2,
+ manager.fetchExecutor(flow2.getExecutionId()))));
+ }
+
+ @Test
+ public void testFetchAllActiveExecutorServerHosts() throws ExecutorManagerException, IOException {
+ testSetUpForRunningFlows();
+ Set<String> activeExecutorServerHosts = manager.getAllActiveExecutorServerHosts();
+ Executor executor1 = manager.fetchExecutor(flow1.getExecutionId());
+ Executor executor2 = manager.fetchExecutor(flow2.getExecutionId());
+ Assert.assertTrue(activeExecutorServerHosts.contains(executor1.getHost() + ":" + executor1.getPort()));
+ Assert.assertTrue(activeExecutorServerHosts.contains(executor2.getHost() + ":" + executor2.getPort()));
+ }
+
+ /*
+ * TODO: will move below method to setUp() and run before every test for both runningFlows and queuedFlows
+ */
+ private void testSetUpForRunningFlows()
+ throws ExecutorManagerException, IOException {
+ loader = mock(ExecutorLoader.class);
+ user = TestUtils.getTestUser();
+ props = new Props();
+ props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ //To test runningFlows, AZKABAN_QUEUEPROCESSING_ENABLED should be set to true
+ //so that flows will be dispatched to executors.
+ props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "true");
+
+ List<Executor> executors = new ArrayList<>();
+ Executor executor1 = new Executor(1, "localhost", 12345, true);
+ Executor executor2 = new Executor(2, "localhost", 12346, true);
+ executors.add(executor1);
+ executors.add(executor2);
+
+ when(loader.fetchActiveExecutors()).thenReturn(executors);
+ manager = new ExecutorManager(props, loader, new HashMap<>());
+
+ flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+ flow1.setExecutionId(1);
+ flow2.setExecutionId(2);
+ ExecutionReference ref1 =
+ new ExecutionReference(flow1.getExecutionId(), executor1);
+ ExecutionReference ref2 =
+ new ExecutionReference(flow2.getExecutionId(), executor2);
+ activeFlows.put(flow1.getExecutionId(), new Pair<>(ref1, flow1));
+ activeFlows.put(flow2.getExecutionId(), new Pair<>(ref2, flow2));
+ when(loader.fetchActiveFlows()).thenReturn(activeFlows);
+ }
}
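
Since ExecutorManager now resolves pause/resume/cancel targets through fetchActiveFlowByExecId, tests driving those paths against this mocked loader would also need the per-execution lookup stubbed. A small sketch in the same when/thenReturn style as testSetUpForRunningFlows() (illustrative only, not in the patch):

```java
// Illustrative stubs; ref1/ref2 and flow1/flow2 are the objects built in testSetUpForRunningFlows().
when(loader.fetchActiveFlowByExecId(flow1.getExecutionId()))
    .thenReturn(new Pair<>(ref1, flow1));
when(loader.fetchActiveFlowByExecId(flow2.getExecutionId()))
    .thenReturn(new Pair<>(ref2, flow2));
```
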
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index 39ed6d9..3a41b56 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -22,7 +22,6 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -42,11 +41,8 @@ import org.junit.Test;
import azkaban.database.DataSourceUtils;
import azkaban.executor.ExecutorLogEvent.EventType;
-import azkaban.flow.Flow;
-import azkaban.project.Project;
import azkaban.user.User;
import azkaban.utils.FileIOUtils.LogData;
-import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.TestUtils;
@@ -55,7 +51,7 @@ public class JdbcExecutorLoaderTest {
private static boolean testDBExists;
/* Directory with serialized description of test flows */
private static final String UNIT_BASE_DIR =
- "../azkaban-test/src/test/resources/executions";
+ "../azkaban-test/src/test/resources/azkaban/test/executions";
// @TODO remove this and turn into local host.
private static final String host = "localhost";
private static final int port = 3306;
@@ -501,19 +497,26 @@ public class JdbcExecutorLoaderTest {
ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow);
ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
- loader.uploadExecutableFlow(flow);
+ loader.uploadExecutableFlow(flow2);
ExecutionReference ref2 = new ExecutionReference(flow2.getExecutionId());
loader.addActiveExecutableReference(ref2);
ExecutionReference ref = new ExecutionReference(flow.getExecutionId());
loader.addActiveExecutableReference(ref);
- queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref, flow));
- queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref2, flow2));
+ List<Pair<ExecutionReference, ExecutableFlow>> fetchedQueuedFlows = loader.fetchQueuedFlows();
+ Assert.assertEquals(2, fetchedQueuedFlows.size());
+ Pair<ExecutionReference, ExecutableFlow> fetchedFlow1 = fetchedQueuedFlows.get(0);
+ Pair<ExecutionReference, ExecutableFlow> fetchedFlow2 = fetchedQueuedFlows.get(1);
- // only running and completed flows
- Assert.assertArrayEquals(loader.fetchQueuedFlows().toArray(),
- queuedFlows.toArray());
+ Assert.assertEquals(ref.getExecId(), fetchedFlow1.getFirst().getExecId());
+ Assert.assertEquals(flow.getExecutionId(), fetchedFlow1.getSecond().getExecutionId());
+ Assert.assertEquals(flow.getFlowId(), fetchedFlow1.getSecond().getFlowId());
+ Assert.assertEquals(flow.getProjectId(), fetchedFlow1.getSecond().getProjectId());
+ Assert.assertEquals(ref2.getExecId(), fetchedFlow2.getFirst().getExecId());
+ Assert.assertEquals(flow2.getExecutionId(), fetchedFlow2.getSecond().getExecutionId());
+ Assert.assertEquals(flow2.getFlowId(), fetchedFlow2.getSecond().getFlowId());
+ Assert.assertEquals(flow2.getProjectId(), fetchedFlow2.getSecond().getProjectId());
}
/* Test all executors fetch from empty executors */
@@ -786,13 +789,15 @@ public class JdbcExecutorLoaderTest {
ExecutorLoader loader = createLoader();
ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow1);
- Executor executor = new Executor(2, "test", 1, true);
+ Executor executor = loader.addExecutor("test", 1);
+ loader.assignExecutor(executor.getId(), flow1.getExecutionId());
ExecutionReference ref1 =
new ExecutionReference(flow1.getExecutionId(), executor);
loader.addActiveExecutableReference(ref1);
ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow2);
+ loader.assignExecutor(executor.getId(), flow2.getExecutionId());
ExecutionReference ref2 =
new ExecutionReference(flow2.getExecutionId(), executor);
loader.addActiveExecutableReference(ref2);
@@ -839,6 +844,35 @@ public class JdbcExecutorLoaderTest {
Assert.assertFalse(activeFlows2.containsKey(flow2.getExecutionId()));
}
+ @Test
+ public void testFetchActiveFlowByExecId() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+
+ ExecutorLoader loader = createLoader();
+ ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ loader.uploadExecutableFlow(flow1);
+ Executor executor = loader.addExecutor("test", 1);
+ loader.assignExecutor(executor.getId(), flow1.getExecutionId());
+ ExecutionReference ref1 = new ExecutionReference(flow1.getExecutionId(), executor);
+ loader.addActiveExecutableReference(ref1);
+
+ Pair<ExecutionReference, ExecutableFlow> activeFlow1 =
+ loader.fetchActiveFlowByExecId(flow1.getExecutionId());
+
+ ExecutionReference execRef1 = activeFlow1.getFirst();
+ ExecutableFlow execFlow1 = activeFlow1.getSecond();
+ Assert.assertNotNull(execRef1);
+ Assert.assertEquals(ref1.getExecId(), execRef1.getExecId());
+ Assert.assertEquals(ref1.getExecutor(), execRef1.getExecutor());
+ Assert.assertNotNull(execFlow1);
+ Assert.assertEquals(flow1.getExecutionId(), execFlow1.getExecutionId());
+ Assert.assertEquals(flow1.getFlowId(), execFlow1.getFlowId());
+ Assert.assertEquals(flow1.getProjectId(), execFlow1.getProjectId());
+ Assert.assertEquals(flow1.getVersion(), execFlow1.getVersion());
+ }
+
@Ignore @Test
public void testSmallUploadLog() throws ExecutorManagerException {
File logDir = new File(UNIT_BASE_DIR + "logtest");
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 2c8a95b..728cb16 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -67,6 +67,12 @@ public class MockExecutorLoader implements ExecutorLoader {
}
@Override
+ public Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(int execId)
+ throws ExecutorManagerException {
+ return activeFlows.get(execId);
+ }
+
+ @Override
public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId,
int skip, int num) throws ExecutorManagerException {
return null;
diff --git a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/exec2.flow b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/exec2.flow
index 7197af9..d9d2706 100644
--- a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/exec2.flow
+++ b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/exec2.flow
@@ -1,7 +1,7 @@
{
"project.id":1,
"version":2,
- "id" : "derived-member-data",
+ "id" : "derived-member-data-2",
"success.email" : [],
"edges" : [ {
"source" : "job1",