azkaban-aplcache

Details

diff --git a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
index 967e755..7290bad 100644
--- a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
@@ -141,7 +141,7 @@ public class FileIOUtils {
   /**
    * Hard link files and recurse into directories.
    */
-  public static void createDeepHardlink(final File sourceDir, final File destDir)
+  public static int createDeepHardlink(final File sourceDir, final File destDir)
       throws IOException {
     if (!sourceDir.exists()) {
       throw new IOException("Source directory " + sourceDir.getPath()
@@ -156,6 +156,7 @@ public class FileIOUtils {
     final Set<String> paths = new HashSet<>();
     createDirsFindFiles(sourceDir, sourceDir, destDir, paths);
 
+    int linkCount = 0;
     for (String path : paths) {
       final File sourceLink = new File(sourceDir, path);
       path = destDir + path;
@@ -167,9 +168,11 @@ public class FileIOUtils {
           // NOTE!! If modifying this, you must run this ignored test manually to validate:
           // FileIOUtilsTest#testHardlinkCopyOfBigDir
           Files.createLink(linkFile.toPath(), Paths.get(targetFile.getAbsolutePath()));
+          linkCount++;
         }
       }
     }
+    return linkCount;
   }
 
   private static void createDirsFindFiles(final File baseDir, final File sourceDir,
diff --git a/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
index b6ceb19..9887004 100644
--- a/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
@@ -141,9 +141,10 @@ public class FileIOUtilsTest {
 
   @Test
   public void testHardlinkCopy() throws IOException {
-    FileIOUtils.createDeepHardlink(this.sourceDir, this.destDir);
+    final int hardLinkCount = FileIOUtils.createDeepHardlink(this.sourceDir, this.destDir);
     assertThat(areDirsEqual(this.sourceDir, this.destDir, true)).isTrue();
     FileUtils.deleteDirectory(this.destDir);
+    assertThat(hardLinkCount).isEqualTo(5);
     assertThat(areDirsEqual(this.baseDir, this.sourceDir, true)).isTrue();
   }
 
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
index 74f7411..0e943c9 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
@@ -105,7 +105,23 @@ public class FlowPreparer {
       }
       pv.setFileCount((int) FileIOUtils.readNumberFromFile(path));
     } catch (final IOException e) {
-      log.error("error when dumping file count to file", e);
+      log.error("error when updating file count", e);
+    }
+  }
+
+
+  /**
+   * check if number of files inside the project dir equals to target
+   */
+  private boolean isFileCountEqual(final ProjectVersion pv, final int target) {
+    final int fileCount;
+    try {
+      final Path path = Paths.get(pv.getInstalledDir().getPath(), PROJECT_DIR_COUNT_FILE_NAME);
+      fileCount = (int) FileIOUtils.readNumberFromFile(path);
+      return fileCount == target;
+    } catch (final IOException e) {
+      log.error(e);
+      return false;
     }
   }
 
@@ -128,7 +144,14 @@ public class FlowPreparer {
       execDir = createExecDir(flow);
 
       // Create the symlinks from the project
-      copyCreateHardlinkDirectory(projectVersion.getInstalledDir(), execDir);
+      final int linkCount = FileIOUtils
+          .createDeepHardlink(projectVersion.getInstalledDir(), execDir);
+
+      if (!isFileCountEqual(projectVersion, linkCount)) {
+        throw new Exception(String.format("File count check failed for execid: %d, project dir %s"
+                + " are being deleted when setting this execution up",
+            flow.getExecutionId(), projectVersion.getInstalledDir()));
+      }
 
       log.info(String
           .format("Flow Preparation complete. [execid: %d, path: %s]", flow.getExecutionId(),
@@ -220,10 +243,6 @@ public class FlowPreparer {
     }
   }
 
-  private void copyCreateHardlinkDirectory(final File projectDir, final File execDir)
-      throws IOException {
-    FileIOUtils.createDeepHardlink(projectDir, execDir);
-  }
 
   private File createExecDir(final ExecutableFlow flow) {
     final int execId = flow.getExecutionId();