azkaban-aplcache

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
index 8b32604..54cb5ad 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
@@ -43,12 +43,6 @@ public interface ConnectorParams {
   public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
   public static final String MODIFY_EXECUTION_ACTION_TYPE = "modifyType";
   public static final String MODIFY_RETRY_FAILURES = "retryFailures";
-  public static final String MODIFY_RETRY_JOBS = "retryJobs";
-  public static final String MODIFY_CANCEL_JOBS = "cancelJobs";
-  public static final String MODIFY_DISABLE_JOBS = "skipJobs";
-  public static final String MODIFY_ENABLE_JOBS = "enableJobs";
-  public static final String MODIFY_PAUSE_JOBS = "pauseJobs";
-  public static final String MODIFY_RESUME_JOBS = "resumeJobs";
   public static final String MODIFY_JOBS_LIST = "jobIds";
 
   public static final String START_PARAM = "start";
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 87e0346..2947e7a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -29,7 +29,6 @@ import azkaban.metrics.CommonMetrics;
 import azkaban.project.Project;
 import azkaban.project.ProjectWhitelist;
 import azkaban.utils.AuthenticationUtils;
-import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
@@ -101,6 +100,7 @@ public class ExecutorManager extends EventHandler implements
   private final ExecutorManagerUpdaterStage updaterStage;
   private final ExecutionFinalizer executionFinalizer;
   private final ActiveExecutors activeExecutors;
+  private final ExecutorService executorInfoRefresherService;
   QueuedExecutions queuedFlows;
   File cacheDir;
   private QueueProcessorThread queueProcessor;
@@ -108,7 +108,6 @@ public class ExecutorManager extends EventHandler implements
   private List<String> filterList;
   private Map<String, Integer> comparatorWeightsMap;
   private long lastSuccessfulExecutorInfoRefresh;
-  private final ExecutorService executorInfoRefresherService;
   private Duration sleepAfterDispatchFailure = Duration.ofSeconds(1L);
   private boolean initialized = false;
 
@@ -135,6 +134,18 @@ public class ExecutorManager extends EventHandler implements
     this.executorInfoRefresherService = createExecutorInfoRefresherService();
   }
 
+  // TODO move to some common place
+  static boolean isFinished(final ExecutableFlow flow) {
+    switch (flow.getStatus()) {
+      case SUCCEEDED:
+      case FAILED:
+      case KILLED:
+        return true;
+      default:
+        return false;
+    }
+  }
+
   private int getMaxConcurrentRunsOneFlow(final Props azkProps) {
     // The default threshold is set to 30 for now, in case some users are affected. We may
     // decrease this number in future, to better prevent DDos attacks.
@@ -165,18 +176,6 @@ public class ExecutorManager extends EventHandler implements
     this.queueProcessor = setupQueueProcessor();
   }
 
-  // TODO move to some common place
-  static boolean isFinished(final ExecutableFlow flow) {
-    switch (flow.getStatus()) {
-      case SUCCEEDED:
-      case FAILED:
-      case KILLED:
-        return true;
-      default:
-        return false;
-    }
-  }
-
   public void start() throws ExecutorManagerException {
     initialize();
     this.updaterThread.start();
@@ -626,14 +625,6 @@ public class ExecutorManager extends EventHandler implements
   }
 
   @Override
-  public List<ExecutableFlow> getExecutableFlows(final Project project,
-      final String flowId, final int skip, final int size) throws ExecutorManagerException {
-    final List<ExecutableFlow> flows =
-        this.executorLoader.fetchFlowHistory(project.getId(), flowId, skip, size);
-    return flows;
-  }
-
-  @Override
   public List<ExecutableFlow> getExecutableFlows(final int skip, final int size)
       throws ExecutorManagerException {
     final List<ExecutableFlow> flows = this.executorLoader.fetchFlowHistory(skip, size);
@@ -675,12 +666,6 @@ public class ExecutorManager extends EventHandler implements
   }
 
   @Override
-  public int getNumberOfExecutions(final Project project, final String flowId)
-      throws ExecutorManagerException {
-    return this.executorLoader.fetchNumExecutableFlows(project.getId(), flowId);
-  }
-
-  @Override
   public LogData getExecutableFlowLog(final ExecutableFlow exFlow, final int offset,
       final int length) throws ExecutorManagerException {
     final Pair<ExecutionReference, ExecutableFlow> pair =
@@ -849,33 +834,6 @@ public class ExecutorManager extends EventHandler implements
     return null;
   }
 
