azkaban-aplcache

Details

diff --git a/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index f7c5c40..cb8e00e 100644
--- a/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -768,37 +768,40 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
 		int pos = 0;
 		int length = buffer.length;
 		int startByte = 0;
-		BufferedInputStream bufferedStream = null;
 		try {
 			for (int i = 0; i < files.length; ++i) {
 				File file = files[i];
 				
-				bufferedStream = new BufferedInputStream(new FileInputStream(file));
-				int size = bufferedStream.read(buffer, pos, length);
-				while (size >= 0) {
-					if (pos + size == buffer.length) {
-						// Flush here.
-						uploadLogPart(
-								connection, 
-								execId, 
-								name, 
-								attempt, 
-								startByte, 
-								startByte + buffer.length, 
-								encType, 
-								buffer, 
-								buffer.length);
-						
-						pos = 0;
-						length = buffer.length;
-						startByte += buffer.length;
-					}
-					else {
-						// Usually end of file.
-						pos += size;
-						length = buffer.length - pos;
+				BufferedInputStream bufferedStream = new BufferedInputStream(new FileInputStream(file));
+				try {
+					int size = bufferedStream.read(buffer, pos, length);
+					while (size >= 0) {
+						if (pos + size == buffer.length) {
+							// Flush here.
+							uploadLogPart(
+									connection, 
+									execId, 
+									name, 
+									attempt, 
+									startByte, 
+									startByte + buffer.length, 
+									encType, 
+									buffer, 
+									buffer.length);
+							
+							pos = 0;
+							length = buffer.length;
+							startByte += buffer.length;
+						}
+						else {
+							// Usually end of file.
+							pos += size;
+							length = buffer.length - pos;
+						}
+						size = bufferedStream.read(buffer, pos, length);
 					}
-					size = bufferedStream.read(buffer, pos, length);
+				} finally {
+					IOUtils.closeQuietly(bufferedStream);
 				}
 			}
 			
@@ -822,9 +825,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
 		catch (IOException e) {
 			throw new ExecutorManagerException("Error chunking", e);
 		}
-		finally {
-			IOUtils.closeQuietly(bufferedStream);
-		}
 	}
 	
 	private void uploadLogPart(
diff --git a/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java b/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
index 2e5de64..01576d4 100644
--- a/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
+++ b/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
@@ -72,35 +73,41 @@ public class AzkabanProcess {
 		builder.directory(new File(workingDir));
 		builder.environment().putAll(env);
 		this.process = builder.start();
-		this.processId = processId(process);
-		if (processId == 0) {
-			logger.debug("Spawned thread with unknown process id");
-		} else {
-			logger.debug("Spawned thread with process id " + processId);
-		}
-
-		this.startupLatch.countDown();
-
-		LogGobbler outputGobbler = new LogGobbler(new InputStreamReader(process.getInputStream()), logger, Level.INFO, 30);
-		LogGobbler errorGobbler = new LogGobbler(new InputStreamReader(process.getErrorStream()), logger, Level.ERROR, 30);
-		
-		outputGobbler.start();
-		errorGobbler.start();
-		int exitCode = -1;
 		try {
-			exitCode = process.waitFor();
-		} catch (InterruptedException e) {
-			logger.info("Process interrupted. Exit code is " + exitCode, e);
-		}
-
-		completeLatch.countDown();
-		if (exitCode != 0) {
-			throw new ProcessFailureException(exitCode, errorGobbler.getRecentLog());
+			this.processId = processId(process);
+			if (processId == 0) {
+				logger.debug("Spawned thread with unknown process id");
+			} else {
+				logger.debug("Spawned thread with process id " + processId);
+			}
+	
+			this.startupLatch.countDown();
+	
+			LogGobbler outputGobbler = new LogGobbler(new InputStreamReader(process.getInputStream()), logger, Level.INFO, 30);
+			LogGobbler errorGobbler = new LogGobbler(new InputStreamReader(process.getErrorStream()), logger, Level.ERROR, 30);
+			
+			outputGobbler.start();
+			errorGobbler.start();
+			int exitCode = -1;
+			try {
+				exitCode = process.waitFor();
+			} catch (InterruptedException e) {
+				logger.info("Process interrupted. Exit code is " + exitCode, e);
+			}
+	
+			completeLatch.countDown();
+			if (exitCode != 0) {
+				throw new ProcessFailureException(exitCode, errorGobbler.getRecentLog());
+			}
+	
+			// try to wait for everything to get logged out before exiting
+			outputGobbler.awaitCompletion(5000);
+			errorGobbler.awaitCompletion(5000);
+		} finally {
+			IOUtils.closeQuietly(process.getInputStream());
+			IOUtils.closeQuietly(process.getOutputStream());
+			IOUtils.closeQuietly(process.getErrorStream());
 		}
-
-		// try to wait for everything to get logged out before exiting
-		outputGobbler.awaitCompletion(5000);
-		errorGobbler.awaitCompletion(5000);
 	}
 
 	/**
diff --git a/src/main/java/azkaban/project/JdbcProjectLoader.java b/src/main/java/azkaban/project/JdbcProjectLoader.java
index e61c2d7..c3cf424 100644
--- a/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -305,18 +305,16 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
 					runner.update(connection, INSERT_PROJECT_FILES, project.getId(), version, chunk, size, buf);
 					logger.info("Finished update for " + filename + " chunk " + chunk);
 				} catch (SQLException e) {
-					IOUtils.closeQuietly(bufferedStream);
 					throw new ProjectManagerException("Error chunking", e);
 				}
 				++chunk;
 				
 				size = bufferedStream.read(buffer);
 			}
-			
-			bufferedStream.close();
 		} catch (IOException e) {
-			IOUtils.closeQuietly(bufferedStream);
 			throw new ProjectManagerException("Error chunking file " + filename);
+		} finally {
+			IOUtils.closeQuietly(bufferedStream);
 		}
 
 		final String INSERT_PROJECT_VERSION =
@@ -380,45 +378,45 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
 		BufferedOutputStream bStream = null;
 		File file = null;
 		try {
-			file = File.createTempFile(projHandler.getFileName(), String.valueOf(version), tempDir);
-	
-			bStream = new BufferedOutputStream(new FileOutputStream(file));
-		}
-		catch (IOException e) {
-			IOUtils.closeQuietly(bStream);
-			throw new ProjectManagerException("Error creating temp file for stream.");
-		}
-		
-		int collect = 5;
-		int fromChunk = 0;
-		int toChunk = collect;
-		do {
-			ProjectFileChunkResultHandler chunkHandler = new ProjectFileChunkResultHandler();
-			List<byte[]> data = null;
-			try {
-				data = runner.query(connection, ProjectFileChunkResultHandler.SELECT_PROJECT_CHUNKS_FILE, chunkHandler, projectId, version, fromChunk, toChunk);
-			}
-			catch(SQLException e) {
-				logger.error(e);
-				IOUtils.closeQuietly(bStream);
-				throw new ProjectManagerException("Query for uploaded file for " + projectId + " failed.", e);
-			}
-			
 			try {
-				for (byte[] d : data) {
-					bStream.write(d);
-				}
+				file = File.createTempFile(projHandler.getFileName(), String.valueOf(version), tempDir);
+		
+				bStream = new BufferedOutputStream(new FileOutputStream(file));
 			}
 			catch (IOException e) {
-				IOUtils.closeQuietly(bStream);
-				throw new ProjectManagerException("Error writing file", e);
+				throw new ProjectManagerException("Error creating temp file for stream.");
 			}
 			
-			// Add all the bytes to the stream.
-			fromChunk += collect;
-			toChunk += collect;
-		} while (fromChunk <= numChunks);
-		IOUtils.closeQuietly(bStream);
+			int collect = 5;
+			int fromChunk = 0;
+			int toChunk = collect;
+			do {
+				ProjectFileChunkResultHandler chunkHandler = new ProjectFileChunkResultHandler();
+				List<byte[]> data = null;
+				try {
+					data = runner.query(connection, ProjectFileChunkResultHandler.SELECT_PROJECT_CHUNKS_FILE, chunkHandler, projectId, version, fromChunk, toChunk);
+				}
+				catch(SQLException e) {
+					logger.error(e);
+					throw new ProjectManagerException("Query for uploaded file for " + projectId + " failed.", e);
+				}
+				
+				try {
+					for (byte[] d : data) {
+						bStream.write(d);
+					}
+				}
+				catch (IOException e) {
+					throw new ProjectManagerException("Error writing file", e);
+				}
+				
+				// Add all the bytes to the stream.
+				fromChunk += collect;
+				toChunk += collect;
+			} while (fromChunk <= numChunks);
+		} finally {
+			IOUtils.closeQuietly(bStream);
+		}
 		
 		// Check md5.
 		byte[] md5 = null;
diff --git a/src/main/java/azkaban/scheduler/ScheduleStatisticManager.java b/src/main/java/azkaban/scheduler/ScheduleStatisticManager.java
index d5de1e1..71c6aaf 100644
--- a/src/main/java/azkaban/scheduler/ScheduleStatisticManager.java
+++ b/src/main/java/azkaban/scheduler/ScheduleStatisticManager.java
@@ -113,8 +113,11 @@ public class ScheduleStatisticManager {
 				File cache = getCacheFile(scheduleId);
 				cache.createNewFile();
 				OutputStream output = new FileOutputStream(cache);
-				JSONUtils.toJSON(data, output, false);
-				output.close();
+				try {
+					JSONUtils.toJSON(data, output, false);
+				} finally {
+					output.close();
+				}
 			}
 		} catch (Exception e) {
 			e.printStackTrace();
diff --git a/src/main/java/azkaban/utils/FileIOUtils.java b/src/main/java/azkaban/utils/FileIOUtils.java
index b9edc56..c3aaee4 100644
--- a/src/main/java/azkaban/utils/FileIOUtils.java
+++ b/src/main/java/azkaban/utils/FileIOUtils.java
@@ -111,27 +111,34 @@ public class FileIOUtils {
 		//System.out.println(command);
 		ProcessBuilder builder = new ProcessBuilder().command("sh", "-c", command);
 		builder.directory(destDir);
-		
+
+		// XXX what about stopping threads ??
 		Process process = builder.start();
-		NullLogger errorLogger = new NullLogger(process.getErrorStream());
-		NullLogger inputLogger = new NullLogger(process.getInputStream());
-		errorLogger.start();
-		inputLogger.start();
-		
 		try {
-			if (process.waitFor() < 0) {
-				// Assume that the error will be in standard out. Otherwise it'll be in standard in.
-				String errorMessage = errorLogger.getLastMessages();
-				if (errorMessage.isEmpty()) {
-					errorMessage = inputLogger.getLastMessages();
+			NullLogger errorLogger = new NullLogger(process.getErrorStream());
+			NullLogger inputLogger = new NullLogger(process.getInputStream());
+			errorLogger.start();
+			inputLogger.start();
+			
+			try {
+				if (process.waitFor() < 0) {
+					// Assume that the error will be in standard out. Otherwise it'll be in standard in.
+					String errorMessage = errorLogger.getLastMessages();
+					if (errorMessage.isEmpty()) {
+						errorMessage = inputLogger.getLastMessages();
+					}
+					
+					throw new IOException(errorMessage);
 				}
 				
-				throw new IOException(errorMessage);
+			//	System.out.println(errorLogger.getLastMessages());
+			} catch (InterruptedException e) {
+				e.printStackTrace();
 			}
-			
-		//	System.out.println(errorLogger.getLastMessages());
-		} catch (InterruptedException e) {
-			e.printStackTrace();
+		} finally {
+			IOUtils.closeQuietly(process.getInputStream());
+			IOUtils.closeQuietly(process.getOutputStream());
+			IOUtils.closeQuietly(process.getErrorStream());
 		}
 	}
 	
diff --git a/src/main/java/azkaban/utils/JSONUtils.java b/src/main/java/azkaban/utils/JSONUtils.java
index ad78b6c..324d8ef 100644
--- a/src/main/java/azkaban/utils/JSONUtils.java
+++ b/src/main/java/azkaban/utils/JSONUtils.java
@@ -84,8 +84,11 @@ public class JSONUtils {
 	
 	public static void toJSON(Object obj, File file, boolean prettyPrint) throws IOException {
 		BufferedOutputStream stream = new BufferedOutputStream(new FileOutputStream(file));
-		toJSON(obj, stream, prettyPrint);
-		stream.close();
+		try {
+			toJSON(obj, stream, prettyPrint);
+		} finally {
+			stream.close();
+		}
 	}
 	
 	public static Object parseJSONFromStringQuiet(String json) {
diff --git a/src/main/java/azkaban/utils/Props.java b/src/main/java/azkaban/utils/Props.java
index eb39dd6..b3dacf8 100644
--- a/src/main/java/azkaban/utils/Props.java
+++ b/src/main/java/azkaban/utils/Props.java
@@ -89,10 +89,10 @@ public class Props {
 		try {
 			loadFrom(input);
 		} catch (IOException e) {
-			input.close();
 			throw e;
+		} finally {
+			input.close();
 		}
-		input.close();
 	}
 
 	/**
diff --git a/src/main/java/azkaban/utils/Utils.java b/src/main/java/azkaban/utils/Utils.java
index 44edfaa..bb14d03 100644
--- a/src/main/java/azkaban/utils/Utils.java
+++ b/src/main/java/azkaban/utils/Utils.java
@@ -132,20 +132,26 @@ public class Utils {
 	public static void zip(File input, File output) throws IOException {
 		FileOutputStream out = new FileOutputStream(output);
 		ZipOutputStream zOut = new ZipOutputStream(out);
-		zipFile("", input, zOut);
-		zOut.close();
+		try {
+			zipFile("", input, zOut);
+		} finally {
+			zOut.close();
+		}
 	}
 
 	public static void zipFolderContent(File folder, File output) throws IOException {
 		FileOutputStream out = new FileOutputStream(output);
 		ZipOutputStream zOut = new ZipOutputStream(out);
-		File[] files = folder.listFiles();
-		if (files != null) {
-			for (File f : files) {
-				zipFile("", f, zOut);
+		try {
+			File[] files = folder.listFiles();
+			if (files != null) {
+				for (File f : files) {
+					zipFile("", f, zOut);
+				}
 			}
+		} finally {
+			zOut.close();
 		}
-		zOut.close();
 	}
 
 	private static void zipFile(String path, File input, ZipOutputStream zOut) throws IOException {
@@ -165,8 +171,11 @@ public class Utils {
 			zOut.putNextEntry(entry);
 			InputStream fileInputStream = new BufferedInputStream(
 					new FileInputStream(input));
-			IOUtils.copy(fileInputStream, zOut);
-			fileInputStream.close();
+			try {
+				IOUtils.copy(fileInputStream, zOut);
+			} finally {
+				fileInputStream.close();
+			}
 		}
 	}
 
@@ -180,11 +189,17 @@ public class Utils {
 			} else {
 				newFile.getParentFile().mkdirs();
 				InputStream src = source.getInputStream(entry);
-				OutputStream output = new BufferedOutputStream(
-						new FileOutputStream(newFile));
-				IOUtils.copy(src, output);
-				src.close();
-				output.close();
+				try {
+					OutputStream output = new BufferedOutputStream(
+							new FileOutputStream(newFile));
+					try {
+						IOUtils.copy(src, output);
+					} finally {
+						output.close();
+					}
+				} finally {
+					src.close();
+				}
 			}
 		}
 	}
diff --git a/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
index 0f8750b..96f9f9b 100644
--- a/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -480,9 +480,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 			if (cacheExists) {
 				// Send the cache instead
 				InputStream cacheInput = new BufferedInputStream(new FileInputStream(cache));
-				IOUtils.copy(cacheInput, resp.getOutputStream());
-				// System.out.println("Using cache copy for " + start);
-				return;
+				try {
+					IOUtils.copy(cacheInput, resp.getOutputStream());
+					// System.out.println("Using cache copy for " + start);
+					return;
+				} finally {
+					IOUtils.closeQuietly(cacheInput);
+				}
 			}
 		}
 
@@ -518,11 +522,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		File cacheTemp = new File(cacheDirFile, startTime + ".tmp");
 		cacheTemp.createNewFile();
 		OutputStream cacheOutput = new BufferedOutputStream(new FileOutputStream(cacheTemp));
-		OutputStream outputStream = new SplitterOutputStream(cacheOutput, resp.getOutputStream());
-		// Write to both the cache file and web output
-		JSONUtils.toJSON(ret, outputStream, false);
-		cacheOutput.close();
-		
+		try {
+			OutputStream outputStream = new SplitterOutputStream(cacheOutput, resp.getOutputStream());
+			// Write to both the cache file and web output
+			JSONUtils.toJSON(ret, outputStream, false);
+		} finally {
+			IOUtils.closeQuietly(cacheOutput);
+		}
 		//Move cache file
 		synchronized (this) {
 			cacheTemp.renameTo(cache);