azkaban-aplcache

Clean up FlowRunnerManager (#1980) - fix broken refs in javadoc -

10/16/2018 3:22:56 PM

Details

diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 42091ae..3cb2311 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -77,8 +77,8 @@ import org.apache.log4j.Logger;
 /**
  * Execution manager for the server side execution.
  *
- * When a flow is submitted to FlowRunnerManager, it is the {@link Status.PREPARING} status. When a
- * flow is about to be executed by FlowRunner, its status is updated to {@link Status.RUNNING}
+ * When a flow is submitted to FlowRunnerManager, it is the {@link Status#PREPARING} status. When a
+ * flow is about to be executed by FlowRunner, its status is updated to {@link Status#RUNNING}
  *
  * Two main data structures are used in this class to maintain flows.
  *
@@ -129,17 +129,17 @@ public class FlowRunnerManager implements EventListener,
   private final Object executionDirDeletionSync = new Object();
 
   private Map<Pair<Integer, Integer>, ProjectVersion> installedProjects;
-  private int numThreads = DEFAULT_NUM_EXECUTING_FLOWS;
+  private final int numThreads;
   private int threadPoolQueueSize = -1;
-  private int numJobThreadPerFlow = DEFAULT_FLOW_NUM_JOB_TREADS;
+  private final int numJobThreadPerFlow;
   private Props globalProps;
   private long lastCleanerThreadCheckTime = -1;
   private long executionDirRetention = 1 * 24 * 60 * 60 * 1000; // 1 Day
   // We want to limit the log sizes to about 20 megs
-  private String jobLogChunkSize = "5MB";
-  private int jobLogNumFiles = 4;
+  private final String jobLogChunkSize;
+  private final int jobLogNumFiles;
   // If true, jobs will validate proxy user against a list of valid proxy users.
-  private boolean validateProxyUser = false;
+  private final boolean validateProxyUser;
   // date time of the the last flow submitted.
   private long lastFlowSubmittedDate = 0;
   // whether the current executor is active
@@ -870,25 +870,25 @@ public class FlowRunnerManager implements EventListener,
         synchronized (this) {
           try {
             FlowRunnerManager.this.lastCleanerThreadCheckTime = System.currentTimeMillis();
-            logger.info("# of executing flows: " + getNumRunningFlows());
+            FlowRunnerManager.logger.info("# of executing flows: " + getNumRunningFlows());
 
             // Cleanup old stuff.
             final long currentTime = System.currentTimeMillis();
             if (currentTime - RECENTLY_FINISHED_INTERVAL_MS > this.lastRecentlyFinishedCleanTime) {
-              logger.info("Cleaning recently finished");
+              FlowRunnerManager.logger.info("Cleaning recently finished");
               cleanRecentlyFinished();
               this.lastRecentlyFinishedCleanTime = currentTime;
             }
 
             if (currentTime - OLD_PROJECT_DIR_INTERVAL_MS > this.lastOldProjectCleanTime
                 && FlowRunnerManager.this.isExecutorActive) {
-              logger.info("Cleaning old projects");
+              FlowRunnerManager.logger.info("Cleaning old projects");
               cleanProjectsOfOldVersion();
               this.lastOldProjectCleanTime = currentTime;
             }
 
             if (currentTime - EXECUTION_DIR_CLEAN_INTERVAL_MS > this.lastExecutionDirCleanTime) {
-              logger.info("Cleaning old execution dirs");
+              FlowRunnerManager.logger.info("Cleaning old execution dirs");
               cleanOlderExecutionDirs();
               this.lastExecutionDirCleanTime = currentTime;
             }
@@ -896,12 +896,13 @@ public class FlowRunnerManager implements EventListener,
             if (this.flowMaxRunningTimeInMins > 0
                 && currentTime - LONG_RUNNING_FLOW_KILLING_INTERVAL_MS
                 > this.lastLongRunningFlowCleanTime) {
-              logger.info(String.format("Killing long jobs running longer than %s mins",
-                  this.flowMaxRunningTimeInMins));
+              FlowRunnerManager.logger
+                  .info(String.format("Killing long jobs running longer than %s mins",
+                      this.flowMaxRunningTimeInMins));
               for (final FlowRunner flowRunner : FlowRunnerManager.this.runningFlows.values()) {
                 if (isFlowRunningLongerThan(flowRunner.getExecutableFlow(),
                     this.flowMaxRunningTimeInMins)) {
-                  logger.info(String
+                  FlowRunnerManager.logger.info(String
                       .format("Killing job [id: %s, status: %s]. It has been running for %s mins",
                           flowRunner.getExecutableFlow().getId(),
                           flowRunner.getExecutableFlow().getStatus(), TimeUnit.MILLISECONDS
@@ -913,11 +914,11 @@ public class FlowRunnerManager implements EventListener,
               this.lastLongRunningFlowCleanTime = currentTime;
             }
 
-            wait(RECENTLY_FINISHED_TIME_TO_LIVE);
+            wait(FlowRunnerManager.RECENTLY_FINISHED_TIME_TO_LIVE);
           } catch (final InterruptedException e) {
-            logger.info("Interrupted. Probably to shut down.");
+            FlowRunnerManager.logger.info("Interrupted. Probably to shut down.");
           } catch (final Throwable t) {
-            logger.warn(
+            FlowRunnerManager.logger.warn(
                 "Uncaught throwable, please look into why it is not caught", t);
           }
         }
@@ -940,7 +941,7 @@ public class FlowRunnerManager implements EventListener,
             continue;
           }
         } catch (final NumberFormatException e) {
-          logger.error("Can't delete exec dir " + exDir.getName()
+          FlowRunnerManager.logger.error("Can't delete exec dir " + exDir.getName()
               + " it is not a number");
           continue;
         }
@@ -949,7 +950,7 @@ public class FlowRunnerManager implements EventListener,
           try {
             FileUtils.deleteDirectory(exDir);
           } catch (final IOException e) {
-            logger.error("Error cleaning execution dir " + exDir.getPath(), e);
+            FlowRunnerManager.logger.error("Error cleaning execution dir " + exDir.getPath(), e);
           }
         }
       }
@@ -957,7 +958,7 @@ public class FlowRunnerManager implements EventListener,
 
     private void cleanRecentlyFinished() {
       final long cleanupThreshold =
-          System.currentTimeMillis() - RECENTLY_FINISHED_TIME_TO_LIVE;
+          System.currentTimeMillis() - FlowRunnerManager.RECENTLY_FINISHED_TIME_TO_LIVE;
       final ArrayList<Integer> executionToKill = new ArrayList<>();
       for (final ExecutableFlow flow : FlowRunnerManager.this.recentlyFinishedFlows.values()) {
         if (flow.getEndTime() < cleanupThreshold) {
@@ -966,7 +967,7 @@ public class FlowRunnerManager implements EventListener,
       }
 
       for (final Integer id : executionToKill) {
-        logger.info("Cleaning execution " + id
+        FlowRunnerManager.logger.info("Cleaning execution " + id
             + " from recently finished flows list.");
         FlowRunnerManager.this.recentlyFinishedFlows.remove(id);
       }
@@ -1000,13 +1001,13 @@ public class FlowRunnerManager implements EventListener,
           final ProjectVersion version = installedVersions.get(i);
           if (!isActiveProject(version)) {
             try {
-              logger.info("Removing old unused installed project "
+              FlowRunnerManager.logger.info("Removing old unused installed project "
                   + version.getProjectId() + ":" + version.getVersion());
-              deleteDirectory(version);
+              FlowRunnerManager.deleteDirectory(version);
               FlowRunnerManager.this.installedProjects.remove(new Pair<>(version
                   .getProjectId(), version.getVersion()));
             } catch (final IOException e) {
-              logger.error(e);
+              FlowRunnerManager.logger.error(e);
             }
           }
         }