azkaban-aplcache

Merge branch 'master' of https://github.com/azkaban/azkaban2

4/18/2014 6:02:23 PM

Details

.travis.yml 2(+1 -1)

diff --git a/.travis.yml b/.travis.yml
index dcd8f5b..4f1c17c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,4 +1,4 @@
 languages: java
 install:
  - time npm install -g less dustjs-linkedin
-script: ant
+script: ./gradlew dist

build.gradle 112(+63 -49)

diff --git a/build.gradle b/build.gradle
index 7d6ff9d..5fe2780 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,6 +1,8 @@
 apply plugin: 'java'
 apply plugin: 'eclipse'
 
+defaultTasks 'dist'
+
 /**
  * Helper that calls a command and returns the output
  */
@@ -10,7 +12,7 @@ def cmdCaller = { commandln ->
         commandLine commandln
         standardOutput = stdout
     }
-    
+
     return stdout.toString().trim()
 }
 
@@ -21,7 +23,6 @@ def getVersionName = { ->
     return cmdCaller(['git', 'describe', '--tags', '--abbrev=0'])
 }
 
-
 version = getVersionName()
 archivesBaseName = 'azkaban'
 check.dependsOn.remove(test)
@@ -33,8 +34,8 @@ repositories {
 
 configurations {
     all {
-        // We don't want the kitchen sink for dependencies. Only the ones we know we need for
-        // compile and ones we need to package.
+        // We don't want the kitchen sink for dependencies. Only the ones we
+        // know we need for compile and ones we need to package.
         transitive = false
     }
     compile {
@@ -47,7 +48,9 @@ configurations {
         extendsFrom compile
     }
 }
-
+configurations.compile {
+    description = 'compile classpath'
+}
 pegasusVersion = '1.15.7'
 
 dependencies {
@@ -95,7 +98,7 @@ dependencies {
       [group: 'com.linkedin.pegasus', name:'generator', version: pegasusVersion],
       [group: 'com.linkedin.pegasus', name:'restli-tools', version: pegasusVersion]
   )
-  
+
   testCompile (
     [group: 'junit', name:'junit', version: '4.11']
   )
@@ -113,7 +116,7 @@ jar {
     baseName =  'azkaban'
     manifest {
       attributes(
-        'Implementation-Title': 'Azkaban', 
+        'Implementation-Title': 'Azkaban',
         'Implementation-Version': version
       )
     }
@@ -143,10 +146,10 @@ eclipse.classpath.file {
     beforeMerged {
         classpath -> classpath.entries.removeAll { entry -> true }
     }
-    
-    // We want to make sure that if there is an entry for src, that it doesn't have any
-    // include parameters
-    whenMerged { classpath -> 
+
+    // We want to make sure that if there is an entry for src, that it doesn't
+    // have any include parameters
+    whenMerged { classpath ->
         classpath.entries.findAll { entry -> entry.kind == 'src' }*.includes = []
     }
 }
@@ -180,7 +183,7 @@ task web(dependsOn: ['compileLess', 'compileDust']) << {
     println 'Copying web files'
     copy {
         from('src/web')
-        into('build/web') 
+        into('build/web')
     }
 
     copy {
@@ -223,19 +226,19 @@ task packageSolo(type: Tar, dependsOn: [jar, 'web', 'createVersionFile']) {
     mkdir packageDir
     mkdir packageDir + '/extlib'
     mkdir packageDir + '/plugins'
-    
+
     println 'Copying Soloserver bin & conf'
     copy {
         from('src/package/soloserver')
         into(packageDir)
     }
-    
+
     println 'Copying Azkaban lib'
     copy {
         from('build/libs')
         into(packageDir + '/lib')
     }
-    
+
     println 'Copying web'
     copy {
         from('build/web')
@@ -253,46 +256,46 @@ task packageSolo(type: Tar, dependsOn: [jar, 'web', 'createVersionFile']) {
         into packageDir + '/lib'
         from configurations.compile
     }
-    
+
     copy {
         into packageDir
         from 'build/package/version.file'
     }
-    
+
     println 'Tarballing Solo Package'
     extension = 'tar.gz'
     compression = Compression.GZIP
-  
+
     basedir = baseName + '-' + appendix + '-' + version
     println 'Source is in ' + packageDir
-    into(basedir) { 
+    into(basedir) {
         from packageDir
         exclude 'bin'
     }
-    
+
     dst_bin = basedir + '/bin'
     src_bin = packageDir + '/bin'
-    from(src_bin) { 
+    from(src_bin) {
         into dst_bin
         fileMode = 0755
     }
-} 
+}
 
 /**
  * Packages the Sql Scripts for Azkaban DB
  */
 task packageSql(type: Tar) {
     String packageDir = 'build/package/sql'
- 
+
     println 'Creating Azkaban SQL Scripts into ' + packageDir
-    mkdir packageDir
-    delete packageDir
+	mkdir packageDir
+
     println 'Copying SQL files'
     copy {
         from('src/sql')
         into(packageDir)
     }
-    
+
     String destFile = packageDir + '/create-all-sql-' + version + '.sql';
     println('Concating create scripts to ' + destFile)
     ant.concat(destfile:destFile, fixlastline:'yes') {
@@ -301,19 +304,19 @@ task packageSql(type: Tar) {
             exclude(name: 'database.properties')
         }
     }
-    
+
     println 'Tarballing SQL Package'
     extension = 'tar.gz'
     compression = Compression.GZIP
     appendix = 'sql'
-  
+
     basedir = baseName + '-' + appendix + '-' + version
     packageDir = 'build/package/sql'
     println 'Source is in ' + packageDir
-    into(basedir) { 
+    into(basedir) {
         from packageDir
     }
-} 
+}
 
 /**
  * Packages the Azkaban Executor Server
@@ -321,13 +324,13 @@ task packageSql(type: Tar) {
 task packageExec(type: Tar, dependsOn: [jar, 'createVersionFile']) {
     appendix = 'exec-server'
     String packageDir = 'build/package/' + baseName + '-' + appendix
+
     delete packageDir
-    
     println 'Creating Azkaban Executor Server Package into ' + packageDir
     mkdir packageDir
     mkdir packageDir + '/extlib'
     mkdir packageDir + '/plugins'
-    
+
     println 'Copying Exec server bin & conf'
     copy {
         from('src/package/execserver')
@@ -346,28 +349,28 @@ task packageExec(type: Tar, dependsOn: [jar, 'createVersionFile']) {
         into packageDir + '/lib'
         from configurations.compile
     }
-    
+
     copy {
         into packageDir
         from 'build/package/version.file'
     }
-    
+
     println 'Tarballing Web Package'
     extension = 'tar.gz'
     compression = Compression.GZIP
-  
+
     basedir = baseName + '-' + appendix + '-' + version
     packageDir = 'build/package/' + baseName + '-' + appendix
     println 'Source is in ' + packageDir
 
-    into(basedir) { 
+    into(basedir) {
         from packageDir
         exclude 'bin'
     }
-    
+
     dst_bin = basedir + '/bin'
     src_bin = packageDir + '/bin'
-    from(src_bin) { 
+    from(src_bin) {
         into dst_bin
         fileMode = 0755
     }
@@ -379,13 +382,13 @@ task packageExec(type: Tar, dependsOn: [jar, 'createVersionFile']) {
 task packageWeb(type: Tar, dependsOn: [jar, 'web', 'createVersionFile']) {
     appendix = 'web-server'
     String packageDir = 'build/package/' + baseName + '-' + appendix
- 
     delete packageDir
+
     println 'Creating Azkaban Web Server Package into ' + packageDir
     mkdir packageDir
     mkdir packageDir + '/extlib'
     mkdir packageDir + '/plugins'
-    
+
     println 'Copying Web server bin & conf'
     copy {
         from('src/package/webserver')
@@ -394,11 +397,12 @@ task packageWeb(type: Tar, dependsOn: [jar, 'web', 'createVersionFile']) {
     
     String jarFile = 'build/libs/' + baseName + '-' + version + '.jar'
     println 'Copying Azkaban lib ' + jarFile
+
     copy {
         from(jarFile)
         into(packageDir + '/lib')
     }
-    
+
     println 'Copying web'
     copy {
         from('build/web')
@@ -410,30 +414,40 @@ task packageWeb(type: Tar, dependsOn: [jar, 'web', 'createVersionFile']) {
         into packageDir + '/lib'
         from configurations.compile
     }
-    
+
     copy {
         into packageDir
         from 'build/package/version.file'
     }
-    
+
     println 'Tarballing Web Package'
     extension = 'tar.gz'
     compression = Compression.GZIP
-  
+
     basedir = baseName + '-' + appendix + '-' + version
     println 'Source is in ' + packageDir
-    into(basedir) { 
+    into(basedir) {
         from packageDir
         exclude 'bin'
     }
-    
+
     dst_bin = basedir + '/bin'
     src_bin = packageDir + '/bin'
-    from(src_bin) { 
+    from(src_bin) {
         into dst_bin
         fileMode = 0755
     }
 }
 
-task packageAll(dependsOn : ['packageWeb', 'packageExec', 'packageSolo', 'packageSql']) {
-}
\ No newline at end of file
+task packageAll(dependsOn: ['packageWeb',
+                            'packageExec',
+                            'packageSolo',
+                            'packageSql']) {
+}
+
+task dist(dependsOn: 'packageAll') {
+}
+
+task wrapper(type: Wrapper) {
+    gradleVersion = '1.11'
+}
diff --git a/gradle.properties b/gradle.properties
new file mode 100644
index 0000000..1a644c7
--- /dev/null
+++ b/gradle.properties
@@ -0,0 +1 @@
+org.gradle.daemon=true
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
new file mode 100644
index 0000000..3c7abdf
Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
new file mode 100644
index 0000000..2d93fc4
--- /dev/null
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -0,0 +1,6 @@
+#Thu Mar 20 14:12:56 PDT 2014
+distributionBase=GRADLE_USER_HOME
+distributionPath=wrapper/dists
+zipStoreBase=GRADLE_USER_HOME
+zipStorePath=wrapper/dists
+distributionUrl=http\://services.gradle.org/distributions/gradle-1.11-bin.zip

gradlew 164(+164 -0)

diff --git a/gradlew b/gradlew
new file mode 100755
index 0000000..91a7e26
--- /dev/null
+++ b/gradlew
@@ -0,0 +1,164 @@
+#!/usr/bin/env bash
+
+##############################################################################
+##
+##  Gradle start up script for UN*X
+##
+##############################################################################
+
+# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+DEFAULT_JVM_OPTS=""
+
+APP_NAME="Gradle"
+APP_BASE_NAME=`basename "$0"`
+
+# Use the maximum available, or set MAX_FD != -1 to use that value.
+MAX_FD="maximum"
+
+warn ( ) {
+    echo "$*"
+}
+
+die ( ) {
+    echo
+    echo "$*"
+    echo
+    exit 1
+}
+
+# OS specific support (must be 'true' or 'false').
+cygwin=false
+msys=false
+darwin=false
+case "`uname`" in
+  CYGWIN* )
+    cygwin=true
+    ;;
+  Darwin* )
+    darwin=true
+    ;;
+  MINGW* )
+    msys=true
+    ;;
+esac
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched.
+if $cygwin ; then
+    [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+fi
+
+# Attempt to set APP_HOME
+# Resolve links: $0 may be a link
+PRG="$0"
+# Need this for relative symlinks.
+while [ -h "$PRG" ] ; do
+    ls=`ls -ld "$PRG"`
+    link=`expr "$ls" : '.*-> \(.*\)$'`
+    if expr "$link" : '/.*' > /dev/null; then
+        PRG="$link"
+    else
+        PRG=`dirname "$PRG"`"/$link"
+    fi
+done
+SAVED="`pwd`"
+cd "`dirname \"$PRG\"`/" >&-
+APP_HOME="`pwd -P`"
+cd "$SAVED" >&-
+
+CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
+
+# Determine the Java command to use to start the JVM.
+if [ -n "$JAVA_HOME" ] ; then
+    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+        # IBM's JDK on AIX uses strange locations for the executables
+        JAVACMD="$JAVA_HOME/jre/sh/java"
+    else
+        JAVACMD="$JAVA_HOME/bin/java"
+    fi
+    if [ ! -x "$JAVACMD" ] ; then
+        die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+    fi
+else
+    JAVACMD="java"
+    which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+fi
+
+# Increase the maximum file descriptors if we can.
+if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then
+    MAX_FD_LIMIT=`ulimit -H -n`
+    if [ $? -eq 0 ] ; then
+        if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
+            MAX_FD="$MAX_FD_LIMIT"
+        fi
+        ulimit -n $MAX_FD
+        if [ $? -ne 0 ] ; then
+            warn "Could not set maximum file descriptor limit: $MAX_FD"
+        fi
+    else
+        warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT"
+    fi
+fi
+
+# For Darwin, add options to specify how the application appears in the dock
+if $darwin; then
+    GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
+fi
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin ; then
+    APP_HOME=`cygpath --path --mixed "$APP_HOME"`
+    CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
+
+    # We build the pattern for arguments to be converted via cygpath
+    ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
+    SEP=""
+    for dir in $ROOTDIRSRAW ; do
+        ROOTDIRS="$ROOTDIRS$SEP$dir"
+        SEP="|"
+    done
+    OURCYGPATTERN="(^($ROOTDIRS))"
+    # Add a user-defined pattern to the cygpath arguments
+    if [ "$GRADLE_CYGPATTERN" != "" ] ; then
+        OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
+    fi
+    # Now convert the arguments - kludge to limit ourselves to /bin/sh
+    i=0
+    for arg in "$@" ; do
+        CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -`
+        CHECK2=`echo "$arg"|egrep -c "^-"`                                 ### Determine if an option
+
+        if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then                    ### Added a condition
+            eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"`
+        else
+            eval `echo args$i`="\"$arg\""
+        fi
+        i=$((i+1))
+    done
+    case $i in
+        (0) set -- ;;
+        (1) set -- "$args0" ;;
+        (2) set -- "$args0" "$args1" ;;
+        (3) set -- "$args0" "$args1" "$args2" ;;
+        (4) set -- "$args0" "$args1" "$args2" "$args3" ;;
+        (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
+        (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
+        (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
+        (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
+        (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
+    esac
+fi
+
+# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules
+function splitJvmOpts() {
+    JVM_OPTS=("$@")
+}
+eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS
+JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME"
+
+exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@"

gradlew.bat 90(+90 -0)

diff --git a/gradlew.bat b/gradlew.bat
new file mode 100644
index 0000000..aec9973
--- /dev/null
+++ b/gradlew.bat
@@ -0,0 +1,90 @@
+@if "%DEBUG%" == "" @echo off
+@rem ##########################################################################
+@rem
+@rem  Gradle startup script for Windows
+@rem
+@rem ##########################################################################
+
+@rem Set local scope for the variables with windows NT shell
+if "%OS%"=="Windows_NT" setlocal
+
+@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+set DEFAULT_JVM_OPTS=
+
+set DIRNAME=%~dp0
+if "%DIRNAME%" == "" set DIRNAME=.
+set APP_BASE_NAME=%~n0
+set APP_HOME=%DIRNAME%
+
+@rem Find java.exe
+if defined JAVA_HOME goto findJavaFromJavaHome
+
+set JAVA_EXE=java.exe
+%JAVA_EXE% -version >NUL 2>&1
+if "%ERRORLEVEL%" == "0" goto init
+
+echo.
+echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+echo.
+echo Please set the JAVA_HOME variable in your environment to match the
+echo location of your Java installation.
+
+goto fail
+
+:findJavaFromJavaHome
+set JAVA_HOME=%JAVA_HOME:"=%
+set JAVA_EXE=%JAVA_HOME%/bin/java.exe
+
+if exist "%JAVA_EXE%" goto init
+
+echo.
+echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
+echo.
+echo Please set the JAVA_HOME variable in your environment to match the
+echo location of your Java installation.
+
+goto fail
+
+:init
+@rem Get command-line arguments, handling Windowz variants
+
+if not "%OS%" == "Windows_NT" goto win9xME_args
+if "%@eval[2+2]" == "4" goto 4NT_args
+
+:win9xME_args
+@rem Slurp the command line arguments.
+set CMD_LINE_ARGS=
+set _SKIP=2
+
+:win9xME_args_slurp
+if "x%~1" == "x" goto execute
+
+set CMD_LINE_ARGS=%*
+goto execute
+
+:4NT_args
+@rem Get arguments from the 4NT Shell from JP Software
+set CMD_LINE_ARGS=%$
+
+:execute
+@rem Setup the command line
+
+set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
+
+@rem Execute Gradle
+"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%
+
+:end
+@rem End local scope for the variables with windows NT shell
+if "%ERRORLEVEL%"=="0" goto mainEnd
+
+:fail
+rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
+rem the _cmd.exe /c_ return code!
+if  not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
+exit /b 1
+
+:mainEnd
+if "%OS%"=="Windows_NT" endlocal
+
+:omega
diff --git a/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index f7c5c40..cb8e00e 100644
--- a/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -768,37 +768,40 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
 		int pos = 0;
 		int length = buffer.length;
 		int startByte = 0;
-		BufferedInputStream bufferedStream = null;
 		try {
 			for (int i = 0; i < files.length; ++i) {
 				File file = files[i];
 				
-				bufferedStream = new BufferedInputStream(new FileInputStream(file));
-				int size = bufferedStream.read(buffer, pos, length);
-				while (size >= 0) {
-					if (pos + size == buffer.length) {
-						// Flush here.
-						uploadLogPart(
-								connection, 
-								execId, 
-								name, 
-								attempt, 
-								startByte, 
-								startByte + buffer.length, 
-								encType, 
-								buffer, 
-								buffer.length);
-						
-						pos = 0;
-						length = buffer.length;
-						startByte += buffer.length;
-					}
-					else {
-						// Usually end of file.
-						pos += size;
-						length = buffer.length - pos;
+				BufferedInputStream bufferedStream = new BufferedInputStream(new FileInputStream(file));
+				try {
+					int size = bufferedStream.read(buffer, pos, length);
+					while (size >= 0) {
+						if (pos + size == buffer.length) {
+							// Flush here.
+							uploadLogPart(
+									connection, 
+									execId, 
+									name, 
+									attempt, 
+									startByte, 
+									startByte + buffer.length, 
+									encType, 
+									buffer, 
+									buffer.length);
+							
+							pos = 0;
+							length = buffer.length;
+							startByte += buffer.length;
+						}
+						else {
+							// Usually end of file.
+							pos += size;
+							length = buffer.length - pos;
+						}
+						size = bufferedStream.read(buffer, pos, length);
 					}
-					size = bufferedStream.read(buffer, pos, length);
+				} finally {
+					IOUtils.closeQuietly(bufferedStream);
 				}
 			}
 			
@@ -822,9 +825,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
 		catch (IOException e) {
 			throw new ExecutorManagerException("Error chunking", e);
 		}
-		finally {
-			IOUtils.closeQuietly(bufferedStream);
-		}
 	}
 	
 	private void uploadLogPart(
diff --git a/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java b/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
index 2e5de64..01576d4 100644
--- a/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
+++ b/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
@@ -72,35 +73,41 @@ public class AzkabanProcess {
 		builder.directory(new File(workingDir));
 		builder.environment().putAll(env);
 		this.process = builder.start();
-		this.processId = processId(process);
-		if (processId == 0) {
-			logger.debug("Spawned thread with unknown process id");
-		} else {
-			logger.debug("Spawned thread with process id " + processId);
-		}
-
-		this.startupLatch.countDown();
-
-		LogGobbler outputGobbler = new LogGobbler(new InputStreamReader(process.getInputStream()), logger, Level.INFO, 30);
-		LogGobbler errorGobbler = new LogGobbler(new InputStreamReader(process.getErrorStream()), logger, Level.ERROR, 30);
-		
-		outputGobbler.start();
-		errorGobbler.start();
-		int exitCode = -1;
 		try {
-			exitCode = process.waitFor();
-		} catch (InterruptedException e) {
-			logger.info("Process interrupted. Exit code is " + exitCode, e);
-		}
-
-		completeLatch.countDown();
-		if (exitCode != 0) {
-			throw new ProcessFailureException(exitCode, errorGobbler.getRecentLog());
+			this.processId = processId(process);
+			if (processId == 0) {
+				logger.debug("Spawned thread with unknown process id");
+			} else {
+				logger.debug("Spawned thread with process id " + processId);
+			}
+	
+			this.startupLatch.countDown();
+	
+			LogGobbler outputGobbler = new LogGobbler(new InputStreamReader(process.getInputStream()), logger, Level.INFO, 30);
+			LogGobbler errorGobbler = new LogGobbler(new InputStreamReader(process.getErrorStream()), logger, Level.ERROR, 30);
+			
+			outputGobbler.start();
+			errorGobbler.start();
+			int exitCode = -1;
+			try {
+				exitCode = process.waitFor();
+			} catch (InterruptedException e) {
+				logger.info("Process interrupted. Exit code is " + exitCode, e);
+			}
+	
+			completeLatch.countDown();
+			if (exitCode != 0) {
+				throw new ProcessFailureException(exitCode, errorGobbler.getRecentLog());
+			}
+	
+			// try to wait for everything to get logged out before exiting
+			outputGobbler.awaitCompletion(5000);
+			errorGobbler.awaitCompletion(5000);
+		} finally {
+			IOUtils.closeQuietly(process.getInputStream());
+			IOUtils.closeQuietly(process.getOutputStream());
+			IOUtils.closeQuietly(process.getErrorStream());
 		}
-
-		// try to wait for everything to get logged out before exiting
-		outputGobbler.awaitCompletion(5000);
-		errorGobbler.awaitCompletion(5000);
 	}
 
 	/**
diff --git a/src/main/java/azkaban/project/JdbcProjectLoader.java b/src/main/java/azkaban/project/JdbcProjectLoader.java
index 9553d48..c3cf424 100644
--- a/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -271,7 +271,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
 		}
 	}
 
-	@SuppressWarnings("resource")
 	private void uploadProjectFile(Connection connection, Project project, int version, String filetype, String filename, File localFile, String uploader) throws ProjectManagerException {
 		QueryRunner runner = new QueryRunner();
 		long updateTime = System.currentTimeMillis();
@@ -306,18 +305,16 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
 					runner.update(connection, INSERT_PROJECT_FILES, project.getId(), version, chunk, size, buf);
 					logger.info("Finished update for " + filename + " chunk " + chunk);
 				} catch (SQLException e) {
-					IOUtils.closeQuietly(bufferedStream);
 					throw new ProjectManagerException("Error chunking", e);
 				}
 				++chunk;
 				
 				size = bufferedStream.read(buffer);
 			}
-			
-			bufferedStream.close();
 		} catch (IOException e) {
-			IOUtils.closeQuietly(bufferedStream);
 			throw new ProjectManagerException("Error chunking file " + filename);
+		} finally {
+			IOUtils.closeQuietly(bufferedStream);
 		}
 
 		final String INSERT_PROJECT_VERSION =
@@ -361,7 +358,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
 		return handler;
 	}
 	
-	@SuppressWarnings("resource")
 	private ProjectFileHandler getUploadedFile(Connection connection, int projectId, int version) throws ProjectManagerException {
 		QueryRunner runner = new QueryRunner();
 		ProjectVersionResultHandler pfHandler = new ProjectVersionResultHandler();
@@ -382,45 +378,45 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
 		BufferedOutputStream bStream = null;
 		File file = null;
 		try {
-			file = File.createTempFile(projHandler.getFileName(), String.valueOf(version), tempDir);
-	
-			bStream = new BufferedOutputStream(new FileOutputStream(file));
-		}
-		catch (IOException e) {
-			IOUtils.closeQuietly(bStream);
-			throw new ProjectManagerException("Error creating temp file for stream.");
-		}
-		
-		int collect = 5;
-		int fromChunk = 0;
-		int toChunk = collect;
-		do {
-			ProjectFileChunkResultHandler chunkHandler = new ProjectFileChunkResultHandler();
-			List<byte[]> data = null;
-			try {
-				data = runner.query(connection, ProjectFileChunkResultHandler.SELECT_PROJECT_CHUNKS_FILE, chunkHandler, projectId, version, fromChunk, toChunk);
-			}
-			catch(SQLException e) {
-				logger.error(e);
-				IOUtils.closeQuietly(bStream);
-				throw new ProjectManagerException("Query for uploaded file for " + projectId + " failed.", e);
-			}
-			
 			try {
-				for (byte[] d : data) {
-					bStream.write(d);
-				}
+				file = File.createTempFile(projHandler.getFileName(), String.valueOf(version), tempDir);
+		
+				bStream = new BufferedOutputStream(new FileOutputStream(file));
 			}
 			catch (IOException e) {
-				IOUtils.closeQuietly(bStream);
-				throw new ProjectManagerException("Error writing file", e);
+				throw new ProjectManagerException("Error creating temp file for stream.");
 			}
 			
-			// Add all the bytes to the stream.
-			fromChunk += collect;
-			toChunk += collect;
-		} while (fromChunk <= numChunks);
-		IOUtils.closeQuietly(bStream);
+			int collect = 5;
+			int fromChunk = 0;
+			int toChunk = collect;
+			do {
+				ProjectFileChunkResultHandler chunkHandler = new ProjectFileChunkResultHandler();
+				List<byte[]> data = null;
+				try {
+					data = runner.query(connection, ProjectFileChunkResultHandler.SELECT_PROJECT_CHUNKS_FILE, chunkHandler, projectId, version, fromChunk, toChunk);
+				}
+				catch(SQLException e) {
+					logger.error(e);
+					throw new ProjectManagerException("Query for uploaded file for " + projectId + " failed.", e);
+				}
+				
+				try {
+					for (byte[] d : data) {
+						bStream.write(d);
+					}
+				}
+				catch (IOException e) {
+					throw new ProjectManagerException("Error writing file", e);
+				}
+				
+				// Add all the bytes to the stream.
+				fromChunk += collect;
+				toChunk += collect;
+			} while (fromChunk <= numChunks);
+		} finally {
+			IOUtils.closeQuietly(bStream);
+		}
 		
 		// Check md5.
 		byte[] md5 = null;
diff --git a/src/main/java/azkaban/scheduler/ScheduleStatisticManager.java b/src/main/java/azkaban/scheduler/ScheduleStatisticManager.java
index d5de1e1..71c6aaf 100644
--- a/src/main/java/azkaban/scheduler/ScheduleStatisticManager.java
+++ b/src/main/java/azkaban/scheduler/ScheduleStatisticManager.java
@@ -113,8 +113,11 @@ public class ScheduleStatisticManager {
 				File cache = getCacheFile(scheduleId);
 				cache.createNewFile();
 				OutputStream output = new FileOutputStream(cache);
-				JSONUtils.toJSON(data, output, false);
-				output.close();
+				try {
+					JSONUtils.toJSON(data, output, false);
+				} finally {
+					output.close();
+				}
 			}
 		} catch (Exception e) {
 			e.printStackTrace();
diff --git a/src/main/java/azkaban/utils/FileIOUtils.java b/src/main/java/azkaban/utils/FileIOUtils.java
index b9edc56..c3aaee4 100644
--- a/src/main/java/azkaban/utils/FileIOUtils.java
+++ b/src/main/java/azkaban/utils/FileIOUtils.java
@@ -111,27 +111,34 @@ public class FileIOUtils {
 		//System.out.println(command);
 		ProcessBuilder builder = new ProcessBuilder().command("sh", "-c", command);
 		builder.directory(destDir);
-		
+
+		// XXX what about stopping threads ??
 		Process process = builder.start();
-		NullLogger errorLogger = new NullLogger(process.getErrorStream());
-		NullLogger inputLogger = new NullLogger(process.getInputStream());
-		errorLogger.start();
-		inputLogger.start();
-		
 		try {
-			if (process.waitFor() < 0) {
-				// Assume that the error will be in standard out. Otherwise it'll be in standard in.
-				String errorMessage = errorLogger.getLastMessages();
-				if (errorMessage.isEmpty()) {
-					errorMessage = inputLogger.getLastMessages();
+			NullLogger errorLogger = new NullLogger(process.getErrorStream());
+			NullLogger inputLogger = new NullLogger(process.getInputStream());
+			errorLogger.start();
+			inputLogger.start();
+			
+			try {
+				if (process.waitFor() < 0) {
+					// Assume that the error will be in standard out. Otherwise it'll be in standard in.
+					String errorMessage = errorLogger.getLastMessages();
+					if (errorMessage.isEmpty()) {
+						errorMessage = inputLogger.getLastMessages();
+					}
+					
+					throw new IOException(errorMessage);
 				}
 				
-				throw new IOException(errorMessage);
+			//	System.out.println(errorLogger.getLastMessages());
+			} catch (InterruptedException e) {
+				e.printStackTrace();
 			}
-			
-		//	System.out.println(errorLogger.getLastMessages());
-		} catch (InterruptedException e) {
-			e.printStackTrace();
+		} finally {
+			IOUtils.closeQuietly(process.getInputStream());
+			IOUtils.closeQuietly(process.getOutputStream());
+			IOUtils.closeQuietly(process.getErrorStream());
 		}
 	}
 	
diff --git a/src/main/java/azkaban/utils/JSONUtils.java b/src/main/java/azkaban/utils/JSONUtils.java
index ad78b6c..324d8ef 100644
--- a/src/main/java/azkaban/utils/JSONUtils.java
+++ b/src/main/java/azkaban/utils/JSONUtils.java
@@ -84,8 +84,11 @@ public class JSONUtils {
 	
 	public static void toJSON(Object obj, File file, boolean prettyPrint) throws IOException {
 		BufferedOutputStream stream = new BufferedOutputStream(new FileOutputStream(file));
-		toJSON(obj, stream, prettyPrint);
-		stream.close();
+		try {
+			toJSON(obj, stream, prettyPrint);
+		} finally {
+			stream.close();
+		}
 	}
 	
 	public static Object parseJSONFromStringQuiet(String json) {
diff --git a/src/main/java/azkaban/utils/Props.java b/src/main/java/azkaban/utils/Props.java
index eb39dd6..b3dacf8 100644
--- a/src/main/java/azkaban/utils/Props.java
+++ b/src/main/java/azkaban/utils/Props.java
@@ -89,10 +89,10 @@ public class Props {
 		try {
 			loadFrom(input);
 		} catch (IOException e) {
-			input.close();
 			throw e;
+		} finally {
+			input.close();
 		}
-		input.close();
 	}
 
 	/**
diff --git a/src/main/java/azkaban/utils/Utils.java b/src/main/java/azkaban/utils/Utils.java
index 44edfaa..bb14d03 100644
--- a/src/main/java/azkaban/utils/Utils.java
+++ b/src/main/java/azkaban/utils/Utils.java
@@ -132,20 +132,26 @@ public class Utils {
 	public static void zip(File input, File output) throws IOException {
 		FileOutputStream out = new FileOutputStream(output);
 		ZipOutputStream zOut = new ZipOutputStream(out);
-		zipFile("", input, zOut);
-		zOut.close();
+		try {
+			zipFile("", input, zOut);
+		} finally {
+			zOut.close();
+		}
 	}
 
 	public static void zipFolderContent(File folder, File output) throws IOException {
 		FileOutputStream out = new FileOutputStream(output);
 		ZipOutputStream zOut = new ZipOutputStream(out);
-		File[] files = folder.listFiles();
-		if (files != null) {
-			for (File f : files) {
-				zipFile("", f, zOut);
+		try {
+			File[] files = folder.listFiles();
+			if (files != null) {
+				for (File f : files) {
+					zipFile("", f, zOut);
+				}
 			}
+		} finally {
+			zOut.close();
 		}
-		zOut.close();
 	}
 
 	private static void zipFile(String path, File input, ZipOutputStream zOut) throws IOException {
@@ -165,8 +171,11 @@ public class Utils {
 			zOut.putNextEntry(entry);
 			InputStream fileInputStream = new BufferedInputStream(
 					new FileInputStream(input));
-			IOUtils.copy(fileInputStream, zOut);
-			fileInputStream.close();
+			try {
+				IOUtils.copy(fileInputStream, zOut);
+			} finally {
+				fileInputStream.close();
+			}
 		}
 	}
 
@@ -180,11 +189,17 @@ public class Utils {
 			} else {
 				newFile.getParentFile().mkdirs();
 				InputStream src = source.getInputStream(entry);
-				OutputStream output = new BufferedOutputStream(
-						new FileOutputStream(newFile));
-				IOUtils.copy(src, output);
-				src.close();
-				output.close();
+				try {
+					OutputStream output = new BufferedOutputStream(
+							new FileOutputStream(newFile));
+					try {
+						IOUtils.copy(src, output);
+					} finally {
+						output.close();
+					}
+				} finally {
+					src.close();
+				}
 			}
 		}
 	}
diff --git a/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
index 0f8750b..96f9f9b 100644
--- a/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -480,9 +480,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 			if (cacheExists) {
 				// Send the cache instead
 				InputStream cacheInput = new BufferedInputStream(new FileInputStream(cache));
-				IOUtils.copy(cacheInput, resp.getOutputStream());
-				// System.out.println("Using cache copy for " + start);
-				return;
+				try {
+					IOUtils.copy(cacheInput, resp.getOutputStream());
+					// System.out.println("Using cache copy for " + start);
+					return;
+				} finally {
+					IOUtils.closeQuietly(cacheInput);
+				}
 			}
 		}
 
@@ -518,11 +522,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		File cacheTemp = new File(cacheDirFile, startTime + ".tmp");
 		cacheTemp.createNewFile();
 		OutputStream cacheOutput = new BufferedOutputStream(new FileOutputStream(cacheTemp));
-		OutputStream outputStream = new SplitterOutputStream(cacheOutput, resp.getOutputStream());
-		// Write to both the cache file and web output
-		JSONUtils.toJSON(ret, outputStream, false);
-		cacheOutput.close();
-		
+		try {
+			OutputStream outputStream = new SplitterOutputStream(cacheOutput, resp.getOutputStream());
+			// Write to both the cache file and web output
+			JSONUtils.toJSON(ret, outputStream, false);
+		} finally {
+			IOUtils.closeQuietly(cacheOutput);
+		}
 		//Move cache file
 		synchronized (this) {
 			cacheTemp.renameTo(cache);