diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 87e0346..2947e7a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -29,7 +29,6 @@ import azkaban.metrics.CommonMetrics;
import azkaban.project.Project;
import azkaban.project.ProjectWhitelist;
import azkaban.utils.AuthenticationUtils;
-import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
import azkaban.utils.Props;
@@ -101,6 +100,7 @@ public class ExecutorManager extends EventHandler implements
private final ExecutorManagerUpdaterStage updaterStage;
private final ExecutionFinalizer executionFinalizer;
private final ActiveExecutors activeExecutors;
+ private final ExecutorService executorInfoRefresherService;
QueuedExecutions queuedFlows;
File cacheDir;
private QueueProcessorThread queueProcessor;
@@ -108,7 +108,6 @@ public class ExecutorManager extends EventHandler implements
private List<String> filterList;
private Map<String, Integer> comparatorWeightsMap;
private long lastSuccessfulExecutorInfoRefresh;
- private final ExecutorService executorInfoRefresherService;
private Duration sleepAfterDispatchFailure = Duration.ofSeconds(1L);
private boolean initialized = false;
@@ -135,6 +134,18 @@ public class ExecutorManager extends EventHandler implements
this.executorInfoRefresherService = createExecutorInfoRefresherService();
}
+ // TODO move to some common place
+ static boolean isFinished(final ExecutableFlow flow) {
+ switch (flow.getStatus()) {
+ case SUCCEEDED:
+ case FAILED:
+ case KILLED:
+ return true;
+ default:
+ return false;
+ }
+ }
+
private int getMaxConcurrentRunsOneFlow(final Props azkProps) {
// The default threshold is set to 30 for now, in case some users are affected. We may
// decrease this number in future, to better prevent DDos attacks.
@@ -165,18 +176,6 @@ public class ExecutorManager extends EventHandler implements
this.queueProcessor = setupQueueProcessor();
}
- // TODO move to some common place
- static boolean isFinished(final ExecutableFlow flow) {
- switch (flow.getStatus()) {
- case SUCCEEDED:
- case FAILED:
- case KILLED:
- return true;
- default:
- return false;
- }
- }
-
public void start() throws ExecutorManagerException {
initialize();
this.updaterThread.start();
@@ -626,14 +625,6 @@ public class ExecutorManager extends EventHandler implements
}
@Override
- public List<ExecutableFlow> getExecutableFlows(final Project project,
- final String flowId, final int skip, final int size) throws ExecutorManagerException {
- final List<ExecutableFlow> flows =
- this.executorLoader.fetchFlowHistory(project.getId(), flowId, skip, size);
- return flows;
- }
-
- @Override
public List<ExecutableFlow> getExecutableFlows(final int skip, final int size)
throws ExecutorManagerException {
final List<ExecutableFlow> flows = this.executorLoader.fetchFlowHistory(skip, size);
@@ -675,12 +666,6 @@ public class ExecutorManager extends EventHandler implements
}
@Override
- public int getNumberOfExecutions(final Project project, final String flowId)
- throws ExecutorManagerException {
- return this.executorLoader.fetchNumExecutableFlows(project.getId(), flowId);
- }
-
- @Override
public LogData getExecutableFlowLog(final ExecutableFlow exFlow, final int offset,
final int length) throws ExecutorManagerException {
final Pair<ExecutionReference, ExecutableFlow> pair =
@@ -849,33 +834,6 @@ public class ExecutorManager extends EventHandler implements
return null;
}
- @Override
- public JobMetaData getExecutionJobMetaData(final ExecutableFlow exFlow,
- final String jobId, final int offset, final int length, final int attempt)
- throws ExecutorManagerException {
- final Pair<ExecutionReference, ExecutableFlow> pair =
- this.runningExecutions.get().get(exFlow.getExecutionId());
- if (pair != null) {
-
- final Pair<String, String> typeParam = new Pair<>("type", "job");
- final Pair<String, String> jobIdParam =
- new Pair<>("jobId", jobId);
- final Pair<String, String> offsetParam =
- new Pair<>("offset", String.valueOf(offset));
- final Pair<String, String> lengthParam =
- new Pair<>("length", String.valueOf(length));
- final Pair<String, String> attemptParam =
- new Pair<>("attempt", String.valueOf(attempt));
-
- @SuppressWarnings("unchecked") final Map<String, Object> result =
- this.apiGateway.callWithReference(pair.getFirst(), ConnectorParams.METADATA_ACTION,
- typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
- return JobMetaData.createJobMetaDataFromObject(result);
- } else {
- return null;
- }
- }
-
/**
* if flows was dispatched to an executor, cancel by calling Executor else if flow is still in
* queue, remove from queue and finalize {@inheritDoc}
@@ -937,53 +895,11 @@ public class ExecutorManager extends EventHandler implements
}
@Override
- public void pauseExecutingJobs(final ExecutableFlow exFlow, final String userId,
- final String... jobIds) throws ExecutorManagerException {
- modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_PAUSE_JOBS, userId,
- jobIds);
- }
-
- @Override
- public void resumeExecutingJobs(final ExecutableFlow exFlow, final String userId,
- final String... jobIds) throws ExecutorManagerException {
- modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RESUME_JOBS, userId,
- jobIds);
- }
-
- @Override
public void retryFailures(final ExecutableFlow exFlow, final String userId)
throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_FAILURES, userId);
}
- @Override
- public void retryExecutingJobs(final ExecutableFlow exFlow, final String userId,
- final String... jobIds) throws ExecutorManagerException {
- modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_JOBS, userId,
- jobIds);
- }
-
- @Override
- public void disableExecutingJobs(final ExecutableFlow exFlow, final String userId,
- final String... jobIds) throws ExecutorManagerException {
- modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_DISABLE_JOBS, userId,
- jobIds);
- }
-
- @Override
- public void enableExecutingJobs(final ExecutableFlow exFlow, final String userId,
- final String... jobIds) throws ExecutorManagerException {
- modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_ENABLE_JOBS, userId,
- jobIds);
- }
-
- @Override
- public void cancelExecutingJobs(final ExecutableFlow exFlow, final String userId,
- final String... jobIds) throws ExecutorManagerException {
- modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_CANCEL_JOBS, userId,
- jobIds);
- }
-
@SuppressWarnings("unchecked")
private Map<String, Object> modifyExecutingJobs(final ExecutableFlow exFlow,
final String command, final String userId, final String... jobIds)
@@ -1234,6 +1150,11 @@ public class ExecutorManager extends EventHandler implements
exflow.getExecutionId(), reference.getNumErrors()));
}
+ @VisibleForTesting
+ void setSleepAfterDispatchFailure(final Duration sleepAfterDispatchFailure) {
+ this.sleepAfterDispatchFailure = sleepAfterDispatchFailure;
+ }
+
/*
* cleaner thread to clean up execution_logs, etc in DB. Runs every hour.
*/
@@ -1549,9 +1470,4 @@ public class ExecutorManager extends EventHandler implements
ExecutorManager.this.queuedFlows.enqueue(exflow, reference);
}
}
-
- @VisibleForTesting
- void setSleepAfterDispatchFailure(final Duration sleepAfterDispatchFailure) {
- this.sleepAfterDispatchFailure = sleepAfterDispatchFailure;
- }
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
index 2c2cd7a..ae263ce 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -17,7 +17,6 @@
package azkaban.executor;
import azkaban.project.Project;
-import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
import java.io.IOException;
@@ -50,9 +49,6 @@ public interface ExecutorManagerAdapter {
public List<ExecutableFlow> getRecentlyFinishedFlows();
- public List<ExecutableFlow> getExecutableFlows(Project project,
- String flowId, int skip, int size) throws ExecutorManagerException;
-
public List<ExecutableFlow> getExecutableFlows(int skip, int size)
throws ExecutorManagerException;
@@ -76,9 +72,6 @@ public interface ExecutorManagerAdapter {
public int getNumberOfJobExecutions(Project project, String jobId)
throws ExecutorManagerException;
- public int getNumberOfExecutions(Project project, String flowId)
- throws ExecutorManagerException;
-
public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset,
int length) throws ExecutorManagerException;
@@ -90,10 +83,6 @@ public interface ExecutorManagerAdapter {
public String getJobLinkUrl(ExecutableFlow exFlow, String jobId, int attempt);
- public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow,
- String jobId, int offset, int length, int attempt)
- throws ExecutorManagerException;
-
public void cancelFlow(ExecutableFlow exFlow, String userId)
throws ExecutorManagerException;
@@ -103,27 +92,9 @@ public interface ExecutorManagerAdapter {
public void pauseFlow(ExecutableFlow exFlow, String userId)
throws ExecutorManagerException;
- public void pauseExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException;
-
- public void resumeExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException;
-
public void retryFailures(ExecutableFlow exFlow, String userId)
throws ExecutorManagerException;
- public void retryExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException;
-
- public void disableExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException;
-
- public void enableExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException;
-
- public void cancelExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException;
-
public String submitExecutableFlow(ExecutableFlow exflow, String userId)
throws ExecutorManagerException;