azkaban-aplcache
Changes
src/main/java/azkaban/utils/FileIOUtils.java 39(+23 -16)
src/main/java/azkaban/utils/Utils.java 43(+29 -14)
Details
diff --git a/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index f7c5c40..cb8e00e 100644
--- a/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -768,37 +768,40 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
int pos = 0;
int length = buffer.length;
int startByte = 0;
- BufferedInputStream bufferedStream = null;
try {
for (int i = 0; i < files.length; ++i) {
File file = files[i];
- bufferedStream = new BufferedInputStream(new FileInputStream(file));
- int size = bufferedStream.read(buffer, pos, length);
- while (size >= 0) {
- if (pos + size == buffer.length) {
- // Flush here.
- uploadLogPart(
- connection,
- execId,
- name,
- attempt,
- startByte,
- startByte + buffer.length,
- encType,
- buffer,
- buffer.length);
-
- pos = 0;
- length = buffer.length;
- startByte += buffer.length;
- }
- else {
- // Usually end of file.
- pos += size;
- length = buffer.length - pos;
+ BufferedInputStream bufferedStream = new BufferedInputStream(new FileInputStream(file));
+ try {
+ int size = bufferedStream.read(buffer, pos, length);
+ while (size >= 0) {
+ if (pos + size == buffer.length) {
+ // Flush here.
+ uploadLogPart(
+ connection,
+ execId,
+ name,
+ attempt,
+ startByte,
+ startByte + buffer.length,
+ encType,
+ buffer,
+ buffer.length);
+
+ pos = 0;
+ length = buffer.length;
+ startByte += buffer.length;
+ }
+ else {
+ // Usually end of file.
+ pos += size;
+ length = buffer.length - pos;
+ }
+ size = bufferedStream.read(buffer, pos, length);
}
- size = bufferedStream.read(buffer, pos, length);
+ } finally {
+ IOUtils.closeQuietly(bufferedStream);
}
}
@@ -822,9 +825,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
catch (IOException e) {
throw new ExecutorManagerException("Error chunking", e);
}
- finally {
- IOUtils.closeQuietly(bufferedStream);
- }
}
private void uploadLogPart(
diff --git a/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java b/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
index 2e5de64..01576d4 100644
--- a/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
+++ b/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.IOUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -72,35 +73,41 @@ public class AzkabanProcess {
builder.directory(new File(workingDir));
builder.environment().putAll(env);
this.process = builder.start();
- this.processId = processId(process);
- if (processId == 0) {
- logger.debug("Spawned thread with unknown process id");
- } else {
- logger.debug("Spawned thread with process id " + processId);
- }
-
- this.startupLatch.countDown();
-
- LogGobbler outputGobbler = new LogGobbler(new InputStreamReader(process.getInputStream()), logger, Level.INFO, 30);
- LogGobbler errorGobbler = new LogGobbler(new InputStreamReader(process.getErrorStream()), logger, Level.ERROR, 30);
-
- outputGobbler.start();
- errorGobbler.start();
- int exitCode = -1;
try {
- exitCode = process.waitFor();
- } catch (InterruptedException e) {
- logger.info("Process interrupted. Exit code is " + exitCode, e);
- }
-
- completeLatch.countDown();
- if (exitCode != 0) {
- throw new ProcessFailureException(exitCode, errorGobbler.getRecentLog());
+ this.processId = processId(process);
+ if (processId == 0) {
+ logger.debug("Spawned thread with unknown process id");
+ } else {
+ logger.debug("Spawned thread with process id " + processId);
+ }
+
+ this.startupLatch.countDown();
+
+ LogGobbler outputGobbler = new LogGobbler(new InputStreamReader(process.getInputStream()), logger, Level.INFO, 30);
+ LogGobbler errorGobbler = new LogGobbler(new InputStreamReader(process.getErrorStream()), logger, Level.ERROR, 30);
+
+ outputGobbler.start();
+ errorGobbler.start();
+ int exitCode = -1;
+ try {
+ exitCode = process.waitFor();
+ } catch (InterruptedException e) {
+ logger.info("Process interrupted. Exit code is " + exitCode, e);
+ }
+
+ completeLatch.countDown();
+ if (exitCode != 0) {
+ throw new ProcessFailureException(exitCode, errorGobbler.getRecentLog());
+ }
+
+ // try to wait for everything to get logged out before exiting
+ outputGobbler.awaitCompletion(5000);
+ errorGobbler.awaitCompletion(5000);
+ } finally {
+ IOUtils.closeQuietly(process.getInputStream());
+ IOUtils.closeQuietly(process.getOutputStream());
+ IOUtils.closeQuietly(process.getErrorStream());
}
-
- // try to wait for everything to get logged out before exiting
- outputGobbler.awaitCompletion(5000);
- errorGobbler.awaitCompletion(5000);
}
/**
diff --git a/src/main/java/azkaban/project/JdbcProjectLoader.java b/src/main/java/azkaban/project/JdbcProjectLoader.java
index e61c2d7..c3cf424 100644
--- a/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -305,18 +305,16 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
runner.update(connection, INSERT_PROJECT_FILES, project.getId(), version, chunk, size, buf);
logger.info("Finished update for " + filename + " chunk " + chunk);
} catch (SQLException e) {
- IOUtils.closeQuietly(bufferedStream);
throw new ProjectManagerException("Error chunking", e);
}
++chunk;
size = bufferedStream.read(buffer);
}
-
- bufferedStream.close();
} catch (IOException e) {
- IOUtils.closeQuietly(bufferedStream);
throw new ProjectManagerException("Error chunking file " + filename);
+ } finally {
+ IOUtils.closeQuietly(bufferedStream);
}
final String INSERT_PROJECT_VERSION =
@@ -380,45 +378,45 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
BufferedOutputStream bStream = null;
File file = null;
try {
- file = File.createTempFile(projHandler.getFileName(), String.valueOf(version), tempDir);
-
- bStream = new BufferedOutputStream(new FileOutputStream(file));
- }
- catch (IOException e) {
- IOUtils.closeQuietly(bStream);
- throw new ProjectManagerException("Error creating temp file for stream.");
- }
-
- int collect = 5;
- int fromChunk = 0;
- int toChunk = collect;
- do {
- ProjectFileChunkResultHandler chunkHandler = new ProjectFileChunkResultHandler();
- List<byte[]> data = null;
- try {
- data = runner.query(connection, ProjectFileChunkResultHandler.SELECT_PROJECT_CHUNKS_FILE, chunkHandler, projectId, version, fromChunk, toChunk);
- }
- catch(SQLException e) {
- logger.error(e);
- IOUtils.closeQuietly(bStream);
- throw new ProjectManagerException("Query for uploaded file for " + projectId + " failed.", e);
- }
-
try {
- for (byte[] d : data) {
- bStream.write(d);
- }
+ file = File.createTempFile(projHandler.getFileName(), String.valueOf(version), tempDir);
+
+ bStream = new BufferedOutputStream(new FileOutputStream(file));
}
catch (IOException e) {
- IOUtils.closeQuietly(bStream);
- throw new ProjectManagerException("Error writing file", e);
+ throw new ProjectManagerException("Error creating temp file for stream.");
}
- // Add all the bytes to the stream.
- fromChunk += collect;
- toChunk += collect;
- } while (fromChunk <= numChunks);
- IOUtils.closeQuietly(bStream);
+ int collect = 5;
+ int fromChunk = 0;
+ int toChunk = collect;
+ do {
+ ProjectFileChunkResultHandler chunkHandler = new ProjectFileChunkResultHandler();
+ List<byte[]> data = null;
+ try {
+ data = runner.query(connection, ProjectFileChunkResultHandler.SELECT_PROJECT_CHUNKS_FILE, chunkHandler, projectId, version, fromChunk, toChunk);
+ }
+ catch(SQLException e) {
+ logger.error(e);
+ throw new ProjectManagerException("Query for uploaded file for " + projectId + " failed.", e);
+ }
+
+ try {
+ for (byte[] d : data) {
+ bStream.write(d);
+ }
+ }
+ catch (IOException e) {
+ throw new ProjectManagerException("Error writing file", e);
+ }
+
+ // Add all the bytes to the stream.
+ fromChunk += collect;
+ toChunk += collect;
+ } while (fromChunk <= numChunks);
+ } finally {
+ IOUtils.closeQuietly(bStream);
+ }
// Check md5.
byte[] md5 = null;
diff --git a/src/main/java/azkaban/scheduler/ScheduleStatisticManager.java b/src/main/java/azkaban/scheduler/ScheduleStatisticManager.java
index d5de1e1..71c6aaf 100644
--- a/src/main/java/azkaban/scheduler/ScheduleStatisticManager.java
+++ b/src/main/java/azkaban/scheduler/ScheduleStatisticManager.java
@@ -113,8 +113,11 @@ public class ScheduleStatisticManager {
File cache = getCacheFile(scheduleId);
cache.createNewFile();
OutputStream output = new FileOutputStream(cache);
- JSONUtils.toJSON(data, output, false);
- output.close();
+ try {
+ JSONUtils.toJSON(data, output, false);
+ } finally {
+ output.close();
+ }
}
} catch (Exception e) {
e.printStackTrace();
src/main/java/azkaban/utils/FileIOUtils.java 39(+23 -16)
diff --git a/src/main/java/azkaban/utils/FileIOUtils.java b/src/main/java/azkaban/utils/FileIOUtils.java
index b9edc56..c3aaee4 100644
--- a/src/main/java/azkaban/utils/FileIOUtils.java
+++ b/src/main/java/azkaban/utils/FileIOUtils.java
@@ -111,27 +111,34 @@ public class FileIOUtils {
//System.out.println(command);
ProcessBuilder builder = new ProcessBuilder().command("sh", "-c", command);
builder.directory(destDir);
-
+
+ // XXX what about stopping threads ??
Process process = builder.start();
- NullLogger errorLogger = new NullLogger(process.getErrorStream());
- NullLogger inputLogger = new NullLogger(process.getInputStream());
- errorLogger.start();
- inputLogger.start();
-
try {
- if (process.waitFor() < 0) {
- // Assume that the error will be in standard out. Otherwise it'll be in standard in.
- String errorMessage = errorLogger.getLastMessages();
- if (errorMessage.isEmpty()) {
- errorMessage = inputLogger.getLastMessages();
+ NullLogger errorLogger = new NullLogger(process.getErrorStream());
+ NullLogger inputLogger = new NullLogger(process.getInputStream());
+ errorLogger.start();
+ inputLogger.start();
+
+ try {
+ if (process.waitFor() < 0) {
+ // Assume that the error will be in standard out. Otherwise it'll be in standard in.
+ String errorMessage = errorLogger.getLastMessages();
+ if (errorMessage.isEmpty()) {
+ errorMessage = inputLogger.getLastMessages();
+ }
+
+ throw new IOException(errorMessage);
}
- throw new IOException(errorMessage);
+ // System.out.println(errorLogger.getLastMessages());
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
-
- // System.out.println(errorLogger.getLastMessages());
- } catch (InterruptedException e) {
- e.printStackTrace();
+ } finally {
+ IOUtils.closeQuietly(process.getInputStream());
+ IOUtils.closeQuietly(process.getOutputStream());
+ IOUtils.closeQuietly(process.getErrorStream());
}
}
diff --git a/src/main/java/azkaban/utils/JSONUtils.java b/src/main/java/azkaban/utils/JSONUtils.java
index ad78b6c..324d8ef 100644
--- a/src/main/java/azkaban/utils/JSONUtils.java
+++ b/src/main/java/azkaban/utils/JSONUtils.java
@@ -84,8 +84,11 @@ public class JSONUtils {
public static void toJSON(Object obj, File file, boolean prettyPrint) throws IOException {
BufferedOutputStream stream = new BufferedOutputStream(new FileOutputStream(file));
- toJSON(obj, stream, prettyPrint);
- stream.close();
+ try {
+ toJSON(obj, stream, prettyPrint);
+ } finally {
+ stream.close();
+ }
}
public static Object parseJSONFromStringQuiet(String json) {
diff --git a/src/main/java/azkaban/utils/Props.java b/src/main/java/azkaban/utils/Props.java
index eb39dd6..b3dacf8 100644
--- a/src/main/java/azkaban/utils/Props.java
+++ b/src/main/java/azkaban/utils/Props.java
@@ -89,10 +89,10 @@ public class Props {
try {
loadFrom(input);
} catch (IOException e) {
- input.close();
throw e;
+ } finally {
+ input.close();
}
- input.close();
}
/**
src/main/java/azkaban/utils/Utils.java 43(+29 -14)
diff --git a/src/main/java/azkaban/utils/Utils.java b/src/main/java/azkaban/utils/Utils.java
index 44edfaa..bb14d03 100644
--- a/src/main/java/azkaban/utils/Utils.java
+++ b/src/main/java/azkaban/utils/Utils.java
@@ -132,20 +132,26 @@ public class Utils {
public static void zip(File input, File output) throws IOException {
FileOutputStream out = new FileOutputStream(output);
ZipOutputStream zOut = new ZipOutputStream(out);
- zipFile("", input, zOut);
- zOut.close();
+ try {
+ zipFile("", input, zOut);
+ } finally {
+ zOut.close();
+ }
}
public static void zipFolderContent(File folder, File output) throws IOException {
FileOutputStream out = new FileOutputStream(output);
ZipOutputStream zOut = new ZipOutputStream(out);
- File[] files = folder.listFiles();
- if (files != null) {
- for (File f : files) {
- zipFile("", f, zOut);
+ try {
+ File[] files = folder.listFiles();
+ if (files != null) {
+ for (File f : files) {
+ zipFile("", f, zOut);
+ }
}
+ } finally {
+ zOut.close();
}
- zOut.close();
}
private static void zipFile(String path, File input, ZipOutputStream zOut) throws IOException {
@@ -165,8 +171,11 @@ public class Utils {
zOut.putNextEntry(entry);
InputStream fileInputStream = new BufferedInputStream(
new FileInputStream(input));
- IOUtils.copy(fileInputStream, zOut);
- fileInputStream.close();
+ try {
+ IOUtils.copy(fileInputStream, zOut);
+ } finally {
+ fileInputStream.close();
+ }
}
}
@@ -180,11 +189,17 @@ public class Utils {
} else {
newFile.getParentFile().mkdirs();
InputStream src = source.getInputStream(entry);
- OutputStream output = new BufferedOutputStream(
- new FileOutputStream(newFile));
- IOUtils.copy(src, output);
- src.close();
- output.close();
+ try {
+ OutputStream output = new BufferedOutputStream(
+ new FileOutputStream(newFile));
+ try {
+ IOUtils.copy(src, output);
+ } finally {
+ output.close();
+ }
+ } finally {
+ src.close();
+ }
}
}
}
diff --git a/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
index 0f8750b..96f9f9b 100644
--- a/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -480,9 +480,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
if (cacheExists) {
// Send the cache instead
InputStream cacheInput = new BufferedInputStream(new FileInputStream(cache));
- IOUtils.copy(cacheInput, resp.getOutputStream());
- // System.out.println("Using cache copy for " + start);
- return;
+ try {
+ IOUtils.copy(cacheInput, resp.getOutputStream());
+ // System.out.println("Using cache copy for " + start);
+ return;
+ } finally {
+ IOUtils.closeQuietly(cacheInput);
+ }
}
}
@@ -518,11 +522,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
File cacheTemp = new File(cacheDirFile, startTime + ".tmp");
cacheTemp.createNewFile();
OutputStream cacheOutput = new BufferedOutputStream(new FileOutputStream(cacheTemp));
- OutputStream outputStream = new SplitterOutputStream(cacheOutput, resp.getOutputStream());
- // Write to both the cache file and web output
- JSONUtils.toJSON(ret, outputStream, false);
- cacheOutput.close();
-
+ try {
+ OutputStream outputStream = new SplitterOutputStream(cacheOutput, resp.getOutputStream());
+ // Write to both the cache file and web output
+ JSONUtils.toJSON(ret, outputStream, false);
+ } finally {
+ IOUtils.closeQuietly(cacheOutput);
+ }
//Move cache file
synchronized (this) {
cacheTemp.renameTo(cache);