diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 42091ae..3cb2311 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -77,8 +77,8 @@ import org.apache.log4j.Logger;
/**
* Execution manager for the server side execution.
*
- * When a flow is submitted to FlowRunnerManager, it is the {@link Status.PREPARING} status. When a
- * flow is about to be executed by FlowRunner, its status is updated to {@link Status.RUNNING}
+ * When a flow is submitted to FlowRunnerManager, it is the {@link Status#PREPARING} status. When a
+ * flow is about to be executed by FlowRunner, its status is updated to {@link Status#RUNNING}
*
* Two main data structures are used in this class to maintain flows.
*
@@ -129,17 +129,17 @@ public class FlowRunnerManager implements EventListener,
private final Object executionDirDeletionSync = new Object();
private Map<Pair<Integer, Integer>, ProjectVersion> installedProjects;
- private int numThreads = DEFAULT_NUM_EXECUTING_FLOWS;
+ private final int numThreads;
private int threadPoolQueueSize = -1;
- private int numJobThreadPerFlow = DEFAULT_FLOW_NUM_JOB_TREADS;
+ private final int numJobThreadPerFlow;
private Props globalProps;
private long lastCleanerThreadCheckTime = -1;
private long executionDirRetention = 1 * 24 * 60 * 60 * 1000; // 1 Day
// We want to limit the log sizes to about 20 megs
- private String jobLogChunkSize = "5MB";
- private int jobLogNumFiles = 4;
+ private final String jobLogChunkSize;
+ private final int jobLogNumFiles;
// If true, jobs will validate proxy user against a list of valid proxy users.
- private boolean validateProxyUser = false;
+ private final boolean validateProxyUser;
// date time of the the last flow submitted.
private long lastFlowSubmittedDate = 0;
// whether the current executor is active
@@ -870,25 +870,25 @@ public class FlowRunnerManager implements EventListener,
synchronized (this) {
try {
FlowRunnerManager.this.lastCleanerThreadCheckTime = System.currentTimeMillis();
- logger.info("# of executing flows: " + getNumRunningFlows());
+ FlowRunnerManager.logger.info("# of executing flows: " + getNumRunningFlows());
// Cleanup old stuff.
final long currentTime = System.currentTimeMillis();
if (currentTime - RECENTLY_FINISHED_INTERVAL_MS > this.lastRecentlyFinishedCleanTime) {
- logger.info("Cleaning recently finished");
+ FlowRunnerManager.logger.info("Cleaning recently finished");
cleanRecentlyFinished();
this.lastRecentlyFinishedCleanTime = currentTime;
}
if (currentTime - OLD_PROJECT_DIR_INTERVAL_MS > this.lastOldProjectCleanTime
&& FlowRunnerManager.this.isExecutorActive) {
- logger.info("Cleaning old projects");
+ FlowRunnerManager.logger.info("Cleaning old projects");
cleanProjectsOfOldVersion();
this.lastOldProjectCleanTime = currentTime;
}
if (currentTime - EXECUTION_DIR_CLEAN_INTERVAL_MS > this.lastExecutionDirCleanTime) {
- logger.info("Cleaning old execution dirs");
+ FlowRunnerManager.logger.info("Cleaning old execution dirs");
cleanOlderExecutionDirs();
this.lastExecutionDirCleanTime = currentTime;
}
@@ -896,12 +896,13 @@ public class FlowRunnerManager implements EventListener,
if (this.flowMaxRunningTimeInMins > 0
&& currentTime - LONG_RUNNING_FLOW_KILLING_INTERVAL_MS
> this.lastLongRunningFlowCleanTime) {
- logger.info(String.format("Killing long jobs running longer than %s mins",
- this.flowMaxRunningTimeInMins));
+ FlowRunnerManager.logger
+ .info(String.format("Killing long jobs running longer than %s mins",
+ this.flowMaxRunningTimeInMins));
for (final FlowRunner flowRunner : FlowRunnerManager.this.runningFlows.values()) {
if (isFlowRunningLongerThan(flowRunner.getExecutableFlow(),
this.flowMaxRunningTimeInMins)) {
- logger.info(String
+ FlowRunnerManager.logger.info(String
.format("Killing job [id: %s, status: %s]. It has been running for %s mins",
flowRunner.getExecutableFlow().getId(),
flowRunner.getExecutableFlow().getStatus(), TimeUnit.MILLISECONDS
@@ -913,11 +914,11 @@ public class FlowRunnerManager implements EventListener,
this.lastLongRunningFlowCleanTime = currentTime;
}
- wait(RECENTLY_FINISHED_TIME_TO_LIVE);
+ wait(FlowRunnerManager.RECENTLY_FINISHED_TIME_TO_LIVE);
} catch (final InterruptedException e) {
- logger.info("Interrupted. Probably to shut down.");
+ FlowRunnerManager.logger.info("Interrupted. Probably to shut down.");
} catch (final Throwable t) {
- logger.warn(
+ FlowRunnerManager.logger.warn(
"Uncaught throwable, please look into why it is not caught", t);
}
}
@@ -940,7 +941,7 @@ public class FlowRunnerManager implements EventListener,
continue;
}
} catch (final NumberFormatException e) {
- logger.error("Can't delete exec dir " + exDir.getName()
+ FlowRunnerManager.logger.error("Can't delete exec dir " + exDir.getName()
+ " it is not a number");
continue;
}
@@ -949,7 +950,7 @@ public class FlowRunnerManager implements EventListener,
try {
FileUtils.deleteDirectory(exDir);
} catch (final IOException e) {
- logger.error("Error cleaning execution dir " + exDir.getPath(), e);
+ FlowRunnerManager.logger.error("Error cleaning execution dir " + exDir.getPath(), e);
}
}
}
@@ -957,7 +958,7 @@ public class FlowRunnerManager implements EventListener,
private void cleanRecentlyFinished() {
final long cleanupThreshold =
- System.currentTimeMillis() - RECENTLY_FINISHED_TIME_TO_LIVE;
+ System.currentTimeMillis() - FlowRunnerManager.RECENTLY_FINISHED_TIME_TO_LIVE;
final ArrayList<Integer> executionToKill = new ArrayList<>();
for (final ExecutableFlow flow : FlowRunnerManager.this.recentlyFinishedFlows.values()) {
if (flow.getEndTime() < cleanupThreshold) {
@@ -966,7 +967,7 @@ public class FlowRunnerManager implements EventListener,
}
for (final Integer id : executionToKill) {
- logger.info("Cleaning execution " + id
+ FlowRunnerManager.logger.info("Cleaning execution " + id
+ " from recently finished flows list.");
FlowRunnerManager.this.recentlyFinishedFlows.remove(id);
}
@@ -1000,13 +1001,13 @@ public class FlowRunnerManager implements EventListener,
final ProjectVersion version = installedVersions.get(i);
if (!isActiveProject(version)) {
try {
- logger.info("Removing old unused installed project "
+ FlowRunnerManager.logger.info("Removing old unused installed project "
+ version.getProjectId() + ":" + version.getVersion());
- deleteDirectory(version);
+ FlowRunnerManager.deleteDirectory(version);
FlowRunnerManager.this.installedProjects.remove(new Pair<>(version
.getProjectId(), version.getVersion()));
} catch (final IOException e) {
- logger.error(e);
+ FlowRunnerManager.logger.error(e);
}
}
}