azkaban-developers

1. make rename operation atomic (#831) 2. only active deployment

11/30/2016 12:28:26 AM
3.10.3

Details

diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
index 3d618d2..e91ac53 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
@@ -357,6 +357,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
       if (executor.isActive() != value) {
         executor.setActive(value);
         executorLoader.updateExecutor(executor);
+        flowRunnerManager.setActive(value);
       } else {
         logger.warn("Set active action ignored. Executor is already " + (value? "active" : "inactive"));
       }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index eed51ad..074d1b1 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.lang.Thread.State;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -46,6 +45,7 @@ import azkaban.execapp.event.RemoteFlowWatcher;
 import azkaban.execapp.metric.NumFailedFlowMetric;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
+import azkaban.executor.Executor;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.jobtype.JobTypeManager;
@@ -148,6 +148,9 @@ public class FlowRunnerManager implements EventListener,
   // date time of the the last flow submitted.
   private long lastFlowSubmittedDate = 0;
 
+  // whether the current executor is active
+  private volatile boolean isActive = false;
+
   public FlowRunnerManager(Props props, ExecutorLoader executorLoader,
       ProjectLoader projectLoader, ClassLoader parentClassLoader)
       throws IOException {
@@ -189,6 +192,8 @@ public class FlowRunnerManager implements EventListener,
     this.validateProxyUser =
         azkabanProps.getBoolean("proxy.user.lock.down", false);
 
+    setActive(true);
+
     cleanerThread = new CleanerThread();
     cleanerThread.start();
 
@@ -260,6 +265,10 @@ public class FlowRunnerManager implements EventListener,
     return allProjects;
   }
 
+  public void setActive(boolean isActive) {
+    this.isActive = isActive;
+  }
+
   public long getLastFlowSubmittedTime(){
     // Note: this is not thread safe and may result in providing dirty data.
     //       we will provide this data as is for now and will revisit if there
@@ -312,10 +321,9 @@ public class FlowRunnerManager implements EventListener,
               lastRecentlyFinishedCleanTime = currentTime;
             }
 
-            if (currentTime - OLD_PROJECT_DIR_INTERVAL_MS > lastOldProjectCleanTime) {
+            if (currentTime - OLD_PROJECT_DIR_INTERVAL_MS > lastOldProjectCleanTime && isActive) {
               logger.info("Cleaning old projects");
               cleanOlderProjects();
-
               lastOldProjectCleanTime = currentTime;
             }
 
@@ -577,7 +585,7 @@ public class FlowRunnerManager implements EventListener,
       projectVersion.setupProjectFiles(projectLoader, projectDirectory, logger);
       projectVersion.copyCreateSymlinkDirectory(execPath);
     } catch (Exception e) {
-      e.printStackTrace();
+      logger.error("Error in setting up project directory "+projectDirectory+", "+e);
       if (execPath.exists()) {
         try {
           FileUtils.deleteDirectory(execPath);
@@ -882,4 +890,4 @@ public class FlowRunnerManager implements EventListener,
     submittedFlows.remove(r);
   }
 
-}
\ No newline at end of file
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
index 0afc7f8..0dc967b 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
@@ -18,6 +18,8 @@ package azkaban.execapp;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
 import java.util.zip.ZipFile;
 
 import org.apache.commons.io.FileUtils;
@@ -78,8 +80,7 @@ public class ProjectVersion implements Comparable<ProjectVersion> {
           logger.info("Downloading zip file.");
           ZipFile zip = new ZipFile(projectFileHandler.getLocalFile());
           Utils.unzip(zip, tempDir);
-
-          tempDir.renameTo(installedDir);
+          Files.move(tempDir.toPath(), installedDir.toPath(), StandardCopyOption.ATOMIC_MOVE);
         } else {
           throw new IOException("The file type hasn't been decided yet.");
         }