azkaban-aplcache
Changes
.travis.yml 2(+1 -1)
build.gradle 112(+63 -49)
gradle.properties 1(+1 -0)
gradle/wrapper/gradle-wrapper.jar 0(+0 -0)
gradlew 164(+164 -0)
gradlew.bat 90(+90 -0)
src/main/java/azkaban/utils/FileIOUtils.java 39(+23 -16)
src/main/java/azkaban/utils/Utils.java 43(+29 -14)
Details
.travis.yml 2(+1 -1)
diff --git a/.travis.yml b/.travis.yml
index dcd8f5b..4f1c17c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,4 +1,4 @@
languages: java
install:
- time npm install -g less dustjs-linkedin
-script: ant
+script: ./gradlew dist
build.gradle 112(+63 -49)
diff --git a/build.gradle b/build.gradle
index 7d6ff9d..5fe2780 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,6 +1,8 @@
apply plugin: 'java'
apply plugin: 'eclipse'
+defaultTasks 'dist'
+
/**
* Helper that calls a command and returns the output
*/
@@ -10,7 +12,7 @@ def cmdCaller = { commandln ->
commandLine commandln
standardOutput = stdout
}
-
+
return stdout.toString().trim()
}
@@ -21,7 +23,6 @@ def getVersionName = { ->
return cmdCaller(['git', 'describe', '--tags', '--abbrev=0'])
}
-
version = getVersionName()
archivesBaseName = 'azkaban'
check.dependsOn.remove(test)
@@ -33,8 +34,8 @@ repositories {
configurations {
all {
- // We don't want the kitchen sink for dependencies. Only the ones we know we need for
- // compile and ones we need to package.
+ // We don't want the kitchen sink for dependencies. Only the ones we
+ // know we need for compile and ones we need to package.
transitive = false
}
compile {
@@ -47,7 +48,9 @@ configurations {
extendsFrom compile
}
}
-
+configurations.compile {
+ description = 'compile classpath'
+}
pegasusVersion = '1.15.7'
dependencies {
@@ -95,7 +98,7 @@ dependencies {
[group: 'com.linkedin.pegasus', name:'generator', version: pegasusVersion],
[group: 'com.linkedin.pegasus', name:'restli-tools', version: pegasusVersion]
)
-
+
testCompile (
[group: 'junit', name:'junit', version: '4.11']
)
@@ -113,7 +116,7 @@ jar {
baseName = 'azkaban'
manifest {
attributes(
- 'Implementation-Title': 'Azkaban',
+ 'Implementation-Title': 'Azkaban',
'Implementation-Version': version
)
}
@@ -143,10 +146,10 @@ eclipse.classpath.file {
beforeMerged {
classpath -> classpath.entries.removeAll { entry -> true }
}
-
- // We want to make sure that if there is an entry for src, that it doesn't have any
- // include parameters
- whenMerged { classpath ->
+
+ // We want to make sure that if there is an entry for src, that it doesn't
+ // have any include parameters
+ whenMerged { classpath ->
classpath.entries.findAll { entry -> entry.kind == 'src' }*.includes = []
}
}
@@ -180,7 +183,7 @@ task web(dependsOn: ['compileLess', 'compileDust']) << {
println 'Copying web files'
copy {
from('src/web')
- into('build/web')
+ into('build/web')
}
copy {
@@ -223,19 +226,19 @@ task packageSolo(type: Tar, dependsOn: [jar, 'web', 'createVersionFile']) {
mkdir packageDir
mkdir packageDir + '/extlib'
mkdir packageDir + '/plugins'
-
+
println 'Copying Soloserver bin & conf'
copy {
from('src/package/soloserver')
into(packageDir)
}
-
+
println 'Copying Azkaban lib'
copy {
from('build/libs')
into(packageDir + '/lib')
}
-
+
println 'Copying web'
copy {
from('build/web')
@@ -253,46 +256,46 @@ task packageSolo(type: Tar, dependsOn: [jar, 'web', 'createVersionFile']) {
into packageDir + '/lib'
from configurations.compile
}
-
+
copy {
into packageDir
from 'build/package/version.file'
}
-
+
println 'Tarballing Solo Package'
extension = 'tar.gz'
compression = Compression.GZIP
-
+
basedir = baseName + '-' + appendix + '-' + version
println 'Source is in ' + packageDir
- into(basedir) {
+ into(basedir) {
from packageDir
exclude 'bin'
}
-
+
dst_bin = basedir + '/bin'
src_bin = packageDir + '/bin'
- from(src_bin) {
+ from(src_bin) {
into dst_bin
fileMode = 0755
}
-}
+}
/**
* Packages the Sql Scripts for Azkaban DB
*/
task packageSql(type: Tar) {
String packageDir = 'build/package/sql'
-
+
println 'Creating Azkaban SQL Scripts into ' + packageDir
- mkdir packageDir
- delete packageDir
+ mkdir packageDir
+
println 'Copying SQL files'
copy {
from('src/sql')
into(packageDir)
}
-
+
String destFile = packageDir + '/create-all-sql-' + version + '.sql';
println('Concating create scripts to ' + destFile)
ant.concat(destfile:destFile, fixlastline:'yes') {
@@ -301,19 +304,19 @@ task packageSql(type: Tar) {
exclude(name: 'database.properties')
}
}
-
+
println 'Tarballing SQL Package'
extension = 'tar.gz'
compression = Compression.GZIP
appendix = 'sql'
-
+
basedir = baseName + '-' + appendix + '-' + version
packageDir = 'build/package/sql'
println 'Source is in ' + packageDir
- into(basedir) {
+ into(basedir) {
from packageDir
}
-}
+}
/**
* Packages the Azkaban Executor Server
@@ -321,13 +324,13 @@ task packageSql(type: Tar) {
task packageExec(type: Tar, dependsOn: [jar, 'createVersionFile']) {
appendix = 'exec-server'
String packageDir = 'build/package/' + baseName + '-' + appendix
+
delete packageDir
-
println 'Creating Azkaban Executor Server Package into ' + packageDir
mkdir packageDir
mkdir packageDir + '/extlib'
mkdir packageDir + '/plugins'
-
+
println 'Copying Exec server bin & conf'
copy {
from('src/package/execserver')
@@ -346,28 +349,28 @@ task packageExec(type: Tar, dependsOn: [jar, 'createVersionFile']) {
into packageDir + '/lib'
from configurations.compile
}
-
+
copy {
into packageDir
from 'build/package/version.file'
}
-
+
println 'Tarballing Web Package'
extension = 'tar.gz'
compression = Compression.GZIP
-
+
basedir = baseName + '-' + appendix + '-' + version
packageDir = 'build/package/' + baseName + '-' + appendix
println 'Source is in ' + packageDir
- into(basedir) {
+ into(basedir) {
from packageDir
exclude 'bin'
}
-
+
dst_bin = basedir + '/bin'
src_bin = packageDir + '/bin'
- from(src_bin) {
+ from(src_bin) {
into dst_bin
fileMode = 0755
}
@@ -379,13 +382,13 @@ task packageExec(type: Tar, dependsOn: [jar, 'createVersionFile']) {
task packageWeb(type: Tar, dependsOn: [jar, 'web', 'createVersionFile']) {
appendix = 'web-server'
String packageDir = 'build/package/' + baseName + '-' + appendix
-
delete packageDir
+
println 'Creating Azkaban Web Server Package into ' + packageDir
mkdir packageDir
mkdir packageDir + '/extlib'
mkdir packageDir + '/plugins'
-
+
println 'Copying Web server bin & conf'
copy {
from('src/package/webserver')
@@ -394,11 +397,12 @@ task packageWeb(type: Tar, dependsOn: [jar, 'web', 'createVersionFile']) {
String jarFile = 'build/libs/' + baseName + '-' + version + '.jar'
println 'Copying Azkaban lib ' + jarFile
+
copy {
from(jarFile)
into(packageDir + '/lib')
}
-
+
println 'Copying web'
copy {
from('build/web')
@@ -410,30 +414,40 @@ task packageWeb(type: Tar, dependsOn: [jar, 'web', 'createVersionFile']) {
into packageDir + '/lib'
from configurations.compile
}
-
+
copy {
into packageDir
from 'build/package/version.file'
}
-
+
println 'Tarballing Web Package'
extension = 'tar.gz'
compression = Compression.GZIP
-
+
basedir = baseName + '-' + appendix + '-' + version
println 'Source is in ' + packageDir
- into(basedir) {
+ into(basedir) {
from packageDir
exclude 'bin'
}
-
+
dst_bin = basedir + '/bin'
src_bin = packageDir + '/bin'
- from(src_bin) {
+ from(src_bin) {
into dst_bin
fileMode = 0755
}
}
-task packageAll(dependsOn : ['packageWeb', 'packageExec', 'packageSolo', 'packageSql']) {
-}
\ No newline at end of file
+task packageAll(dependsOn: ['packageWeb',
+ 'packageExec',
+ 'packageSolo',
+ 'packageSql']) {
+}
+
+task dist(dependsOn: 'packageAll') {
+}
+
+task wrapper(type: Wrapper) {
+ gradleVersion = '1.11'
+}
gradle.properties 1(+1 -0)
diff --git a/gradle.properties b/gradle.properties
new file mode 100644
index 0000000..1a644c7
--- /dev/null
+++ b/gradle.properties
@@ -0,0 +1 @@
+org.gradle.daemon=true
gradle/wrapper/gradle-wrapper.jar 0(+0 -0)
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
new file mode 100644
index 0000000..3c7abdf
Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
new file mode 100644
index 0000000..2d93fc4
--- /dev/null
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -0,0 +1,6 @@
+#Thu Mar 20 14:12:56 PDT 2014
+distributionBase=GRADLE_USER_HOME
+distributionPath=wrapper/dists
+zipStoreBase=GRADLE_USER_HOME
+zipStorePath=wrapper/dists
+distributionUrl=http\://services.gradle.org/distributions/gradle-1.11-bin.zip
gradlew 164(+164 -0)
diff --git a/gradlew b/gradlew
new file mode 100755
index 0000000..91a7e26
--- /dev/null
+++ b/gradlew
@@ -0,0 +1,164 @@
+#!/usr/bin/env bash
+
+##############################################################################
+##
+## Gradle start up script for UN*X
+##
+##############################################################################
+
+# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+DEFAULT_JVM_OPTS=""
+
+APP_NAME="Gradle"
+APP_BASE_NAME=`basename "$0"`
+
+# Use the maximum available, or set MAX_FD != -1 to use that value.
+MAX_FD="maximum"
+
+warn ( ) {
+ echo "$*"
+}
+
+die ( ) {
+ echo
+ echo "$*"
+ echo
+ exit 1
+}
+
+# OS specific support (must be 'true' or 'false').
+cygwin=false
+msys=false
+darwin=false
+case "`uname`" in
+ CYGWIN* )
+ cygwin=true
+ ;;
+ Darwin* )
+ darwin=true
+ ;;
+ MINGW* )
+ msys=true
+ ;;
+esac
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched.
+if $cygwin ; then
+ [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+fi
+
+# Attempt to set APP_HOME
+# Resolve links: $0 may be a link
+PRG="$0"
+# Need this for relative symlinks.
+while [ -h "$PRG" ] ; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG=`dirname "$PRG"`"/$link"
+ fi
+done
+SAVED="`pwd`"
+cd "`dirname \"$PRG\"`/" >&-
+APP_HOME="`pwd -P`"
+cd "$SAVED" >&-
+
+CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
+
+# Determine the Java command to use to start the JVM.
+if [ -n "$JAVA_HOME" ] ; then
+ if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+ # IBM's JDK on AIX uses strange locations for the executables
+ JAVACMD="$JAVA_HOME/jre/sh/java"
+ else
+ JAVACMD="$JAVA_HOME/bin/java"
+ fi
+ if [ ! -x "$JAVACMD" ] ; then
+ die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+ fi
+else
+ JAVACMD="java"
+ which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+fi
+
+# Increase the maximum file descriptors if we can.
+if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then
+ MAX_FD_LIMIT=`ulimit -H -n`
+ if [ $? -eq 0 ] ; then
+ if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
+ MAX_FD="$MAX_FD_LIMIT"
+ fi
+ ulimit -n $MAX_FD
+ if [ $? -ne 0 ] ; then
+ warn "Could not set maximum file descriptor limit: $MAX_FD"
+ fi
+ else
+ warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT"
+ fi
+fi
+
+# For Darwin, add options to specify how the application appears in the dock
+if $darwin; then
+ GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
+fi
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin ; then
+ APP_HOME=`cygpath --path --mixed "$APP_HOME"`
+ CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
+
+ # We build the pattern for arguments to be converted via cygpath
+ ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
+ SEP=""
+ for dir in $ROOTDIRSRAW ; do
+ ROOTDIRS="$ROOTDIRS$SEP$dir"
+ SEP="|"
+ done
+ OURCYGPATTERN="(^($ROOTDIRS))"
+ # Add a user-defined pattern to the cygpath arguments
+ if [ "$GRADLE_CYGPATTERN" != "" ] ; then
+ OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
+ fi
+ # Now convert the arguments - kludge to limit ourselves to /bin/sh
+ i=0
+ for arg in "$@" ; do
+ CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -`
+ CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option
+
+ if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition
+ eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"`
+ else
+ eval `echo args$i`="\"$arg\""
+ fi
+ i=$((i+1))
+ done
+ case $i in
+ (0) set -- ;;
+ (1) set -- "$args0" ;;
+ (2) set -- "$args0" "$args1" ;;
+ (3) set -- "$args0" "$args1" "$args2" ;;
+ (4) set -- "$args0" "$args1" "$args2" "$args3" ;;
+ (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
+ (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
+ (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
+ (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
+ (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
+ esac
+fi
+
+# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules
+function splitJvmOpts() {
+ JVM_OPTS=("$@")
+}
+eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS
+JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME"
+
+exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@"
gradlew.bat 90(+90 -0)
diff --git a/gradlew.bat b/gradlew.bat
new file mode 100644
index 0000000..aec9973
--- /dev/null
+++ b/gradlew.bat
@@ -0,0 +1,90 @@
+@if "%DEBUG%" == "" @echo off
+@rem ##########################################################################
+@rem
+@rem Gradle startup script for Windows
+@rem
+@rem ##########################################################################
+
+@rem Set local scope for the variables with windows NT shell
+if "%OS%"=="Windows_NT" setlocal
+
+@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+set DEFAULT_JVM_OPTS=
+
+set DIRNAME=%~dp0
+if "%DIRNAME%" == "" set DIRNAME=.
+set APP_BASE_NAME=%~n0
+set APP_HOME=%DIRNAME%
+
+@rem Find java.exe
+if defined JAVA_HOME goto findJavaFromJavaHome
+
+set JAVA_EXE=java.exe
+%JAVA_EXE% -version >NUL 2>&1
+if "%ERRORLEVEL%" == "0" goto init
+
+echo.
+echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+echo.
+echo Please set the JAVA_HOME variable in your environment to match the
+echo location of your Java installation.
+
+goto fail
+
+:findJavaFromJavaHome
+set JAVA_HOME=%JAVA_HOME:"=%
+set JAVA_EXE=%JAVA_HOME%/bin/java.exe
+
+if exist "%JAVA_EXE%" goto init
+
+echo.
+echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
+echo.
+echo Please set the JAVA_HOME variable in your environment to match the
+echo location of your Java installation.
+
+goto fail
+
+:init
+@rem Get command-line arguments, handling Windowz variants
+
+if not "%OS%" == "Windows_NT" goto win9xME_args
+if "%@eval[2+2]" == "4" goto 4NT_args
+
+:win9xME_args
+@rem Slurp the command line arguments.
+set CMD_LINE_ARGS=
+set _SKIP=2
+
+:win9xME_args_slurp
+if "x%~1" == "x" goto execute
+
+set CMD_LINE_ARGS=%*
+goto execute
+
+:4NT_args
+@rem Get arguments from the 4NT Shell from JP Software
+set CMD_LINE_ARGS=%$
+
+:execute
+@rem Setup the command line
+
+set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
+
+@rem Execute Gradle
+"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%
+
+:end
+@rem End local scope for the variables with windows NT shell
+if "%ERRORLEVEL%"=="0" goto mainEnd
+
+:fail
+rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
+rem the _cmd.exe /c_ return code!
+if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
+exit /b 1
+
+:mainEnd
+if "%OS%"=="Windows_NT" endlocal
+
+:omega
diff --git a/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index f7c5c40..cb8e00e 100644
--- a/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -768,37 +768,40 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
int pos = 0;
int length = buffer.length;
int startByte = 0;
- BufferedInputStream bufferedStream = null;
try {
for (int i = 0; i < files.length; ++i) {
File file = files[i];
- bufferedStream = new BufferedInputStream(new FileInputStream(file));
- int size = bufferedStream.read(buffer, pos, length);
- while (size >= 0) {
- if (pos + size == buffer.length) {
- // Flush here.
- uploadLogPart(
- connection,
- execId,
- name,
- attempt,
- startByte,
- startByte + buffer.length,
- encType,
- buffer,
- buffer.length);
-
- pos = 0;
- length = buffer.length;
- startByte += buffer.length;
- }
- else {
- // Usually end of file.
- pos += size;
- length = buffer.length - pos;
+ BufferedInputStream bufferedStream = new BufferedInputStream(new FileInputStream(file));
+ try {
+ int size = bufferedStream.read(buffer, pos, length);
+ while (size >= 0) {
+ if (pos + size == buffer.length) {
+ // Flush here.
+ uploadLogPart(
+ connection,
+ execId,
+ name,
+ attempt,
+ startByte,
+ startByte + buffer.length,
+ encType,
+ buffer,
+ buffer.length);
+
+ pos = 0;
+ length = buffer.length;
+ startByte += buffer.length;
+ }
+ else {
+ // Usually end of file.
+ pos += size;
+ length = buffer.length - pos;
+ }
+ size = bufferedStream.read(buffer, pos, length);
}
- size = bufferedStream.read(buffer, pos, length);
+ } finally {
+ IOUtils.closeQuietly(bufferedStream);
}
}
@@ -822,9 +825,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
catch (IOException e) {
throw new ExecutorManagerException("Error chunking", e);
}
- finally {
- IOUtils.closeQuietly(bufferedStream);
- }
}
private void uploadLogPart(
diff --git a/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java b/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
index 2e5de64..01576d4 100644
--- a/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
+++ b/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.IOUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -72,35 +73,41 @@ public class AzkabanProcess {
builder.directory(new File(workingDir));
builder.environment().putAll(env);
this.process = builder.start();
- this.processId = processId(process);
- if (processId == 0) {
- logger.debug("Spawned thread with unknown process id");
- } else {
- logger.debug("Spawned thread with process id " + processId);
- }
-
- this.startupLatch.countDown();
-
- LogGobbler outputGobbler = new LogGobbler(new InputStreamReader(process.getInputStream()), logger, Level.INFO, 30);
- LogGobbler errorGobbler = new LogGobbler(new InputStreamReader(process.getErrorStream()), logger, Level.ERROR, 30);
-
- outputGobbler.start();
- errorGobbler.start();
- int exitCode = -1;
try {
- exitCode = process.waitFor();
- } catch (InterruptedException e) {
- logger.info("Process interrupted. Exit code is " + exitCode, e);
- }
-
- completeLatch.countDown();
- if (exitCode != 0) {
- throw new ProcessFailureException(exitCode, errorGobbler.getRecentLog());
+ this.processId = processId(process);
+ if (processId == 0) {
+ logger.debug("Spawned thread with unknown process id");
+ } else {
+ logger.debug("Spawned thread with process id " + processId);
+ }
+
+ this.startupLatch.countDown();
+
+ LogGobbler outputGobbler = new LogGobbler(new InputStreamReader(process.getInputStream()), logger, Level.INFO, 30);
+ LogGobbler errorGobbler = new LogGobbler(new InputStreamReader(process.getErrorStream()), logger, Level.ERROR, 30);
+
+ outputGobbler.start();
+ errorGobbler.start();
+ int exitCode = -1;
+ try {
+ exitCode = process.waitFor();
+ } catch (InterruptedException e) {
+ logger.info("Process interrupted. Exit code is " + exitCode, e);
+ }
+
+ completeLatch.countDown();
+ if (exitCode != 0) {
+ throw new ProcessFailureException(exitCode, errorGobbler.getRecentLog());
+ }
+
+ // try to wait for everything to get logged out before exiting
+ outputGobbler.awaitCompletion(5000);
+ errorGobbler.awaitCompletion(5000);
+ } finally {
+ IOUtils.closeQuietly(process.getInputStream());
+ IOUtils.closeQuietly(process.getOutputStream());
+ IOUtils.closeQuietly(process.getErrorStream());
}
-
- // try to wait for everything to get logged out before exiting
- outputGobbler.awaitCompletion(5000);
- errorGobbler.awaitCompletion(5000);
}
/**
diff --git a/src/main/java/azkaban/project/JdbcProjectLoader.java b/src/main/java/azkaban/project/JdbcProjectLoader.java
index 9553d48..c3cf424 100644
--- a/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -271,7 +271,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
}
}
- @SuppressWarnings("resource")
private void uploadProjectFile(Connection connection, Project project, int version, String filetype, String filename, File localFile, String uploader) throws ProjectManagerException {
QueryRunner runner = new QueryRunner();
long updateTime = System.currentTimeMillis();
@@ -306,18 +305,16 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
runner.update(connection, INSERT_PROJECT_FILES, project.getId(), version, chunk, size, buf);
logger.info("Finished update for " + filename + " chunk " + chunk);
} catch (SQLException e) {
- IOUtils.closeQuietly(bufferedStream);
throw new ProjectManagerException("Error chunking", e);
}
++chunk;
size = bufferedStream.read(buffer);
}
-
- bufferedStream.close();
} catch (IOException e) {
- IOUtils.closeQuietly(bufferedStream);
throw new ProjectManagerException("Error chunking file " + filename);
+ } finally {
+ IOUtils.closeQuietly(bufferedStream);
}
final String INSERT_PROJECT_VERSION =
@@ -361,7 +358,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
return handler;
}
- @SuppressWarnings("resource")
private ProjectFileHandler getUploadedFile(Connection connection, int projectId, int version) throws ProjectManagerException {
QueryRunner runner = new QueryRunner();
ProjectVersionResultHandler pfHandler = new ProjectVersionResultHandler();
@@ -382,45 +378,45 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
BufferedOutputStream bStream = null;
File file = null;
try {
- file = File.createTempFile(projHandler.getFileName(), String.valueOf(version), tempDir);
-
- bStream = new BufferedOutputStream(new FileOutputStream(file));
- }
- catch (IOException e) {
- IOUtils.closeQuietly(bStream);
- throw new ProjectManagerException("Error creating temp file for stream.");
- }
-
- int collect = 5;
- int fromChunk = 0;
- int toChunk = collect;
- do {
- ProjectFileChunkResultHandler chunkHandler = new ProjectFileChunkResultHandler();
- List<byte[]> data = null;
- try {
- data = runner.query(connection, ProjectFileChunkResultHandler.SELECT_PROJECT_CHUNKS_FILE, chunkHandler, projectId, version, fromChunk, toChunk);
- }
- catch(SQLException e) {
- logger.error(e);
- IOUtils.closeQuietly(bStream);
- throw new ProjectManagerException("Query for uploaded file for " + projectId + " failed.", e);
- }
-
try {
- for (byte[] d : data) {
- bStream.write(d);
- }
+ file = File.createTempFile(projHandler.getFileName(), String.valueOf(version), tempDir);
+
+ bStream = new BufferedOutputStream(new FileOutputStream(file));
}
catch (IOException e) {
- IOUtils.closeQuietly(bStream);
- throw new ProjectManagerException("Error writing file", e);
+ throw new ProjectManagerException("Error creating temp file for stream.");
}
- // Add all the bytes to the stream.
- fromChunk += collect;
- toChunk += collect;
- } while (fromChunk <= numChunks);
- IOUtils.closeQuietly(bStream);
+ int collect = 5;
+ int fromChunk = 0;
+ int toChunk = collect;
+ do {
+ ProjectFileChunkResultHandler chunkHandler = new ProjectFileChunkResultHandler();
+ List<byte[]> data = null;
+ try {
+ data = runner.query(connection, ProjectFileChunkResultHandler.SELECT_PROJECT_CHUNKS_FILE, chunkHandler, projectId, version, fromChunk, toChunk);
+ }
+ catch(SQLException e) {
+ logger.error(e);
+ throw new ProjectManagerException("Query for uploaded file for " + projectId + " failed.", e);
+ }
+
+ try {
+ for (byte[] d : data) {
+ bStream.write(d);
+ }
+ }
+ catch (IOException e) {
+ throw new ProjectManagerException("Error writing file", e);
+ }
+
+ // Add all the bytes to the stream.
+ fromChunk += collect;
+ toChunk += collect;
+ } while (fromChunk <= numChunks);
+ } finally {
+ IOUtils.closeQuietly(bStream);
+ }
// Check md5.
byte[] md5 = null;
diff --git a/src/main/java/azkaban/scheduler/ScheduleStatisticManager.java b/src/main/java/azkaban/scheduler/ScheduleStatisticManager.java
index d5de1e1..71c6aaf 100644
--- a/src/main/java/azkaban/scheduler/ScheduleStatisticManager.java
+++ b/src/main/java/azkaban/scheduler/ScheduleStatisticManager.java
@@ -113,8 +113,11 @@ public class ScheduleStatisticManager {
File cache = getCacheFile(scheduleId);
cache.createNewFile();
OutputStream output = new FileOutputStream(cache);
- JSONUtils.toJSON(data, output, false);
- output.close();
+ try {
+ JSONUtils.toJSON(data, output, false);
+ } finally {
+ output.close();
+ }
}
} catch (Exception e) {
e.printStackTrace();
src/main/java/azkaban/utils/FileIOUtils.java 39(+23 -16)
diff --git a/src/main/java/azkaban/utils/FileIOUtils.java b/src/main/java/azkaban/utils/FileIOUtils.java
index b9edc56..c3aaee4 100644
--- a/src/main/java/azkaban/utils/FileIOUtils.java
+++ b/src/main/java/azkaban/utils/FileIOUtils.java
@@ -111,27 +111,34 @@ public class FileIOUtils {
//System.out.println(command);
ProcessBuilder builder = new ProcessBuilder().command("sh", "-c", command);
builder.directory(destDir);
-
+
+ // XXX what about stopping threads ??
Process process = builder.start();
- NullLogger errorLogger = new NullLogger(process.getErrorStream());
- NullLogger inputLogger = new NullLogger(process.getInputStream());
- errorLogger.start();
- inputLogger.start();
-
try {
- if (process.waitFor() < 0) {
- // Assume that the error will be in standard out. Otherwise it'll be in standard in.
- String errorMessage = errorLogger.getLastMessages();
- if (errorMessage.isEmpty()) {
- errorMessage = inputLogger.getLastMessages();
+ NullLogger errorLogger = new NullLogger(process.getErrorStream());
+ NullLogger inputLogger = new NullLogger(process.getInputStream());
+ errorLogger.start();
+ inputLogger.start();
+
+ try {
+ if (process.waitFor() < 0) {
+ // Assume that the error will be in standard out. Otherwise it'll be in standard in.
+ String errorMessage = errorLogger.getLastMessages();
+ if (errorMessage.isEmpty()) {
+ errorMessage = inputLogger.getLastMessages();
+ }
+
+ throw new IOException(errorMessage);
}
- throw new IOException(errorMessage);
+ // System.out.println(errorLogger.getLastMessages());
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
-
- // System.out.println(errorLogger.getLastMessages());
- } catch (InterruptedException e) {
- e.printStackTrace();
+ } finally {
+ IOUtils.closeQuietly(process.getInputStream());
+ IOUtils.closeQuietly(process.getOutputStream());
+ IOUtils.closeQuietly(process.getErrorStream());
}
}
diff --git a/src/main/java/azkaban/utils/JSONUtils.java b/src/main/java/azkaban/utils/JSONUtils.java
index ad78b6c..324d8ef 100644
--- a/src/main/java/azkaban/utils/JSONUtils.java
+++ b/src/main/java/azkaban/utils/JSONUtils.java
@@ -84,8 +84,11 @@ public class JSONUtils {
public static void toJSON(Object obj, File file, boolean prettyPrint) throws IOException {
BufferedOutputStream stream = new BufferedOutputStream(new FileOutputStream(file));
- toJSON(obj, stream, prettyPrint);
- stream.close();
+ try {
+ toJSON(obj, stream, prettyPrint);
+ } finally {
+ stream.close();
+ }
}
public static Object parseJSONFromStringQuiet(String json) {
diff --git a/src/main/java/azkaban/utils/Props.java b/src/main/java/azkaban/utils/Props.java
index eb39dd6..b3dacf8 100644
--- a/src/main/java/azkaban/utils/Props.java
+++ b/src/main/java/azkaban/utils/Props.java
@@ -89,10 +89,10 @@ public class Props {
try {
loadFrom(input);
} catch (IOException e) {
- input.close();
throw e;
+ } finally {
+ input.close();
}
- input.close();
}
/**
src/main/java/azkaban/utils/Utils.java 43(+29 -14)
diff --git a/src/main/java/azkaban/utils/Utils.java b/src/main/java/azkaban/utils/Utils.java
index 44edfaa..bb14d03 100644
--- a/src/main/java/azkaban/utils/Utils.java
+++ b/src/main/java/azkaban/utils/Utils.java
@@ -132,20 +132,26 @@ public class Utils {
public static void zip(File input, File output) throws IOException {
FileOutputStream out = new FileOutputStream(output);
ZipOutputStream zOut = new ZipOutputStream(out);
- zipFile("", input, zOut);
- zOut.close();
+ try {
+ zipFile("", input, zOut);
+ } finally {
+ zOut.close();
+ }
}
public static void zipFolderContent(File folder, File output) throws IOException {
FileOutputStream out = new FileOutputStream(output);
ZipOutputStream zOut = new ZipOutputStream(out);
- File[] files = folder.listFiles();
- if (files != null) {
- for (File f : files) {
- zipFile("", f, zOut);
+ try {
+ File[] files = folder.listFiles();
+ if (files != null) {
+ for (File f : files) {
+ zipFile("", f, zOut);
+ }
}
+ } finally {
+ zOut.close();
}
- zOut.close();
}
private static void zipFile(String path, File input, ZipOutputStream zOut) throws IOException {
@@ -165,8 +171,11 @@ public class Utils {
zOut.putNextEntry(entry);
InputStream fileInputStream = new BufferedInputStream(
new FileInputStream(input));
- IOUtils.copy(fileInputStream, zOut);
- fileInputStream.close();
+ try {
+ IOUtils.copy(fileInputStream, zOut);
+ } finally {
+ fileInputStream.close();
+ }
}
}
@@ -180,11 +189,17 @@ public class Utils {
} else {
newFile.getParentFile().mkdirs();
InputStream src = source.getInputStream(entry);
- OutputStream output = new BufferedOutputStream(
- new FileOutputStream(newFile));
- IOUtils.copy(src, output);
- src.close();
- output.close();
+ try {
+ OutputStream output = new BufferedOutputStream(
+ new FileOutputStream(newFile));
+ try {
+ IOUtils.copy(src, output);
+ } finally {
+ output.close();
+ }
+ } finally {
+ src.close();
+ }
}
}
}
diff --git a/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
index 0f8750b..96f9f9b 100644
--- a/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -480,9 +480,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
if (cacheExists) {
// Send the cache instead
InputStream cacheInput = new BufferedInputStream(new FileInputStream(cache));
- IOUtils.copy(cacheInput, resp.getOutputStream());
- // System.out.println("Using cache copy for " + start);
- return;
+ try {
+ IOUtils.copy(cacheInput, resp.getOutputStream());
+ // System.out.println("Using cache copy for " + start);
+ return;
+ } finally {
+ IOUtils.closeQuietly(cacheInput);
+ }
}
}
@@ -518,11 +522,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
File cacheTemp = new File(cacheDirFile, startTime + ".tmp");
cacheTemp.createNewFile();
OutputStream cacheOutput = new BufferedOutputStream(new FileOutputStream(cacheTemp));
- OutputStream outputStream = new SplitterOutputStream(cacheOutput, resp.getOutputStream());
- // Write to both the cache file and web output
- JSONUtils.toJSON(ret, outputStream, false);
- cacheOutput.close();
-
+ try {
+ OutputStream outputStream = new SplitterOutputStream(cacheOutput, resp.getOutputStream());
+ // Write to both the cache file and web output
+ JSONUtils.toJSON(ret, outputStream, false);
+ } finally {
+ IOUtils.closeQuietly(cacheOutput);
+ }
//Move cache file
synchronized (this) {
cacheTemp.renameTo(cache);