azkaban-aplcache
Changes
src/java/azkaban/execapp/JobRunner.java 61(+33 -28)
Details
src/java/azkaban/execapp/JobRunner.java 61(+33 -28)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 6dc3d61..1e18b32 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -222,7 +222,8 @@ public class JobRunner extends EventHandler implements Runnable {
jobAppender = fileAppender;
logger.addAppender(jobAppender);
logger.setAdditivity(false);
- } catch (IOException e) {
+ }
+ catch (IOException e) {
flowLogger.error("Could not open log file in " + workingDir + " for job " + this.jobId, e);
}
}
@@ -239,7 +240,8 @@ public class JobRunner extends EventHandler implements Runnable {
try {
node.setUpdateTime(System.currentTimeMillis());
loader.updateExecutableNode(node);
- } catch (ExecutorManagerException e) {
+ }
+ catch (ExecutorManagerException e) {
flowLogger.error("Could not update job properties in db for " + this.jobId, e);
}
}
@@ -301,7 +303,7 @@ public class JobRunner extends EventHandler implements Runnable {
if (!blockingStatus.isEmpty()) {
logger.info("Pipeline job " + this.jobId + " waiting on " + blockedList + " in execution " + watcher.getExecId());
- for(BlockingStatus bStatus: blockingStatus) {
+ for (BlockingStatus bStatus: blockingStatus) {
logger.info("Waiting on pipelined job " + bStatus.getJobId());
currentBlockStatus = bStatus;
bStatus.blockOnFinishedStatus();
@@ -328,11 +330,12 @@ public class JobRunner extends EventHandler implements Runnable {
long currentTime = System.currentTimeMillis();
if (delayStartMs > 0) {
logger.info("Delaying start of execution for " + delayStartMs + " milliseconds.");
- synchronized(this) {
+ synchronized (this) {
try {
this.wait(delayStartMs);
logger.info("Execution has been delayed for " + delayStartMs + " ms. Continuing with execution.");
- } catch (InterruptedException e) {
+ }
+ catch (InterruptedException e) {
logger.error("Job " + this.jobId + " was to be delayed for " + delayStartMs + ". Interrupted after " + (System.currentTimeMillis() - currentTime));
}
}
@@ -348,26 +351,25 @@ public class JobRunner extends EventHandler implements Runnable {
private void finalizeLogFile() {
closeLogger();
+ if (logFile == null) {
+ flowLogger.info("Log file for job " + this.jobId + " is null");
+ return;
+ }
- if (logFile != null) {
- try {
- File[] files = logFile.getParentFile().listFiles(new FilenameFilter() {
-
- @Override
- public boolean accept(File dir, String name) {
- return name.startsWith(logFile.getName());
- }
- }
- );
- Arrays.sort(files, Collections.reverseOrder());
+ try {
+ File[] files = logFile.getParentFile().listFiles(new FilenameFilter() {
- loader.uploadLogFile(executionId, this.node.getNestedId(), node.getAttempt(), files);
- } catch (ExecutorManagerException e) {
- flowLogger.error("Error writing out logs for job " + this.node.getNestedId(), e);
- }
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.startsWith(logFile.getName());
+ }
+ });
+ Arrays.sort(files, Collections.reverseOrder());
+
+ loader.uploadLogFile(executionId, this.node.getNestedId(), node.getAttempt(), files);
}
- else {
- flowLogger.info("Log file for job " + this.jobId + " is null");
+ catch (ExecutorManagerException e) {
+ flowLogger.error("Error writing out logs for job " + this.node.getNestedId(), e);
}
}
@@ -432,7 +434,7 @@ public class JobRunner extends EventHandler implements Runnable {
return false;
}
- synchronized(syncObject) {
+ synchronized (syncObject) {
if (node.getStatus() == Status.FAILED || cancelled) {
return false;
}
@@ -451,7 +453,8 @@ public class JobRunner extends EventHandler implements Runnable {
}
props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
- props.put(CommonJobProperties.JOB_METADATA_FILE, createMetaDataFileName(executionId, this.jobId, node.getAttempt()));
+ props.put(CommonJobProperties.JOB_METADATA_FILE,
+ createMetaDataFileName(executionId, this.jobId, node.getAttempt()));
changeStatus(Status.RUNNING);
// Ability to specify working directory
@@ -459,9 +462,9 @@ public class JobRunner extends EventHandler implements Runnable {
props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
}
- if(props.containsKey("user.to.proxy")) {
+ if (props.containsKey("user.to.proxy")) {
String jobProxyUser = props.getString("user.to.proxy");
- if(proxyUsers != null && !proxyUsers.contains(jobProxyUser)) {
+ if (proxyUsers != null && !proxyUsers.contains(jobProxyUser)) {
logger.error("User " + jobProxyUser + " has no permission to execute this job " + this.jobId + "!");
return false;
}
@@ -482,7 +485,8 @@ public class JobRunner extends EventHandler implements Runnable {
private void runJob() {
try {
job.run();
- } catch (Exception e) {
+ }
+ catch (Exception e) {
e.printStackTrace();
if (props.getBoolean("job.succeed.on.failure", false)) {
@@ -549,7 +553,8 @@ public class JobRunner extends EventHandler implements Runnable {
try {
job.cancel();
- } catch (Exception e) {
+ }
+ catch (Exception e) {
logError(e.getMessage());
logError("Failed trying to cancel job. Maybe it hasn't started running yet or just finished.");
}