diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
index 3d618d2..e91ac53 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
@@ -357,6 +357,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
if (executor.isActive() != value) {
executor.setActive(value);
executorLoader.updateExecutor(executor);
+ flowRunnerManager.setActive(value);
} else {
logger.warn("Set active action ignored. Executor is already " + (value? "active" : "inactive"));
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index eed51ad..074d1b1 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -46,6 +45,7 @@ import azkaban.execapp.event.RemoteFlowWatcher;
import azkaban.execapp.metric.NumFailedFlowMetric;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
+import azkaban.executor.Executor;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.jobtype.JobTypeManager;
@@ -148,6 +148,9 @@ public class FlowRunnerManager implements EventListener,
// date time of the the last flow submitted.
private long lastFlowSubmittedDate = 0;
+ // whether the current executor is active
+ private volatile boolean isActive = false;
+
public FlowRunnerManager(Props props, ExecutorLoader executorLoader,
ProjectLoader projectLoader, ClassLoader parentClassLoader)
throws IOException {
@@ -189,6 +192,8 @@ public class FlowRunnerManager implements EventListener,
this.validateProxyUser =
azkabanProps.getBoolean("proxy.user.lock.down", false);
+ setActive(true);
+
cleanerThread = new CleanerThread();
cleanerThread.start();
@@ -260,6 +265,10 @@ public class FlowRunnerManager implements EventListener,
return allProjects;
}
+ public void setActive(boolean isActive) {
+ this.isActive = isActive;
+ }
+
public long getLastFlowSubmittedTime(){
// Note: this is not thread safe and may result in providing dirty data.
// we will provide this data as is for now and will revisit if there
@@ -312,10 +321,9 @@ public class FlowRunnerManager implements EventListener,
lastRecentlyFinishedCleanTime = currentTime;
}
- if (currentTime - OLD_PROJECT_DIR_INTERVAL_MS > lastOldProjectCleanTime) {
+ if (currentTime - OLD_PROJECT_DIR_INTERVAL_MS > lastOldProjectCleanTime && isActive) {
logger.info("Cleaning old projects");
cleanOlderProjects();
-
lastOldProjectCleanTime = currentTime;
}
@@ -577,7 +585,7 @@ public class FlowRunnerManager implements EventListener,
projectVersion.setupProjectFiles(projectLoader, projectDirectory, logger);
projectVersion.copyCreateSymlinkDirectory(execPath);
} catch (Exception e) {
- e.printStackTrace();
+ logger.error("Error in setting up project directory "+projectDirectory+", "+e);
if (execPath.exists()) {
try {
FileUtils.deleteDirectory(execPath);
@@ -882,4 +890,4 @@ public class FlowRunnerManager implements EventListener,
submittedFlows.remove(r);
}
-}
\ No newline at end of file
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
index 0afc7f8..0dc967b 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
@@ -18,6 +18,8 @@ package azkaban.execapp;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
import java.util.zip.ZipFile;
import org.apache.commons.io.FileUtils;
@@ -78,8 +80,7 @@ public class ProjectVersion implements Comparable<ProjectVersion> {
logger.info("Downloading zip file.");
ZipFile zip = new ZipFile(projectFileHandler.getLocalFile());
Utils.unzip(zip, tempDir);
-
- tempDir.renameTo(installedDir);
+ Files.move(tempDir.toPath(), installedDir.toPath(), StandardCopyOption.ATOMIC_MOVE);
} else {
throw new IOException("The file type hasn't been decided yet.");
}