-  @Override
-  public JobMetaData getExecutionJobMetaData(final ExecutableFlow exFlow,
-      final String jobId, final int offset, final int length, final int attempt)
-      throws ExecutorManagerException {
-    final Pair<ExecutionReference, ExecutableFlow> pair =
-        this.runningExecutions.get().get(exFlow.getExecutionId());
-    if (pair != null) {
-
-      final Pair<String, String> typeParam = new Pair<>("type", "job");
-      final Pair<String, String> jobIdParam =
-          new Pair<>("jobId", jobId);
-      final Pair<String, String> offsetParam =
-          new Pair<>("offset", String.valueOf(offset));
-      final Pair<String, String> lengthParam =
-          new Pair<>("length", String.valueOf(length));
-      final Pair<String, String> attemptParam =
-          new Pair<>("attempt", String.valueOf(attempt));
-
-      @SuppressWarnings("unchecked") final Map<String, Object> result =
-          this.apiGateway.callWithReference(pair.getFirst(), ConnectorParams.METADATA_ACTION,
-              typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
-      return JobMetaData.createJobMetaDataFromObject(result);
-    } else {
-      return null;
-    }
-  }
-
   /**
    * if flows was dispatched to an executor, cancel by calling Executor else if flow is still in
    * queue, remove from queue and finalize {@inheritDoc}
@@ -937,53 +895,11 @@ public class ExecutorManager extends EventHandler implements
   }
 
   @Override
-  public void pauseExecutingJobs(final ExecutableFlow exFlow, final String userId,
-      final String... jobIds) throws ExecutorManagerException {
-    modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_PAUSE_JOBS, userId,
-        jobIds);
-  }
-
-  @Override
-  public void resumeExecutingJobs(final ExecutableFlow exFlow, final String userId,
-      final String... jobIds) throws ExecutorManagerException {
-    modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RESUME_JOBS, userId,
-        jobIds);
-  }
-
-  @Override
   public void retryFailures(final ExecutableFlow exFlow, final String userId)
       throws ExecutorManagerException {
     modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_FAILURES, userId);
   }
 
-  @Override
-  public void retryExecutingJobs(final ExecutableFlow exFlow, final String userId,
-      final String... jobIds) throws ExecutorManagerException {
-    modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_JOBS, userId,
-        jobIds);
-  }
-
-  @Override
-  public void disableExecutingJobs(final ExecutableFlow exFlow, final String userId,
-      final String... jobIds) throws ExecutorManagerException {
-    modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_DISABLE_JOBS, userId,
-        jobIds);
-  }
-
-  @Override
-  public void enableExecutingJobs(final ExecutableFlow exFlow, final String userId,
-      final String... jobIds) throws ExecutorManagerException {
-    modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_ENABLE_JOBS, userId,
-        jobIds);
-  }
-
-  @Override
-  public void cancelExecutingJobs(final ExecutableFlow exFlow, final String userId,
-      final String... jobIds) throws ExecutorManagerException {
-    modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_CANCEL_JOBS, userId,
-        jobIds);
-  }
-
   @SuppressWarnings("unchecked")
   private Map<String, Object> modifyExecutingJobs(final ExecutableFlow exFlow,
       final String command, final String userId, final String... jobIds)
@@ -1234,6 +1150,11 @@ public class ExecutorManager extends EventHandler implements
         exflow.getExecutionId(), reference.getNumErrors()));
   }
 
+  @VisibleForTesting
+  void setSleepAfterDispatchFailure(final Duration sleepAfterDispatchFailure) {
+    this.sleepAfterDispatchFailure = sleepAfterDispatchFailure;
+  }
+
   /*
    * cleaner thread to clean up execution_logs, etc in DB. Runs every hour.
    */
@@ -1549,9 +1470,4 @@ public class ExecutorManager extends EventHandler implements
       ExecutorManager.this.queuedFlows.enqueue(exflow, reference);
     }
   }
-
-  @VisibleForTesting
-  void setSleepAfterDispatchFailure(final Duration sleepAfterDispatchFailure) {
-    this.sleepAfterDispatchFailure = sleepAfterDispatchFailure;
-  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
index 2c2cd7a..ae263ce 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -17,7 +17,6 @@
 package azkaban.executor;
 
 import azkaban.project.Project;
-import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.Pair;
 import java.io.IOException;
@@ -50,9 +49,6 @@ public interface ExecutorManagerAdapter {
 
   public List<ExecutableFlow> getRecentlyFinishedFlows();
 
-  public List<ExecutableFlow> getExecutableFlows(Project project,
-      String flowId, int skip, int size) throws ExecutorManagerException;
-
   public List<ExecutableFlow> getExecutableFlows(int skip, int size)
       throws ExecutorManagerException;
 
@@ -76,9 +72,6 @@ public interface ExecutorManagerAdapter {
   public int getNumberOfJobExecutions(Project project, String jobId)
       throws ExecutorManagerException;
 
-  public int getNumberOfExecutions(Project project, String flowId)
-      throws ExecutorManagerException;
-
   public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset,
       int length) throws ExecutorManagerException;
 
@@ -90,10 +83,6 @@ public interface ExecutorManagerAdapter {
 
   public String getJobLinkUrl(ExecutableFlow exFlow, String jobId, int attempt);
 
-  public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow,
-      String jobId, int offset, int length, int attempt)
-      throws ExecutorManagerException;
-
   public void cancelFlow(ExecutableFlow exFlow, String userId)
       throws ExecutorManagerException;
 
@@ -103,27 +92,9 @@ public interface ExecutorManagerAdapter {
   public void pauseFlow(ExecutableFlow exFlow, String userId)
       throws ExecutorManagerException;
 
-  public void pauseExecutingJobs(ExecutableFlow exFlow, String userId,
-      String... jobIds) throws ExecutorManagerException;
-
-  public void resumeExecutingJobs(ExecutableFlow exFlow, String userId,
-      String... jobIds) throws ExecutorManagerException;
-
   public void retryFailures(ExecutableFlow exFlow, String userId)
       throws ExecutorManagerException;
 
-  public void retryExecutingJobs(ExecutableFlow exFlow, String userId,
-      String... jobIds) throws ExecutorManagerException;
-
-  public void disableExecutingJobs(ExecutableFlow exFlow, String userId,
-      String... jobIds) throws ExecutorManagerException;
-
-  public void enableExecutingJobs(ExecutableFlow exFlow, String userId,
-      String... jobIds) throws ExecutorManagerException;
-
-  public void cancelExecutingJobs(ExecutableFlow exFlow, String userId,
-      String... jobIds) throws ExecutorManagerException;
-
   public String submitExecutableFlow(ExecutableFlow exflow, String userId)
       throws ExecutorManagerException;