azkaban-aplcache
Changes
src/java/azkaban/execapp/FlowRunnerManager.java 44(+22 -22)
src/java/azkaban/execapp/JobRunner.java 18(+9 -9)
src/java/azkaban/executor/ExecutorManager.java 60(+30 -30)
Details
diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index 60ad9b5..102505b 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -214,10 +214,10 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
Map<String, Object> respMap) throws ServletException {
String jobId = getParam(req, "jobId");
- int attempt = getIntParam(req, "attempt", 0);
+ int attempt = getIntParam(req, "attempt", 0);
try {
List<Object> result = flowRunnerManager.readJobAttachment(
- execId, jobId, attempt);
+ execId, jobId, attempt);
respMap.put("attachment", result);
}
catch (Exception e) {
src/java/azkaban/execapp/FlowRunnerManager.java 44(+22 -22)
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 3c19e31..be8a329 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -581,33 +581,33 @@ public class FlowRunnerManager implements EventListener {
FlowRunner runner = runningFlows.get(execId);
if (runner == null) {
throw new ExecutorManagerException(
- "Running flow " + execId + " not found.");
+ "Running flow " + execId + " not found.");
}
- File dir = runner.getExecutionDir();
- if (dir == null || !dir.exists()) {
- throw new ExecutorManagerException(
- "Error reading file. Log directory doesn't exist.");
- }
+ File dir = runner.getExecutionDir();
+ if (dir == null || !dir.exists()) {
+ throw new ExecutorManagerException(
+ "Error reading file. Log directory doesn't exist.");
+ }
- try {
- synchronized (executionDirDeletionSync) {
- if (!dir.exists()) {
+ try {
+ synchronized (executionDirDeletionSync) {
+ if (!dir.exists()) {
throw new ExecutorManagerException(
- "Execution dir file doesn't exist. Probably has beend deleted");
- }
+ "Execution dir file doesn't exist. Probably has beend deleted");
+ }
- File attachmentFile = runner.getJobAttachmentFile(jobId, attempt);
- if (attachmentFile == null || !attachmentFile.exists()) {
- throw new ExecutorManagerException(
- "Job attachment file doesn't exist.");
- }
- return (ArrayList<Object>) JSONUtils.parseJSONFromFile(attachmentFile);
- }
- }
- catch (IOException e) {
- throw new ExecutorManagerException(e);
- }
+ File attachmentFile = runner.getJobAttachmentFile(jobId, attempt);
+ if (attachmentFile == null || !attachmentFile.exists()) {
+ throw new ExecutorManagerException(
+ "Job attachment file doesn't exist.");
+ }
+ return (ArrayList<Object>) JSONUtils.parseJSONFromFile(attachmentFile);
+ }
+ }
+ catch (IOException e) {
+ throw new ExecutorManagerException(e);
+ }
}
public JobMetaData readJobMetaData(int execId, String jobId, int attempt, int startByte, int length) throws ExecutorManagerException {
src/java/azkaban/execapp/JobRunner.java 18(+9 -9)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index ff0c707..6dc3d61 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -589,23 +589,23 @@ public class JobRunner extends EventHandler implements Runnable {
}
public static String createAttachmentFileName(
- int executionId, String jobId, int attempt) {
+ int executionId, String jobId, int attempt) {
return attempt > 0
- ? "_job." + executionId + "." + attempt + "." + jobId + ".attach"
- : "_job." + executionId + "." + jobId + ".attach";
+ ? "_job." + executionId + "." + attempt + "." + jobId + ".attach"
+ : "_job." + executionId + "." + jobId + ".attach";
}
public static String createLogFileName(
- int executionId, String jobId, int attempt) {
+ int executionId, String jobId, int attempt) {
return attempt > 0
- ? "_job." + executionId + "." + attempt + "." + jobId + ".log"
- : "_job." + executionId + "." + jobId + ".log";
+ ? "_job." + executionId + "." + attempt + "." + jobId + ".log"
+ : "_job." + executionId + "." + jobId + ".log";
}
public static String createMetaDataFileName(
- int executionId, String jobId, int attempt) {
+ int executionId, String jobId, int attempt) {
return attempt > 0
- ? "_job." + executionId + "." + attempt + "." + jobId + ".meta"
- : "_job." + executionId + "." + jobId + ".meta";
+ ? "_job." + executionId + "." + attempt + "." + jobId + ".meta"
+ : "_job." + executionId + "." + jobId + ".meta";
}
}
src/java/azkaban/executor/ExecutorManager.java 60(+30 -30)
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 37b858e..ddf2b23 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -201,41 +201,41 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
@Override
public List<ExecutableFlow> getExecutableFlows(
- Project project, String flowId, int skip, int size)
- throws ExecutorManagerException {
+ Project project, String flowId, int skip, int size)
+ throws ExecutorManagerException {
List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
- project.getId(), flowId, skip, size);
+ project.getId(), flowId, skip, size);
return flows;
}
@Override
public List<ExecutableFlow> getExecutableFlows(int skip, int size)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(skip, size);
return flows;
}
@Override
public List<ExecutableFlow> getExecutableFlows(
- String flowIdContains, int skip, int size)
- throws ExecutorManagerException {
+ String flowIdContains, int skip, int size)
+ throws ExecutorManagerException {
List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
- null, '%'+flowIdContains+'%', null, 0, -1, -1 , skip, size);
+ null, '%'+flowIdContains+'%', null, 0, -1, -1 , skip, size);
return flows;
}
@Override
public List<ExecutableFlow> getExecutableFlows(
- String projContain,
- String flowContain,
- String userContain,
- int status,
- long begin,
- long end,
- int skip,
- int size) throws ExecutorManagerException {
+ String projContain,
+ String flowContain,
+ String userContain,
+ int status,
+ long begin,
+ long end,
+ int skip,
+ int size) throws ExecutorManagerException {
List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
- projContain, flowContain, userContain, status, begin, end , skip, size);
+ projContain, flowContain, userContain, status, begin, end , skip, size);
return flows;
}
@@ -306,24 +306,24 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
@Override
public List<Object> getExecutionJobStats(
- ExecutableFlow exFlow, String jobId, int attempt)
+ ExecutableFlow exFlow, String jobId, int attempt)
throws ExecutorManagerException {
Pair<ExecutionReference, ExecutableFlow> pair =
runningFlows.get(exFlow.getExecutionId());
if (pair == null) {
return executorLoader.fetchAttachment(
- exFlow.getExecutionId(), jobId, attempt);
+ exFlow.getExecutionId(), jobId, attempt);
}
Pair<String, String> jobIdParam = new Pair<String, String>("jobId", jobId);
- Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
+ Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
@SuppressWarnings("unchecked")
Map<String, Object> result = callExecutorServer(
pair.getFirst(),
ConnectorParams.ATTACHMENT_ACTION,
jobIdParam,
- attemptParam);
+ attemptParam);
return (List<Object>) result.get("attachment");
}
@@ -527,7 +527,7 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
ExecutionReference reference = new ExecutionReference(exflow.getExecutionId(), executorHost, executorPort);
executorLoader.addActiveExecutableReference(reference);
try {
- callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
+ callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
runningFlows.put(exflow.getExecutionId(), new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
message += "Execution submitted successfully with exec id " + exflow.getExecutionId();
@@ -1149,23 +1149,23 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
@Override
public int getExecutableFlows(
- int projectId,
- String flowId,
- int from,
- int length,
- List<ExecutableFlow> outputList) throws ExecutorManagerException {
+ int projectId,
+ String flowId,
+ int from,
+ int length,
+ List<ExecutableFlow> outputList) throws ExecutorManagerException {
List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
- projectId, flowId, from, length);
+ projectId, flowId, from, length);
outputList.addAll(flows);
return executorLoader.fetchNumExecutableFlows(projectId, flowId);
}
@Override
public List<ExecutableFlow> getExecutableFlows(
- int projectId, String flowId, int from, int length, Status status)
- throws ExecutorManagerException {
+ int projectId, String flowId, int from, int length, Status status)
+ throws ExecutorManagerException {
return executorLoader.fetchFlowHistory(
- projectId, flowId, from, length, status);
+ projectId, flowId, from, length, status);
}
/*
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 0a3d0b5..988d64e 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -721,13 +721,13 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
jobId);
return (List<Object>) JSONUtils.parseJSONFromString(attachment);
}
- catch (IOException e) {
+ catch (IOException e) {
throw new ExecutorManagerException(
- "Error converting job attachment to JSON " + jobId, e);
- }
+ "Error converting job attachment to JSON " + jobId, e);
+ }
catch (SQLException e) {
throw new ExecutorManagerException(
- "Error query job attachment " + jobId, e);
+ "Error query job attachment " + jobId, e);
}
}
diff --git a/src/java/azkaban/jobtype/JobTypeManager.java b/src/java/azkaban/jobtype/JobTypeManager.java
index 1ef57ef..a5c2064 100644
--- a/src/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/java/azkaban/jobtype/JobTypeManager.java
@@ -338,15 +338,16 @@ public class JobTypeManager
}
- public Job buildJobExecutor(String jobId, Props jobProps, Logger logger) throws JobTypeManagerException
- {
+ public Job buildJobExecutor(String jobId, Props jobProps, Logger logger)
+ throws JobTypeManagerException {
Job job = null;
try {
String jobType = jobProps.getString("type");
if (jobType == null || jobType.length() == 0) {
/*throw an exception when job name is null or empty*/
- throw new JobExecutionException (
- String.format("The 'type' parameter for job[%s] is null or empty", jobProps, logger));
+ throw new JobExecutionException(
+ String.format("The 'type' parameter for job[%s] is null or empty",
+ jobProps, logger));
}
logger.info("Building " + jobType + " job executor. ");
@@ -361,10 +362,9 @@ public class JobTypeManager
Props sysConf = jobtypeSysProps.get(jobType);
Props jobConf = jobProps;
- if(jobtypeJobProps.containsKey(jobType)) {
+ if (jobtypeJobProps.containsKey(jobType)) {
Props p = jobtypeJobProps.get(jobType);
- for(String k : p.getKeySet())
- {
+ for (String k : p.getKeySet()) {
if(!jobConf.containsKey(k)) {
jobConf.put(k, p.get(k));
}
@@ -379,7 +379,6 @@ public class JobTypeManager
sysConf = new Props();
}
-
// logger.info("sysConf is " + sysConf);
// logger.info("jobConf is " + jobConf);
//
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index f330d53..168ca49 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -128,9 +128,9 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
else if (ajaxName.equals("fetchExecJobLogs")) {
ajaxFetchJobLogs(req, resp, ret, session.getUser(), exFlow);
}
- else if (ajaxName.equals("fetchExecJobStats")) {
- ajaxFetchJobStats(req, resp, ret, session.getUser(), exFlow);
- }
+ else if (ajaxName.equals("fetchExecJobStats")) {
+ ajaxFetchJobStats(req, resp, ret, session.getUser(), exFlow);
+ }
else if (ajaxName.equals("retryFailedJobs")) {
ajaxRestartFailed(req, resp, ret, session.getUser(), exFlow);
}
@@ -451,14 +451,14 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
}
- private void ajaxFetchJobStats(
- HttpServletRequest req,
- HttpServletResponse resp,
- HashMap<String, Object> ret,
- User user,
- ExecutableFlow exFlow) throws ServletException {
+ private void ajaxFetchJobStats(
+ HttpServletRequest req,
+ HttpServletResponse resp,
+ HashMap<String, Object> ret,
+ User user,
+ ExecutableFlow exFlow) throws ServletException {
Project project = getProjectAjaxByPermission(
- ret, exFlow.getProjectId(), user, Type.READ);
+ ret, exFlow.getProjectId(), user, Type.READ);
if (project == null) {
return;
}
@@ -469,20 +469,20 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ExecutableNode node = exFlow.getExecutableNode(jobId);
if (node == null) {
ret.put("error", "Job " + jobId + " doesn't exist in " +
- exFlow.getExecutionId());
+ exFlow.getExecutionId());
return;
}
int attempt = this.getIntParam(req, "attempt", node.getAttempt());
List<Object> jsonObj = executorManager.getExecutionJobStats(
- exFlow, jobId, attempt);
- ret.put("jobStats", jsonObj);
- }
+ exFlow, jobId, attempt);
+ ret.put("jobStats", jsonObj);
+ }
catch (ExecutorManagerException e) {
ret.put("error", "Error retrieving stats for job " + jobId);
return;
}
- }
+ }
private void ajaxFetchFlowInfo(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, String projectName, String flowId) throws ServletException {
Project project = getProjectAjaxByPermission(ret, projectName, user, Type.READ);
@@ -710,7 +710,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
Map<String, Object> map = getExecutableFlowUpdateInfo(exFlow, lastUpdateTime);
map.put("status", exFlow.getStatus());
map.put("startTime", exFlow.getStartTime());
- map.put("endTime", exFlow.getEndTime());
+ map.put("endTime", exFlow.getEndTime());
map.put("updateTime", exFlow.getUpdateTime());
ret.putAll(map);
}