azkaban-developers

Flow Preparer Refactor (#951) Changes: - Extracted setupFlow

3/27/2017 5:23:54 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
index 2154ae9..dbea348 100644
--- a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
@@ -40,7 +40,7 @@ import org.apache.log4j.Logger;
  * future.
  */
 public class FileIOUtils {
-  private final static Logger logger = Logger.getLogger(EmailMessage.class);
+  private final static Logger logger = Logger.getLogger(FileIOUtils.class);
 
   public static class PrefixSuffixFileFilter implements FileFilter {
     private String prefix;
diff --git a/azkaban-exec-server/build.gradle b/azkaban-exec-server/build.gradle
index 50feba0..6c2c40d 100644
--- a/azkaban-exec-server/build.gradle
+++ b/azkaban-exec-server/build.gradle
@@ -5,7 +5,7 @@ dependencies {
   compile('org.apache.kafka:kafka-log4j-appender:0.10.0.0')
   compile('com.googlecode.json-simple:json-simple:1.1.1')
 
-  testCompile('org.hamcrest:hamcrest-all:1.3')
+  testCompile(project(path: ':azkaban-common', configuration: 'testCompile'))
   testCompile(project(':azkaban-common').sourceSets.test.output)
   testCompile('com.google.guava:guava:13.0.1')
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
new file mode 100644
index 0000000..e2b137c
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.execapp;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.project.ProjectFileHandler;
+import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectManagerException;
+import azkaban.utils.FileIOUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Utils;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.Map;
+import java.util.zip.ZipFile;
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+
+import static com.google.common.base.Preconditions.*;
+import static java.util.Objects.*;
+
+
+public class FlowPreparer {
+  private static final Logger log = Logger.getLogger(FlowPreparer.class);
+
+  // TODO move to config class
+  private final File executionsDir;
+  // TODO move to config class
+  private final File projectsDir;
+
+  private final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects;
+  private final ProjectLoader projectLoader;
+
+  public FlowPreparer(ProjectLoader projectLoader,
+      File executionsDir,
+      File projectsDir,
+      Map<Pair<Integer, Integer>, ProjectVersion> installedProjects) {
+    this.projectLoader = projectLoader;
+    this.executionsDir = executionsDir;
+    this.projectsDir = projectsDir;
+    this.installedProjects = installedProjects;
+  }
+
+  /**
+   * Prepare the flow directory for execution.
+   *
+   * @param flow Executable Flow instance.
+   */
+  void setup(ExecutableFlow flow) {
+    File execDir = null;
+    try {
+      // First get the ProjectVersion
+      final ProjectVersion projectVersion = getProjectVersion(flow);
+
+      // Setup the project
+      setupProject(projectVersion);
+
+      // Create the execution directory
+      execDir = createExecDir(flow);
+
+      // Create the symlinks from the project
+      copyCreateHardlinkDirectory(projectVersion.getInstalledDir(), execDir);
+
+      log.info(String.format("Flow Preparation complete. [execid: %d, path: %s]",
+          flow.getExecutionId(), execDir.getPath()));
+    } catch (Exception e) {
+      log.error("Error in setting up project directory: " + projectsDir + ", Exception: " + e);
+      cleanup(execDir);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Prepare the project directory.
+   *
+   * @param pv ProjectVersion object
+   * @throws ProjectManagerException
+   * @throws IOException
+   */
+  @VisibleForTesting
+  void setupProject(final ProjectVersion pv)
+      throws ProjectManagerException, IOException {
+    final int projectId = pv.getProjectId();
+    final int version = pv.getVersion();
+
+    final String projectDir = String.valueOf(projectId) + "." + String.valueOf(version);
+    if (pv.getInstalledDir() == null) {
+      pv.setInstalledDir(new File(projectsDir, projectDir));
+    }
+
+    // If directory exists. Assume its prepared and skip.
+    if (pv.getInstalledDir().exists()) {
+      log.info("Project already cached. Skipping download. " + pv);
+      return;
+    }
+
+    log.info("Preparing Project: " + pv);
+
+    File tempDir = new File(projectsDir, "_temp." + projectDir + "." + System.currentTimeMillis());
+
+    // TODO Why mkdirs? This path should be already set up.
+    tempDir.mkdirs();
+
+    ProjectFileHandler projectFileHandler = null;
+    try {
+      projectFileHandler = requireNonNull(projectLoader.getUploadedFile(projectId, version));
+      checkState("zip".equals(projectFileHandler.getFileType()));
+
+      log.info("Downloading zip file.");
+      final File zipFile = requireNonNull(projectFileHandler.getLocalFile());
+      final ZipFile zip = new ZipFile(zipFile);
+      Utils.unzip(zip, tempDir);
+
+      Files.move(tempDir.toPath(), pv.getInstalledDir().toPath(), StandardCopyOption.ATOMIC_MOVE);
+
+      log.warn(String.format("Project Preparation complete. [%s]", pv));
+    } finally {
+
+      if (projectFileHandler != null) {
+        projectFileHandler.deleteLocalFile();
+      }
+
+      // Clean up: Remove tempDir if exists
+      FileUtils.deleteDirectory(tempDir);
+    }
+  }
+
+  private void copyCreateHardlinkDirectory(File projectDir, File execDir) throws IOException {
+    FileIOUtils.createDeepHardlink(projectDir, execDir);
+  }
+
+  private File createExecDir(ExecutableFlow flow) {
+    final int execId = flow.getExecutionId();
+    File execDir = new File(executionsDir, String.valueOf(execId));
+    flow.setExecutionPath(execDir.getPath());
+
+    // TODO Why mkdirs? This path should be already set up.
+    execDir.mkdirs();
+    return execDir;
+  }
+
+  private ProjectVersion getProjectVersion(ExecutableFlow flow) {
+    // We're setting up the installed projects. First time, it may take a while
+    // to set up.
+    final ProjectVersion projectVersion;
+    synchronized (installedProjects) {
+      projectVersion = installedProjects.computeIfAbsent(new Pair<>(flow.getProjectId(), flow.getVersion()),
+          k -> new ProjectVersion(flow.getProjectId(), flow.getVersion()));
+    }
+    return projectVersion;
+  }
+
+  private void cleanup(File execDir) {
+    if (execDir != null) {
+      try {
+        FileUtils.deleteDirectory(execDir);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index f5d4780..9a77d21 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -112,6 +112,7 @@ public class FlowRunnerManager implements EventListener,
   private final ExecutorLoader executorLoader;
   private final ProjectLoader projectLoader;
   private final JobTypeManager jobtypeManager;
+  private final FlowPreparer flowPreparer;
 
   private final Props azkabanProps;
   private final File executionDirectory;
@@ -165,6 +166,9 @@ public class FlowRunnerManager implements EventListener,
     numJobThreadPerFlow = props.getInt(FLOW_NUM_JOB_THREADS, DEFAULT_FLOW_NUM_JOB_TREADS);
     executorService = createExecutorService(numThreads);
 
+    // Create a flow preparer
+    flowPreparer = new FlowPreparer(projectLoader, executionDirectory, projectDirectory, installedProjects);
+
     this.executorLoader = executorLoader;
     this.projectLoader = projectLoader;
 
@@ -413,11 +417,11 @@ public class FlowRunnerManager implements EventListener,
             try {
               logger.info("Removing old unused installed project "
                   + version.getProjectId() + ":" + version.getVersion());
-              version.deleteDirectory();
+              deleteDirectory(version);
               installedProjects.remove(new Pair<Integer, Integer>(version
                   .getProjectId(), version.getVersion()));
             } catch (IOException e) {
-              e.printStackTrace();
+              logger.error(e);
             }
 
             installedVersions.remove(versionKey);
@@ -427,6 +431,16 @@ public class FlowRunnerManager implements EventListener,
     }
   }
 
+  public void deleteDirectory(ProjectVersion pv) throws IOException {
+    synchronized (pv) {
+      logger.warn("Deleting project: " + pv);
+      final File installedDir = pv.getInstalledDir();
+      if (installedDir != null && installedDir.exists()) {
+        FileUtils.deleteDirectory(installedDir);
+      }
+    }
+  }
+
   public void submitFlow(int execId) throws ExecutorManagerException {
     // Load file and submit
     if (runningFlows.containsKey(execId)) {
@@ -442,7 +456,7 @@ public class FlowRunnerManager implements EventListener,
     }
 
     // Sets up the project files and execution directory.
-    setupFlow(flow);
+    flowPreparer.setup(flow);
 
     // Setup flow runner
     FlowWatcher watcher = null;
@@ -530,46 +544,6 @@ public class FlowRunnerManager implements EventListener,
 
   }
 
-  private void setupFlow(ExecutableFlow flow) throws ExecutorManagerException {
-    int execId = flow.getExecutionId();
-    File execPath = new File(executionDirectory, String.valueOf(execId));
-    flow.setExecutionPath(execPath.getPath());
-    logger
-        .info("Flow " + execId + " submitted with path " + execPath.getPath());
-    execPath.mkdirs();
-
-    // We're setting up the installed projects. First time, it may take a while
-    // to set up.
-    Pair<Integer, Integer> projectVersionKey =
-        new Pair<Integer, Integer>(flow.getProjectId(), flow.getVersion());
-
-    // We set up project versions this way
-    ProjectVersion projectVersion = null;
-    synchronized (installedProjects) {
-      projectVersion = installedProjects.get(projectVersionKey);
-      if (projectVersion == null) {
-        projectVersion =
-            new ProjectVersion(flow.getProjectId(), flow.getVersion());
-        installedProjects.put(projectVersionKey, projectVersion);
-      }
-    }
-
-    try {
-      projectVersion.setupProjectFiles(projectLoader, projectDirectory, logger);
-      projectVersion.copyCreateHardlinkDirectory(execPath);
-    } catch (Exception e) {
-      logger.error("Error in setting up project directory "+projectDirectory+", "+e);
-      if (execPath.exists()) {
-        try {
-          FileUtils.deleteDirectory(execPath);
-        } catch (IOException e1) {
-          e1.printStackTrace();
-        }
-      }
-      throw new ExecutorManagerException(e);
-    }
-  }
-
   public void cancelFlow(int execId, String user)
       throws ExecutorManagerException {
     FlowRunner runner = runningFlows.get(execId);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
index a89a58e..a52d6e9 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
@@ -16,34 +16,29 @@
 
 package azkaban.execapp;
 
+import com.google.common.base.Preconditions;
 import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.StandardCopyOption;
-import java.util.zip.ZipFile;
-
-import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
 
-import azkaban.project.ProjectFileHandler;
-import azkaban.project.ProjectLoader;
-import azkaban.project.ProjectManagerException;
-import azkaban.utils.FileIOUtils;
-import azkaban.utils.Utils;
+import static com.google.common.base.Preconditions.*;
+
 
 public class ProjectVersion implements Comparable<ProjectVersion> {
   private final int projectId;
   private final int version;
+
   private File installedDir;
 
   public ProjectVersion(int projectId, int version) {
+    checkArgument(projectId > 0);
+    checkArgument(version > 0);
+
     this.projectId = projectId;
     this.version = version;
   }
 
   public ProjectVersion(int projectId, int version, File installedDir) {
-    this.projectId = projectId;
-    this.version = version;
+    this(projectId, version);
     this.installedDir = installedDir;
   }
 
@@ -55,60 +50,12 @@ public class ProjectVersion implements Comparable<ProjectVersion> {
     return version;
   }
 
-  public synchronized void setupProjectFiles(ProjectLoader projectLoader,
-      File projectDir, Logger logger) throws ProjectManagerException,
-      IOException {
-    String projectVersion =
-        String.valueOf(projectId) + "." + String.valueOf(version);
-    if (installedDir == null) {
-      installedDir = new File(projectDir, projectVersion);
-    }
-
-    if (!installedDir.exists()) {
-
-      logger.info("First time executing new project. Setting up in directory "
-          + installedDir.getPath());
-
-      File tempDir =
-          new File(projectDir, "_temp." + projectVersion + "."
-              + System.currentTimeMillis());
-      tempDir.mkdirs();
-      ProjectFileHandler projectFileHandler = null;
-      try {
-        projectFileHandler = projectLoader.getUploadedFile(projectId, version);
-        if ("zip".equals(projectFileHandler.getFileType())) {
-          logger.info("Downloading zip file.");
-          ZipFile zip = new ZipFile(projectFileHandler.getLocalFile());
-          Utils.unzip(zip, tempDir);
-          Files.move(tempDir.toPath(), installedDir.toPath(), StandardCopyOption.ATOMIC_MOVE);
-        } else {
-          throw new IOException("The file type hasn't been decided yet.");
-        }
-      } finally {
-        if (projectFileHandler != null) {
-          projectFileHandler.deleteLocalFile();
-        }
-      }
-    }
+  public File getInstalledDir() {
+    return installedDir;
   }
 
-  public synchronized void copyCreateHardlinkDirectory(File executionDir)
-      throws IOException {
-    if (installedDir == null || !installedDir.exists()) {
-      throw new IOException("Installed dir doesn't exist: "
-          + ((installedDir == null) ? null : installedDir.getAbsolutePath()));
-    } else if (executionDir == null || !executionDir.exists()) {
-      throw new IOException("Execution dir doesn't exist: "
-          + ((executionDir == null) ? null : executionDir.getAbsolutePath()));
-    }
-    FileIOUtils.createDeepHardlink(installedDir, executionDir);
-  }
-
-  public synchronized void deleteDirectory() throws IOException {
-    System.out.println("Deleting old unused project versin " + installedDir);
-    if (installedDir != null && installedDir.exists()) {
-      FileUtils.deleteDirectory(installedDir);
-    }
+  public void setInstalledDir(File installedDir) {
+    this.installedDir = installedDir;
   }
 
   @Override
@@ -119,4 +66,10 @@ public class ProjectVersion implements Comparable<ProjectVersion> {
 
     return projectId - o.projectId;
   }
+
+  @Override
+  public String toString() {
+    return "ProjectVersion{" + "projectId=" + projectId + ", version=" + version + ", installedDir=" + installedDir
+        + '}';
+  }
 }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
new file mode 100644
index 0000000..495d496
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.execapp;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.project.ProjectFileHandler;
+import azkaban.project.ProjectLoader;
+import azkaban.utils.Pair;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+public class FlowPreparerTest {
+  public static final String SAMPLE_FLOW_01 = "sample_flow_01";
+
+  final File executionsDir = new File("executions");
+  final File projectsDir = new File("projects");
+  final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects = new HashMap<>();
+
+  private FlowPreparer instance;
+
+  @Before
+  public void setUp() throws Exception {
+    tearDown();
+
+    executionsDir.mkdirs();
+    projectsDir.mkdirs();
+
+
+    ClassLoader classLoader = getClass().getClassLoader();
+    File file = new File(classLoader.getResource(SAMPLE_FLOW_01 + ".zip").getFile());
+
+    ProjectFileHandler projectFileHandler = mock(ProjectFileHandler.class);
+    when(projectFileHandler.getFileType()).thenReturn("zip");
+    when(projectFileHandler.getLocalFile()).thenReturn(file);
+
+    ProjectLoader projectLoader = mock(ProjectLoader.class);
+    when(projectLoader.getUploadedFile(12, 34)).thenReturn(projectFileHandler);
+
+    instance = new FlowPreparer(projectLoader, executionsDir, projectsDir, installedProjects);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    FileUtils.deleteDirectory(executionsDir);
+    FileUtils.deleteDirectory(projectsDir);
+  }
+
+  @Test
+  public void testSetupProject() throws Exception {
+    ProjectVersion pv = new ProjectVersion(12, 34, new File(projectsDir, "sample_project_01"));
+    instance.setupProject(pv);
+
+    assertTrue(pv.getInstalledDir().exists());
+    assertTrue(new File(pv.getInstalledDir(), "sample_flow_01").exists());
+  }
+
+  @Test
+  public void testSetupFlow() throws Exception {
+    ExecutableFlow executableFlow = mock(ExecutableFlow.class);
+    when(executableFlow.getExecutionId()).thenReturn(12345);
+    when(executableFlow.getProjectId()).thenReturn(12);
+    when(executableFlow.getVersion()).thenReturn(34);
+
+    instance.setup(executableFlow);
+    File execDir = new File(executionsDir, "12345");
+    assertTrue(execDir.exists());
+    assertTrue(new File(execDir, SAMPLE_FLOW_01).exists());
+  }
+}
diff --git a/azkaban-exec-server/src/test/resources/sample_flow_01.zip b/azkaban-exec-server/src/test/resources/sample_flow_01.zip
new file mode 100644
index 0000000..1147976
Binary files /dev/null and b/azkaban-exec-server/src/test/resources/sample_flow_01.zip differ
diff --git a/azkaban-web-server/build.gradle b/azkaban-web-server/build.gradle
index 113c35f..98c1958 100644
--- a/azkaban-web-server/build.gradle
+++ b/azkaban-web-server/build.gradle
@@ -54,9 +54,9 @@ dependencies {
   compile('com.linkedin.pegasus:restli-server:' + pegasusVersion)
   compile('org.apache.velocity:velocity-tools:2.0')
 
+  testCompile(project(path: ':azkaban-common', configuration: 'testCompile'))
   testCompile('commons-collections:commons-collections:3.2.2')
   testCompile('org.hamcrest:hamcrest-all:1.3')
-  testCompile('org.mockito:mockito-all:1.10.19')
 
   //AZ web module tests need to access classes defined in azkaban-common test module
   testCompile project(':azkaban-common').sourceSets.test.output