azkaban-developers

Use hard link in execution dir to prevent flow failure where

2/9/2017 5:28:03 AM

Details

.travis.yml 2(+1 -1)

diff --git a/.travis.yml b/.travis.yml
index d1aa00a..bc5b2c5 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -3,4 +3,4 @@ languages: java
 jdk:
   - oraclejdk8
 sudo: false
-script: ./gradlew clean build
+script: ./gradlew clean build 
diff --git a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
index 813de2b..2154ae9 100644
--- a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
@@ -32,12 +32,15 @@ import java.util.Set;
 import java.util.StringTokenizer;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+
 
 /**
  * Runs a few unix commands. Created this so that I can move to JNI in the
  * future.
  */
 public class FileIOUtils {
+  private final static Logger logger = Logger.getLogger(EmailMessage.class);
 
   public static class PrefixSuffixFileFilter implements FileFilter {
     private String prefix;
@@ -84,9 +87,10 @@ public class FileIOUtils {
   }
 
   /**
-   * Run a unix command that will symlink files, and recurse into directories.
+   * Run a unix command that will hard link files and recurse into directories.
    */
-  public static void createDeepSymlink(File sourceDir, File destDir)
+
+  public static void createDeepHardlink(File sourceDir, File destDir)
       throws IOException {
     if (!sourceDir.exists()) {
       throw new IOException("Source directory " + sourceDir.getPath()
@@ -106,13 +110,17 @@ public class FileIOUtils {
       File sourceLink = new File(sourceDir, path);
       path = "." + path;
 
-      buffer.append("ln -s ").append(sourceLink.getAbsolutePath()).append("/*")
+      buffer.append("ln ").append(sourceLink.getAbsolutePath()).append("/*")
           .append(" ").append(path).append(";");
     }
 
-    String command = buffer.toString();
+    runShellCommand(buffer.toString(), destDir);
+  }
+
+  private static void runShellCommand(String command, File workingDir)
+      throws IOException {
     ProcessBuilder builder = new ProcessBuilder().command("sh", "-c", command);
-    builder.directory(destDir);
+    builder.directory(workingDir);
 
     // XXX what about stopping threads ??
     Process process = builder.start();
@@ -133,16 +141,15 @@ public class FileIOUtils {
 
           throw new IOException(errorMessage);
         }
-
-        // System.out.println(errorLogger.getLastMessages());
       } catch (InterruptedException e) {
-        e.printStackTrace();
+        logger.error(e);
       }
     } finally {
       IOUtils.closeQuietly(process.getInputStream());
       IOUtils.closeQuietly(process.getOutputStream());
       IOUtils.closeQuietly(process.getErrorStream());
     }
+
   }
 
   private static void createDirsFindFiles(File baseDir, File sourceDir,
diff --git a/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
index 7a4ccde..884580f 100644
--- a/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
@@ -16,16 +16,15 @@
 
 package azkaban.utils;
 
-import com.google.common.io.Resources;
-
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.net.URL;
-
+import java.util.Arrays;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.comparator.NameFileComparator;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
@@ -35,40 +34,60 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNotNull;
 
 public class FileIOUtilsTest {
+  private File sourceDir, destDir, baseDir;
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
 
-  private static File sourceDir;
-
-  private File destDir;
-
-  @BeforeClass
-  public static void createSourceDir() throws Exception {
-    URL resourceUrl = Resources.getResource("project/testjob");
-    assertNotNull(resourceUrl);
-    sourceDir = new File(resourceUrl.toURI());
-  }
-
   @Before
   public void setUp() throws Exception {
-    destDir = temp.newFolder("unixsymlink");
+    // setup base dir
+    baseDir = temp.newFolder("base");
+    File file1 = new File(baseDir.getAbsolutePath()+"/a.out");
+    File file2 = new File(baseDir.getAbsolutePath()+"/testdir");
+    File file3 = new File(file2.getAbsolutePath()+"/b.out");
+    file1.createNewFile();
+    file2.mkdir();
+    file3.createNewFile();
+
+
+    byte[] fileData = new byte[]{1,2,3};
+    FileOutputStream out = new FileOutputStream(file1);
+    out.write(fileData);
+    out.close();
+
+    fileData = new byte[]{2,3,4};
+    out = new FileOutputStream(file3);
+    out.write(fileData);
+    out.close();
+
+    sourceDir = temp.newFolder("src");
+    FileUtils.copyDirectory(baseDir, sourceDir);
+
+    // setup target dir
+    destDir = temp.newFolder("dest");
   }
 
   @After
   public void tearDown() throws Exception {
     temp.delete();
+    FileUtils.deleteDirectory(baseDir);
+    FileUtils.deleteDirectory(sourceDir);
+    FileUtils.deleteDirectory(destDir);
   }
 
   @Test
-  public void testSymlinkCopy() throws IOException {
-    FileIOUtils.createDeepSymlink(sourceDir, destDir);
+  public void testHardlinkCopy() throws IOException {
+    FileIOUtils.createDeepHardlink(sourceDir, destDir);
+    assertTrue(areDirsEqual(sourceDir, destDir, true));
+    FileUtils.deleteDirectory(destDir);
+    assertTrue(areDirsEqual(baseDir, sourceDir, true));
   }
 
   @Test
-  public void testSymlinkCopyNonSource() {
+  public void testHardlinkCopyNonSource() {
     boolean exception = false;
     try {
-      FileIOUtils.createDeepSymlink(new File(sourceDir, "idonotexist"), destDir);
+      FileIOUtils.createDeepHardlink(new File(sourceDir, "idonotexist"), destDir);
     } catch (IOException e) {
       System.out.println(e.getMessage());
       System.out.println("Handled this case nicely.");
@@ -78,6 +97,37 @@ public class FileIOUtilsTest {
     assertTrue(exception);
   }
 
+  private boolean areDirsEqualUtil(File file1, File file2, boolean isRoot, boolean ignoreRoot) throws IOException {
+    if(!file1.getName().equals(file2.getName())) {
+      if(!isRoot && ignoreRoot) return false;
+    }
+    if(file1.isDirectory() && file2.isDirectory()) {
+      if(file1.listFiles().length != file2.listFiles().length) {
+        return false;
+      }
+      File[] fileList1 = file1.listFiles(), fileList2 = file2.listFiles();
+      Arrays.sort(fileList1, NameFileComparator.NAME_COMPARATOR);
+      Arrays.sort(fileList2, NameFileComparator.NAME_COMPARATOR);
+
+      for(int i = 0; i < fileList1.length; i++) {
+        if(!areDirsEqualUtil(fileList1[i], fileList2[i], false, ignoreRoot)) {
+          return false;
+        }
+      }
+      return true;
+    }
+    else if(file1.isFile() && file2.isFile()) {
+      return file1.getName().equals(file2.getName()) && FileUtils.contentEquals(file1, file2);
+    }
+    else return false;
+  }
+
+
+  // check if two dirs are structurally same and contains files of same content
+  private boolean areDirsEqual(File file1, File file2, boolean ignoreRoot) throws IOException {
+    return areDirsEqualUtil(file1, file2, true, ignoreRoot);
+  }
+
   @Test
   public void testAsciiUTF() throws IOException {
     String foreignText = "abcdefghijklmnopqrstuvwxyz";
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 4491dcb..f5d4780 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -556,7 +556,7 @@ public class FlowRunnerManager implements EventListener,
 
     try {
       projectVersion.setupProjectFiles(projectLoader, projectDirectory, logger);
-      projectVersion.copyCreateSymlinkDirectory(execPath);
+      projectVersion.copyCreateHardlinkDirectory(execPath);
     } catch (Exception e) {
       logger.error("Error in setting up project directory "+projectDirectory+", "+e);
       if (execPath.exists()) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
index 0dc967b..a89a58e 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
@@ -92,7 +92,7 @@ public class ProjectVersion implements Comparable<ProjectVersion> {
     }
   }
 
-  public synchronized void copyCreateSymlinkDirectory(File executionDir)
+  public synchronized void copyCreateHardlinkDirectory(File executionDir)
       throws IOException {
     if (installedDir == null || !installedDir.exists()) {
       throw new IOException("Installed dir doesn't exist: "
@@ -101,7 +101,7 @@ public class ProjectVersion implements Comparable<ProjectVersion> {
       throw new IOException("Execution dir doesn't exist: "
           + ((executionDir == null) ? null : executionDir.getAbsolutePath()));
     }
-    FileIOUtils.createDeepSymlink(installedDir, executionDir);
+    FileIOUtils.createDeepHardlink(installedDir, executionDir);
   }
 
   public synchronized void deleteDirectory() throws IOException {