Details
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index a85faf6..e781732 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -89,7 +89,6 @@ public class Constants {
// The flow exec id for a flow trigger instance unable to trigger a flow yet
public static final int FAILED_EXEC_ID = -2;
-
public static class ConfigurationKeys {
// Configures Azkaban Flow Version in project YAML file
diff --git a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
index 8ddecaa..c2e270e 100644
--- a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
@@ -18,6 +18,7 @@ package azkaban.utils;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
@@ -27,9 +28,11 @@ import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
@@ -69,6 +72,39 @@ public class FileIOUtils {
return true;
}
+
+ /**
+ * Dumps a number into a new file.
+ *
+ * @param filePath the target file
+ * @param num the number to dump
+ * @throws IOException if file already exists
+ */
+ public static void dumpNumberToFile(final Path filePath, final long num) throws IOException {
+ try (BufferedWriter writer = Files
+ .newBufferedWriter(filePath, StandardCharsets.UTF_8)) {
+ writer.write(String.valueOf(num));
+ } catch (final IOException e) {
+ logger.error(String.format("Failed to write the number %s to the file %s", num, filePath), e);
+ throw e;
+ }
+ }
+
+ /**
+ * Reads a number from a file.
+ *
+ * @param filePath the target file
+ */
+ public static long readNumberFromFile(final Path filePath)
+ throws IOException, NumberFormatException {
+ final List<String> allLines = Files.readAllLines(filePath);
+ if (!allLines.isEmpty()) {
+ return Long.parseLong(allLines.get(0));
+ } else {
+ throw new NumberFormatException("unable to parse empty file " + filePath.toString());
+ }
+ }
+
public static String getSourcePathFromClass(final Class<?> containedClass) {
File file =
new File(containedClass.getProtectionDomain().getCodeSource()
diff --git a/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
index 376b573..43565d9 100644
--- a/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
@@ -109,6 +109,31 @@ public class FileIOUtilsTest {
FileUtils.deleteDirectory(this.destDir);
}
+
+ private File dumpNumberToTempFile(final String fileName, final long num) throws IOException {
+ final File fileToDump = this.temp.newFile(fileName);
+ FileIOUtils.dumpNumberToFile(fileToDump.toPath(), num);
+ return fileToDump;
+ }
+
+ @Test
+ public void testDumpNumberToFileAndReadFromFile() throws IOException {
+ final String fileName = "number";
+ final long num = 94127;
+ final File fileToDump = dumpNumberToTempFile(fileName, num);
+ assertThat(FileIOUtils.readNumberFromFile(fileToDump.toPath())).isEqualTo(num);
+ }
+
+ @Test
+ public void testDumpNumberToExistingFile() throws IOException {
+ final String fileName = "number";
+ final long firstNum = 94127;
+ final long secondNum = 94128;
+ dumpNumberToTempFile(fileName, firstNum);
+ assertThatThrownBy(() -> dumpNumberToTempFile(fileName, secondNum))
+ .isInstanceOf(IOException.class).hasMessageContaining("already exists");
+ }
+
@Test
public void testHardlinkCopy() throws IOException {
FileIOUtils.createDeepHardlink(this.sourceDir, this.destDir);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 7dc82fe..f50f5c9 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -44,12 +44,12 @@ import azkaban.metric.MetricReportManager;
import azkaban.metric.inmemoryemitter.InMemoryMetricEmitter;
import azkaban.metrics.MetricsManager;
import azkaban.server.AzkabanServer;
+import azkaban.utils.FileIOUtils;
import azkaban.utils.Props;
import azkaban.utils.StdOutErrRedirect;
import azkaban.utils.Utils;
import com.google.inject.Guice;
import com.google.inject.Injector;
-import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -58,7 +58,6 @@ import java.lang.management.ManagementFactory;
import java.lang.reflect.Constructor;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
@@ -275,16 +274,9 @@ public class AzkabanExecutorServer {
private void dumpPortToFile() throws IOException {
// By default this should write to the working directory
- final String portFile = this.props
+ final String portFileName = this.props
.getString(Constants.AZKABAN_EXECUTOR_PORT_FILE, AZKABAN_EXECUTOR_PORT_FILENAME);
- try (BufferedWriter writer = Files
- .newBufferedWriter(Paths.get(portFile), StandardCharsets.UTF_8)) {
- writer.write(String.valueOf(getPort()));
- writer.write("\n");
- } catch (final IOException e) {
- logger.error("Failed to write the port number to a file", e);
- throw e;
- }
+ FileIOUtils.dumpNumberToFile(Paths.get(portFileName), getPort());
}
private void configureJobCallback(final Props props) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
index 243a93c..47010d5 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
@@ -31,7 +31,10 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.FileTime;
import java.util.Map;
import java.util.zip.ZipFile;
import org.apache.commons.io.FileUtils;
@@ -40,13 +43,13 @@ import org.apache.log4j.Logger;
public class FlowPreparer {
+ // Name of the file which keeps project directory size
+ static final String PROJECT_DIR_SIZE_FILE_NAME = "___azkaban_project_dir_size_in_bytes___";
private static final Logger log = Logger.getLogger(FlowPreparer.class);
-
// TODO spyne: move to config class
private final File executionsDir;
// TODO spyne: move to config class
private final File projectsDir;
-
private final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects;
private final StorageManager storageManager;
@@ -89,6 +92,21 @@ public class FlowPreparer {
}
/**
+ * Touch the file if it exists.
+ *
+ * @param path path to the target file
+ */
+ @VisibleForTesting
+ void touchIfExists(final Path path) {
+ try {
+ Files.setLastModifiedTime(path, FileTime.fromMillis(System.currentTimeMillis()));
+ } catch (final IOException ex) {
+ log.error(ex);
+ }
+ }
+
+
+ /**
* Prepare the project directory.
*
* @param pv ProjectVersion object
@@ -107,6 +125,8 @@ public class FlowPreparer {
// If directory exists. Assume its prepared and skip.
if (pv.getInstalledDir().exists()) {
log.info("Project already cached. Skipping download. " + pv);
+ touchIfExists(
+ Paths.get(pv.getInstalledDir().getPath(), PROJECT_DIR_SIZE_FILE_NAME));
return;
}
@@ -127,21 +147,36 @@ public class FlowPreparer {
final File zipFile = requireNonNull(projectFileHandler.getLocalFile());
final ZipFile zip = new ZipFile(zipFile);
Utils.unzip(zip, tempDir);
-
+ updateDirSize(tempDir, pv);
Files.move(tempDir.toPath(), pv.getInstalledDir().toPath(), StandardCopyOption.ATOMIC_MOVE);
-
- log.warn(String.format("Project Preparation complete. [%s]", pv));
+ log.warn(String.format("Project preparation completes. [%s]", pv));
} finally {
-
if (projectFileHandler != null) {
projectFileHandler.deleteLocalFile();
}
-
// Clean up: Remove tempDir if exists
FileUtils.deleteDirectory(tempDir);
}
}
+ /**
+ * Creates a file which keeps the size of {@param dir} in bytes inside the {@param dir} and sets
+ * the dirSize for {@param pv}.
+ *
+ * @param dir the directory whose size needs to be kept in the file to be created.
+ * @param pv the projectVersion whose size needs to updated.
+ */
+ private void updateDirSize(final File dir, final ProjectVersion pv) {
+ final long sizeInByte = FileUtils.sizeOfDirectory(dir);
+ pv.setDirSize(sizeInByte);
+ try {
+ FileIOUtils.dumpNumberToFile(Paths.get(dir.getPath(), PROJECT_DIR_SIZE_FILE_NAME),
+ sizeInByte);
+ } catch (final IOException e) {
+ log.error(e);
+ }
+ }
+
private void copyCreateHardlinkDirectory(final File projectDir, final File execDir)
throws IOException {
FileIOUtils.createDeepHardlink(projectDir, execDir);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
index e5c1407..a97ca35 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
@@ -27,6 +27,7 @@ public class ProjectVersion implements Comparable<ProjectVersion> {
private final int version;
private File installedDir;
+ private Long dirSize;
public ProjectVersion(final int projectId, final int version) {
checkArgument(projectId > 0);
@@ -41,6 +42,14 @@ public class ProjectVersion implements Comparable<ProjectVersion> {
this.installedDir = installedDir;
}
+ public Long getDirSize() {
+ return this.dirSize;
+ }
+
+ public void setDirSize(final Long dirSize) {
+ this.dirSize = dirSize;
+ }
+
public int getProjectId() {
return this.projectId;
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
index 84cd4e1..7af3573 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
@@ -17,15 +17,23 @@
package azkaban.execapp;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import azkaban.executor.ExecutableFlow;
import azkaban.project.ProjectFileHandler;
import azkaban.storage.StorageManager;
+import azkaban.utils.FileIOUtils;
import azkaban.utils.Pair;
import java.io.File;
+import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.FileUtils;
@@ -61,8 +69,9 @@ public class FlowPreparerTest {
final StorageManager storageManager = mock(StorageManager.class);
when(storageManager.getProjectFile(12, 34)).thenReturn(projectFileHandler);
- this.instance = new FlowPreparer(storageManager, this.executionsDir, this.projectsDir,
- this.installedProjects);
+ this.instance = spy(new FlowPreparer(storageManager, this.executionsDir, this.projectsDir,
+ this.installedProjects));
+ doNothing().when(this.instance).touchIfExists(any());
}
@After
@@ -77,11 +86,33 @@ public class FlowPreparerTest {
new File(this.projectsDir, "sample_project_01"));
this.instance.setupProject(pv);
+ final long actualDirSize = 259;
+
+ assertThat(pv.getDirSize()).isEqualTo(actualDirSize);
+ assertThat(FileIOUtils.readNumberFromFile(
+ Paths.get(pv.getInstalledDir().getPath(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME)))
+ .isEqualTo(actualDirSize);
assertTrue(pv.getInstalledDir().exists());
assertTrue(new File(pv.getInstalledDir(), "sample_flow_01").exists());
}
@Test
+ public void testSetupProjectTouchesTheDirSizeFile() throws Exception {
+ //verifies setup project touches project dir size file.
+ final ProjectVersion pv = new ProjectVersion(12, 34,
+ new File(this.projectsDir, "sample_project_01"));
+
+ //setup project 1st time will not do touch
+ this.instance.setupProject(pv);
+ verify(this.instance, never()).touchIfExists(
+ Paths.get(pv.getInstalledDir().getPath(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME));
+
+ this.instance.setupProject(pv);
+ verify(this.instance).touchIfExists(
+ Paths.get(pv.getInstalledDir().getPath(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME));
+ }
+
+ @Test
public void testSetupFlow() throws Exception {
final ExecutableFlow executableFlow = mock(ExecutableFlow.class);
when(executableFlow.getExecutionId()).thenReturn(12345);