Details
diff --git a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
index 967e755..7290bad 100644
--- a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
@@ -141,7 +141,7 @@ public class FileIOUtils {
/**
* Hard link files and recurse into directories.
*/
- public static void createDeepHardlink(final File sourceDir, final File destDir)
+ public static int createDeepHardlink(final File sourceDir, final File destDir)
throws IOException {
if (!sourceDir.exists()) {
throw new IOException("Source directory " + sourceDir.getPath()
@@ -156,6 +156,7 @@ public class FileIOUtils {
final Set<String> paths = new HashSet<>();
createDirsFindFiles(sourceDir, sourceDir, destDir, paths);
+ int linkCount = 0;
for (String path : paths) {
final File sourceLink = new File(sourceDir, path);
path = destDir + path;
@@ -167,9 +168,11 @@ public class FileIOUtils {
// NOTE!! If modifying this, you must run this ignored test manually to validate:
// FileIOUtilsTest#testHardlinkCopyOfBigDir
Files.createLink(linkFile.toPath(), Paths.get(targetFile.getAbsolutePath()));
+ linkCount++;
}
}
}
+ return linkCount;
}
private static void createDirsFindFiles(final File baseDir, final File sourceDir,
diff --git a/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
index b6ceb19..9887004 100644
--- a/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
@@ -141,9 +141,10 @@ public class FileIOUtilsTest {
@Test
public void testHardlinkCopy() throws IOException {
- FileIOUtils.createDeepHardlink(this.sourceDir, this.destDir);
+ final int hardLinkCount = FileIOUtils.createDeepHardlink(this.sourceDir, this.destDir);
assertThat(areDirsEqual(this.sourceDir, this.destDir, true)).isTrue();
FileUtils.deleteDirectory(this.destDir);
+ assertThat(hardLinkCount).isEqualTo(5);
assertThat(areDirsEqual(this.baseDir, this.sourceDir, true)).isTrue();
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
index 74f7411..0e943c9 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
@@ -105,7 +105,23 @@ public class FlowPreparer {
}
pv.setFileCount((int) FileIOUtils.readNumberFromFile(path));
} catch (final IOException e) {
- log.error("error when dumping file count to file", e);
+ log.error("error when updating file count", e);
+ }
+ }
+
+
+ /**
+ * check if number of files inside the project dir equals to target
+ */
+ private boolean isFileCountEqual(final ProjectVersion pv, final int target) {
+ final int fileCount;
+ try {
+ final Path path = Paths.get(pv.getInstalledDir().getPath(), PROJECT_DIR_COUNT_FILE_NAME);
+ fileCount = (int) FileIOUtils.readNumberFromFile(path);
+ return fileCount == target;
+ } catch (final IOException e) {
+ log.error(e);
+ return false;
}
}
@@ -128,7 +144,14 @@ public class FlowPreparer {
execDir = createExecDir(flow);
// Create the symlinks from the project
- copyCreateHardlinkDirectory(projectVersion.getInstalledDir(), execDir);
+ final int linkCount = FileIOUtils
+ .createDeepHardlink(projectVersion.getInstalledDir(), execDir);
+
+ if (!isFileCountEqual(projectVersion, linkCount)) {
+ throw new Exception(String.format("File count check failed for execid: %d, project dir %s"
+ + " are being deleted when setting this execution up",
+ flow.getExecutionId(), projectVersion.getInstalledDir()));
+ }
log.info(String
.format("Flow Preparation complete. [execid: %d, path: %s]", flow.getExecutionId(),
@@ -220,10 +243,6 @@ public class FlowPreparer {
}
}
- private void copyCreateHardlinkDirectory(final File projectDir, final File execDir)
- throws IOException {
- FileIOUtils.createDeepHardlink(projectDir, execDir);
- }
private File createExecDir(final ExecutableFlow flow) {
final int execId = flow.getExecutionId();