azkaban-developers
Changes
src/java/azkaban/execapp/FlowRunner.java 13(+13 -0)
src/java/azkaban/execapp/JobRunner.java 121(+86 -35)
src/java/azkaban/executor/ExecutorManager.java 94(+64 -30)
src/java/azkaban/jobtype/JobTypeManager.java 22(+11 -11)
src/less/tables.less 18(+18 -0)
Details
diff --git a/src/java/azkaban/execapp/AzkabanExecutorServer.java b/src/java/azkaban/execapp/AzkabanExecutorServer.java
index 50be4f6..a6ee307 100644
--- a/src/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -110,12 +110,6 @@ public class AzkabanExecutorServer {
configureMBeanServer();
- File statsDir = new File(props.getString("azkaban.stats.dir", "stats"));
- if (!statsDir.exists()) {
- statsDir.mkdir();
- }
- props.put("azkaban.stats.dir", statsDir.getCanonicalPath());
-
try {
server.start();
}
diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index 94fb4c2..75a1a08 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import javax.servlet.ServletConfig;
@@ -47,7 +48,6 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
private AzkabanExecutorServer application;
private FlowRunnerManager flowRunnerManager;
-
public ExecutorServlet() {
super();
}
@@ -63,7 +63,6 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
flowRunnerManager = application.getFlowRunnerManager();
}
-
protected void writeJSON(HttpServletResponse resp, Object obj) throws IOException {
resp.setContentType(JSON_MIME_TYPE);
ObjectMapper mapper = new ObjectMapper();
@@ -100,6 +99,9 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
else if (action.equals(LOG_ACTION)) {
handleFetchLogEvent(execid, req, resp, respMap);
}
+ else if (action.equals(ATTACHMENTS_ACTION)) {
+ handleFetchAttachmentsEvent(execid, req, resp, respMap);
+ }
else if (action.equals(EXECUTE_ACTION)) {
handleAjaxExecute(req, respMap, execid);
}
@@ -170,7 +172,11 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
}
}
- private void handleFetchLogEvent(int execId, HttpServletRequest req, HttpServletResponse resp, Map<String, Object> respMap) throws ServletException {
+ private void handleFetchLogEvent(
+ int execId,
+ HttpServletRequest req,
+ HttpServletResponse resp,
+ Map<String, Object> respMap) throws ServletException {
String type = getParam(req, "type");
int startByte = getIntParam(req, "offset");
int length = getIntParam(req, "length");
@@ -200,6 +206,25 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
}
}
}
+
+ private void handleFetchAttachmentsEvent(
+ int execId,
+ HttpServletRequest req,
+ HttpServletResponse resp,
+ Map<String, Object> respMap) throws ServletException {
+
+ String jobId = getParam(req, "jobId");
+ int attempt = getIntParam(req, "attempt", 0);
+ try {
+ List<Object> result = flowRunnerManager.readJobAttachments(
+ execId, jobId, attempt);
+ respMap.put("attachments", result);
+ }
+ catch (Exception e) {
+ logger.error(e);
+ respMap.put("error", e.getMessage());
+ }
+ }
private void handleFetchMetaDataEvent(int execId, HttpServletRequest req, HttpServletResponse resp, Map<String, Object> respMap) throws ServletException {
int startByte = getIntParam(req, "offset");
@@ -217,7 +242,6 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
logger.error(e);
respMap.put("error", e.getMessage());
}
-
}
@SuppressWarnings("unchecked")
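
For context, the executor exposes the new attachments lookup through the "attachments" connector action added above. Below is a minimal sketch of querying that action directly over HTTP, mirroring the parameters handleFetchAttachmentsEvent() reads; the servlet path, host/port, and the "execid" parameter name follow the existing connector conventions and are assumptions here rather than part of this diff.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class FetchAttachmentsExample {
  // Returns the raw JSON response; on success it carries an "attachments" list,
  // otherwise an "error" message, matching handleFetchAttachmentsEvent() above.
  public static String fetchAttachments(String host, int port, int execId,
      String jobId, int attempt) throws Exception {
    URL url = new URL("http://" + host + ":" + port
        + "/executor?action=attachments"   // ATTACHMENTS_ACTION
        + "&execid=" + execId              // assumed connector parameter name
        + "&jobId=" + jobId
        + "&attempt=" + attempt);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    BufferedReader reader = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), "UTF-8"));
    StringBuilder body = new StringBuilder();
    String line;
    while ((line = reader.readLine()) != null) {
      body.append(line);
    }
    reader.close();
    return body.toString();
  }
}
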
src/java/azkaban/execapp/FlowRunner.java 13(+13 -0)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 5f38882..db33b46 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -838,6 +838,19 @@ public class FlowRunner extends EventHandler implements Runnable {
return logFile;
}
+
+ public File getJobAttachmentFile(String jobId, int attempt) {
+ ExecutableNode node = flow.getExecutableNode(jobId);
+ File path = new File(execDir, node.getJobSource());
+
+ String attachmentFileName =
+ JobRunner.createAttachmentFileName(execId, jobId, attempt);
+ File attachmentFile = new File(path.getParentFile(), attachmentFileName);
+ if (!attachmentFile.exists()) {
+ return null;
+ }
+ return attachmentFile;
+ }
public File getJobMetaDataFile(String jobId, int attempt) {
ExecutableNode node = flow.getExecutableNode(jobId);
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index ecc0847..17d3ae3 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -50,6 +51,7 @@ import azkaban.jobtype.JobTypeManager;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
@@ -137,7 +139,6 @@ public class FlowRunnerManager implements EventListener {
cleanerThread.start();
jobtypeManager = new JobTypeManager(props.getString(AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR, JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR), parentClassLoader);
-
}
private Map<Pair<Integer, Integer>, ProjectVersion> loadExistingProjects() {
@@ -555,7 +556,7 @@ public class FlowRunnerManager implements EventListener {
File dir = runner.getExecutionDir();
if (dir != null && dir.exists()) {
try {
- synchronized(executionDirDeletionSync) {
+ synchronized (executionDirDeletionSync) {
if (!dir.exists()) {
throw new ExecutorManagerException("Execution dir file doesn't exist. Probably has beend deleted");
}
@@ -574,6 +575,39 @@ public class FlowRunnerManager implements EventListener {
throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
}
+
+ public List<Object> readJobAttachments(int execId, String jobId, int attempt)
+ throws ExecutorManagerException {
+ FlowRunner runner = runningFlows.get(execId);
+ if (runner == null) {
+ throw new ExecutorManagerException(
+ "Running flow " + execId + " not found.");
+ }
+
+ File dir = runner.getExecutionDir();
+ if (dir == null || !dir.exists()) {
+ throw new ExecutorManagerException(
+ "Error reading file. Log directory doesn't exist.");
+ }
+
+ try {
+ synchronized (executionDirDeletionSync) {
+ if (!dir.exists()) {
+ throw new ExecutorManagerException(
+ "Execution dir file doesn't exist. Probably has beend deleted");
+ }
+
+ File attachmentFile = runner.getJobAttachmentFile(jobId, attempt);
+ if (attachmentFile == null || !attachmentFile.exists()) {
+ return null;
+ }
+ return (ArrayList<Object>) JSONUtils.parseJSONFromFile(attachmentFile);
+ }
+ }
+ catch (IOException e) {
+ throw new ExecutorManagerException(e);
+ }
+ }
public JobMetaData readJobMetaData(int execId, String jobId, int attempt, int startByte, int length) throws ExecutorManagerException {
FlowRunner runner = runningFlows.get(execId);
src/java/azkaban/execapp/JobRunner.java 121(+86 -35)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 99bd8ab..211507f 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -65,6 +65,7 @@ public class JobRunner extends EventHandler implements Runnable {
private Appender jobAppender;
private File logFile;
+ private String attachmentFileName;
private Job job;
private int executionId = -1;
@@ -222,12 +223,20 @@ public class JobRunner extends EventHandler implements Runnable {
jobAppender = fileAppender;
logger.addAppender(jobAppender);
logger.setAdditivity(false);
- } catch (IOException e) {
+ }
+ catch (IOException e) {
flowLogger.error("Could not open log file in " + workingDir + " for job " + this.jobId, e);
}
}
}
+ private void createAttachmentFile() {
+ String fileName = createAttachmentFileName(
+ this.executionId, this.jobId, node.getAttempt());
+ File file = new File(workingDir, fileName);
+ attachmentFileName = file.getAbsolutePath();
+ }
+
private void closeLogger() {
if (jobAppender != null) {
logger.removeAppender(jobAppender);
@@ -239,7 +248,8 @@ public class JobRunner extends EventHandler implements Runnable {
try {
node.setUpdateTime(System.currentTimeMillis());
loader.updateExecutableNode(node);
- } catch (ExecutorManagerException e) {
+ }
+ catch (ExecutorManagerException e) {
flowLogger.error("Could not update job properties in db for " + this.jobId, e);
}
}
@@ -301,7 +311,7 @@ public class JobRunner extends EventHandler implements Runnable {
if (!blockingStatus.isEmpty()) {
logger.info("Pipeline job " + this.jobId + " waiting on " + blockedList + " in execution " + watcher.getExecId());
- for(BlockingStatus bStatus: blockingStatus) {
+ for (BlockingStatus bStatus: blockingStatus) {
logger.info("Waiting on pipelined job " + bStatus.getJobId());
currentBlockStatus = bStatus;
bStatus.blockOnFinishedStatus();
@@ -328,11 +338,12 @@ public class JobRunner extends EventHandler implements Runnable {
long currentTime = System.currentTimeMillis();
if (delayStartMs > 0) {
logger.info("Delaying start of execution for " + delayStartMs + " milliseconds.");
- synchronized(this) {
+ synchronized (this) {
try {
this.wait(delayStartMs);
logger.info("Execution has been delayed for " + delayStartMs + " ms. Continuing with execution.");
- } catch (InterruptedException e) {
+ }
+ catch (InterruptedException e) {
logger.error("Job " + this.jobId + " was to be delayed for " + delayStartMs + ". Interrupted after " + (System.currentTimeMillis() - currentTime));
}
}
@@ -348,26 +359,45 @@ public class JobRunner extends EventHandler implements Runnable {
private void finalizeLogFile() {
closeLogger();
+ if (logFile == null) {
+ flowLogger.info("Log file for job " + this.jobId + " is null");
+ return;
+ }
- if (logFile != null) {
- try {
- File[] files = logFile.getParentFile().listFiles(new FilenameFilter() {
-
- @Override
- public boolean accept(File dir, String name) {
- return name.startsWith(logFile.getName());
- }
- }
- );
- Arrays.sort(files, Collections.reverseOrder());
-
- loader.uploadLogFile(executionId, this.node.getNestedId(), node.getAttempt(), files);
- } catch (ExecutorManagerException e) {
- flowLogger.error("Error writing out logs for job " + this.node.getNestedId(), e);
+ try {
+ File[] files = logFile.getParentFile().listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.startsWith(logFile.getName());
+ }
+ });
+ Arrays.sort(files, Collections.reverseOrder());
+
+ loader.uploadLogFile(executionId, this.node.getNestedId(), node.getAttempt(), files);
+ }
+ catch (ExecutorManagerException e) {
+ flowLogger.error("Error writing out logs for job " + this.node.getNestedId(), e);
+ }
+ }
+
+ private void finalizeAttachmentFile() {
+ if (attachmentFileName == null) {
+ flowLogger.info("Attachment file for job " + this.jobId + " is null");
+ return;
+ }
+
+ try {
+ File file = new File(attachmentFileName);
+ if (!file.exists()) {
+ flowLogger.info("No attachment file for job " + this.jobId +
+ " written.");
+ return;
}
+ loader.uploadAttachmentFile(node, file);
}
- else {
- flowLogger.info("Log file for job " + this.jobId + " is null");
+ catch (ExecutorManagerException e) {
+ flowLogger.error("Error writing out attachment for job " +
+ this.node.getNestedId(), e);
}
}
@@ -384,12 +414,14 @@ public class JobRunner extends EventHandler implements Runnable {
return;
}
+ createAttachmentFile();
createLogger();
boolean errorFound = false;
// Delay execution if necessary. Will return a true if something went wrong.
errorFound |= delayExecution();
- // For pipelining of jobs. Will watch other jobs. Will return true if something went wrong.
+ // For pipelining of jobs. Will watch other jobs. Will return true if
+ // something went wrong.
errorFound |= blockOnPipeLine();
// Start the node.
@@ -398,7 +430,8 @@ public class JobRunner extends EventHandler implements Runnable {
fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
try {
loader.uploadExecutableNode(node, props);
- } catch (ExecutorManagerException e1) {
+ }
+ catch (ExecutorManagerException e1) {
logger.error("Error writing initial node properties");
}
@@ -423,6 +456,7 @@ public class JobRunner extends EventHandler implements Runnable {
fireEvent(Event.create(this, Type.JOB_FINISHED), false);
finalizeLogFile();
+ finalizeAttachmentFile();
}
private boolean prepareJob() throws RuntimeException {
@@ -432,7 +466,7 @@ public class JobRunner extends EventHandler implements Runnable {
return false;
}
- synchronized(syncObject) {
+ synchronized (syncObject) {
if (node.getStatus() == Status.FAILED || cancelled) {
return false;
}
@@ -451,7 +485,9 @@ public class JobRunner extends EventHandler implements Runnable {
}
props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
- props.put(CommonJobProperties.JOB_METADATA_FILE, createMetaDataFileName(executionId, this.jobId, node.getAttempt()));
+ props.put(CommonJobProperties.JOB_METADATA_FILE,
+ createMetaDataFileName(executionId, this.jobId, node.getAttempt()));
+ props.put(CommonJobProperties.JOB_ATTACHMENT_FILE, attachmentFileName);
changeStatus(Status.RUNNING);
// Ability to specify working directory
@@ -459,9 +495,9 @@ public class JobRunner extends EventHandler implements Runnable {
props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
}
- if(props.containsKey("user.to.proxy")) {
+ if (props.containsKey("user.to.proxy")) {
String jobProxyUser = props.getString("user.to.proxy");
- if(proxyUsers != null && !proxyUsers.contains(jobProxyUser)) {
+ if (proxyUsers != null && !proxyUsers.contains(jobProxyUser)) {
logger.error("User " + jobProxyUser + " has no permission to execute this job " + this.jobId + "!");
return false;
}
@@ -482,7 +518,8 @@ public class JobRunner extends EventHandler implements Runnable {
private void runJob() {
try {
job.run();
- } catch (Exception e) {
+ }
+ catch (Exception e) {
e.printStackTrace();
if (props.getBoolean("job.succeed.on.failure", false)) {
@@ -549,7 +586,8 @@ public class JobRunner extends EventHandler implements Runnable {
try {
job.cancel();
- } catch (Exception e) {
+ }
+ catch (Exception e) {
logError(e.getMessage());
logError("Failed trying to cancel job. Maybe it hasn't started running yet or just finished.");
}
@@ -588,11 +626,24 @@ public class JobRunner extends EventHandler implements Runnable {
return props.getLong("retry.backoff", 0);
}
- public static String createLogFileName(int executionId, String jobId, int attempt) {
- return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".log" : "_job." + executionId + "." + jobId + ".log";
+ public static String createAttachmentFileName(
+ int executionId, String jobId, int attempt) {
+ return attempt > 0
+ ? "_job." + executionId + "." + attempt + "." + jobId + ".attach"
+ : "_job." + executionId + "." + jobId + ".attach";
}
-
- public static String createMetaDataFileName(int executionId, String jobId, int attempt) {
- return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".meta" : "_job." + executionId + "." + jobId + ".meta";
+
+ public static String createLogFileName(
+ int executionId, String jobId, int attempt) {
+ return attempt > 0
+ ? "_job." + executionId + "." + attempt + "." + jobId + ".log"
+ : "_job." + executionId + "." + jobId + ".log";
+ }
+
+ public static String createMetaDataFileName(
+ int executionId, String jobId, int attempt) {
+ return attempt > 0
+ ? "_job." + executionId + "." + attempt + "." + jobId + ".meta"
+ : "_job." + executionId + "." + jobId + ".meta";
}
}
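
The attachment file path created above is handed to the job through the new azkaban.job.attachment.file property (CommonJobProperties.JOB_ATTACHMENT_FILE, added later in this diff), and finalizeAttachmentFile() uploads whatever the job wrote there. A minimal sketch of the job-side half follows, shown with plain java.util.Properties for self-containment; the only real constraint implied by this diff is that the file holds a top-level JSON list, and the payload below is illustrative only.

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Properties;

public class AttachmentWriterExample {
  // Writes a small attachments payload to the path Azkaban passes in via
  // "azkaban.job.attachment.file" (CommonJobProperties.JOB_ATTACHMENT_FILE).
  public static void writeAttachments(Properties jobProps) throws IOException {
    String path = jobProps.getProperty("azkaban.job.attachment.file");
    if (path == null) {
      // Older executors do not set the property; nothing to write.
      return;
    }
    // A top-level JSON array, since the loaders cast the parsed file to a List.
    String json = "[{\"type\": \"example-stat\", \"value\": 42}]";
    FileWriter writer = new FileWriter(new File(path));
    try {
      writer.write(json);
    }
    finally {
      writer.close();
    }
  }
}
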
diff --git a/src/java/azkaban/executor/ConnectorParams.java b/src/java/azkaban/executor/ConnectorParams.java
index 0c0c7e0..c84436e 100644
--- a/src/java/azkaban/executor/ConnectorParams.java
+++ b/src/java/azkaban/executor/ConnectorParams.java
@@ -30,6 +30,7 @@ public interface ConnectorParams {
public static final String RESUME_ACTION = "resume";
public static final String PING_ACTION = "ping";
public static final String LOG_ACTION = "log";
+ public static final String ATTACHMENTS_ACTION = "attachments";
public static final String METADATA_ACTION = "metadata";
public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 9d807ff..5fbeed2 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -61,7 +61,7 @@ public class ExecutableNode {
private Props inputProps;
private Props outputProps;
-
+
public static final String ATTEMPT_PARAM = "attempt";
public static final String PASTATTEMPTS_PARAM = "pastAttempts";
@@ -204,7 +204,7 @@ public class ExecutableNode {
public Props getOutputProps() {
return outputProps;
}
-
+
public long getDelayedExecution() {
return delayExecution;
}
@@ -428,4 +428,4 @@ public class ExecutableNode {
}
}
}
-}
\ No newline at end of file
+}
diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
index 04e0e44..4763ee5 100644
--- a/src/java/azkaban/executor/ExecutorLoader.java
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -47,7 +47,11 @@ public interface ExecutorLoader {
public LogData fetchLogs(int execId, String name, int attempt, int startByte, int endByte) throws ExecutorManagerException;
+ public List<Object> fetchAttachments(int execId, String name, int attempt) throws ExecutorManagerException;
+
public void uploadLogFile(int execId, String name, int attempt, File ... files) throws ExecutorManagerException;
+
+ public void uploadAttachmentFile(ExecutableNode node, File file) throws ExecutorManagerException;
public void updateExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException;
src/java/azkaban/executor/ExecutorManager.java 94(+64 -30)
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index cb763f5..8b71e29 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -201,41 +201,41 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
@Override
public List<ExecutableFlow> getExecutableFlows(
- Project project, String flowId, int skip, int size)
- throws ExecutorManagerException {
+ Project project, String flowId, int skip, int size)
+ throws ExecutorManagerException {
List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
- project.getId(), flowId, skip, size);
+ project.getId(), flowId, skip, size);
return flows;
}
@Override
public List<ExecutableFlow> getExecutableFlows(int skip, int size)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(skip, size);
return flows;
}
@Override
public List<ExecutableFlow> getExecutableFlows(
- String flowIdContains, int skip, int size)
- throws ExecutorManagerException {
+ String flowIdContains, int skip, int size)
+ throws ExecutorManagerException {
List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
- null, '%'+flowIdContains+'%', null, 0, -1, -1 , skip, size);
+ null, '%'+flowIdContains+'%', null, 0, -1, -1 , skip, size);
return flows;
}
@Override
public List<ExecutableFlow> getExecutableFlows(
- String projContain,
- String flowContain,
- String userContain,
- int status,
- long begin,
- long end,
- int skip,
- int size) throws ExecutorManagerException {
+ String projContain,
+ String flowContain,
+ String userContain,
+ int status,
+ long begin,
+ long end,
+ int skip,
+ int size) throws ExecutorManagerException {
List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
- projContain, flowContain, userContain, status, begin, end , skip, size);
+ projContain, flowContain, userContain, status, begin, end , skip, size);
return flows;
}
@@ -274,8 +274,11 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
}
@Override
- public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
- Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
+ public LogData getExecutionJobLog(
+ ExecutableFlow exFlow, String jobId, int offset, int length, int attempt)
+ throws ExecutorManagerException {
+ Pair<ExecutionReference, ExecutableFlow> pair =
+ runningFlows.get(exFlow.getExecutionId());
if (pair != null) {
Pair<String,String> typeParam = new Pair<String,String>("type", "job");
Pair<String,String> jobIdParam = new Pair<String,String>("jobId", jobId);
@@ -284,14 +287,45 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
@SuppressWarnings("unchecked")
- Map<String, Object> result = callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION, typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
+ Map<String, Object> result = callExecutorServer(
+ pair.getFirst(),
+ ConnectorParams.LOG_ACTION,
+ typeParam,
+ jobIdParam,
+ offsetParam,
+ lengthParam,
+ attemptParam);
return LogData.createLogDataFromObject(result);
}
else {
- LogData value = executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt, offset, length);
+ LogData value = executorLoader.fetchLogs(
+ exFlow.getExecutionId(), jobId, attempt, offset, length);
return value;
}
}
+
+ @Override
+ public List<Object> getExecutionJobStats(
+ ExecutableFlow exFlow, String jobId, int attempt)
+ throws ExecutorManagerException {
+ Pair<ExecutionReference, ExecutableFlow> pair =
+ runningFlows.get(exFlow.getExecutionId());
+ if (pair == null) {
+ return executorLoader.fetchAttachments(
+ exFlow.getExecutionId(), jobId, attempt);
+ }
+
+ Pair<String, String> jobIdParam = new Pair<String, String>("jobId", jobId);
+ Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> result = callExecutorServer(
+ pair.getFirst(),
+ ConnectorParams.ATTACHMENTS_ACTION,
+ jobIdParam,
+ attemptParam);
+ return (List<Object>) result.get("attachments");
+ }
@Override
public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
@@ -493,7 +527,7 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
ExecutionReference reference = new ExecutionReference(exflow.getExecutionId(), executorHost, executorPort);
executorLoader.addActiveExecutableReference(reference);
try {
- callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
+ callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
runningFlows.put(exflow.getExecutionId(), new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
message += "Execution submitted successfully with exec id " + exflow.getExecutionId();
@@ -1115,23 +1149,23 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
@Override
public int getExecutableFlows(
- int projectId,
- String flowId,
- int from,
- int length,
- List<ExecutableFlow> outputList) throws ExecutorManagerException {
+ int projectId,
+ String flowId,
+ int from,
+ int length,
+ List<ExecutableFlow> outputList) throws ExecutorManagerException {
List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
- projectId, flowId, from, length);
+ projectId, flowId, from, length);
outputList.addAll(flows);
return executorLoader.fetchNumExecutableFlows(projectId, flowId);
}
@Override
public List<ExecutableFlow> getExecutableFlows(
- int projectId, String flowId, int from, int length, Status status)
- throws ExecutorManagerException {
+ int projectId, String flowId, int from, int length, Status status)
+ throws ExecutorManagerException {
return executorLoader.fetchFlowHistory(
- projectId, flowId, from, length, status);
+ projectId, flowId, from, length, status);
}
/*
diff --git a/src/java/azkaban/executor/ExecutorManagerAdapter.java b/src/java/azkaban/executor/ExecutorManagerAdapter.java
index cc81589..af64ec4 100644
--- a/src/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/src/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -90,6 +90,8 @@ public interface ExecutorManagerAdapter{
public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset, int length) throws ExecutorManagerException;
public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException;
+
+ public List<Object> getExecutionJobStats(ExecutableFlow exflow, String jobId, int attempt) throws ExecutorManagerException;
public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException;
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 70b73da..e756cfb 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -35,6 +35,7 @@ import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
@@ -707,6 +708,32 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
"Error fetching logs " + execId + " : " + name, e);
}
}
+
+ @Override
+ public List<Object> fetchAttachments(int execId, String jobId, int attempt)
+ throws ExecutorManagerException {
+ QueryRunner runner = createQueryRunner();
+
+ try {
+ String attachments = runner.query(
+ FetchExecutableJobAttachmentsHandler.FETCH_ATTACHMENTS_EXECUTABLE_NODE,
+ new FetchExecutableJobAttachmentsHandler(),
+ execId,
+ jobId);
+ if (attachments == null) {
+ return null;
+ }
+ return (List<Object>) JSONUtils.parseJSONFromString(attachments);
+ }
+ catch (IOException e) {
+ throw new ExecutorManagerException(
+ "Error converting job attachments to JSON " + jobId, e);
+ }
+ catch (SQLException e) {
+ throw new ExecutorManagerException(
+ "Error query job attachments " + jobId, e);
+ }
+ }
@Override
public void uploadLogFile(
@@ -838,6 +865,50 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
DateTime.now().getMillis());
}
+ @Override
+ public void uploadAttachmentFile(ExecutableNode node, File file)
+ throws ExecutorManagerException {
+ Connection connection = getConnection();
+ try {
+ uploadAttachmentFile(connection, node, file, defaultEncodingType);
+ connection.commit();
+ }
+ catch (SQLException e) {
+ throw new ExecutorManagerException("Error committing attachments ", e);
+ }
+ catch (IOException e) {
+ throw new ExecutorManagerException("Error uploading attachments ", e);
+ }
+ finally {
+ DbUtils.closeQuietly(connection);
+ }
+ }
+
+ private void uploadAttachmentFile(
+ Connection connection,
+ ExecutableNode node,
+ File file,
+ EncodingType encType) throws SQLException, IOException {
+
+ String jsonString = FileUtils.readFileToString(file);
+ byte[] attachments = GZIPUtils.gzipString(jsonString, "UTF-8");
+
+ final String UPDATE_EXECUTION_NODE_ATTACHMENTS =
+ "UPDATE execution_jobs " +
+ "SET attachments=? " +
+ "WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
+
+ QueryRunner runner = new QueryRunner();
+ runner.update(
+ connection,
+ UPDATE_EXECUTION_NODE_ATTACHMENTS,
+ attachments,
+ node.getExecutableFlow().getExecutionId(),
+ node.getParentFlow().getNestedId(),
+ node.getId(),
+ node.getAttempt());
+ }
+
private Connection getConnection() throws ExecutorManagerException {
Connection connection = null;
try {
@@ -977,6 +1048,30 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
return execNodes;
}
}
+
+ private static class FetchExecutableJobAttachmentsHandler
+ implements ResultSetHandler<String> {
+ private static String FETCH_ATTACHMENTS_EXECUTABLE_NODE =
+ "SELECT attachments FROM execution_jobs WHERE exec_id=? AND job_id=?";
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public String handle(ResultSet rs) throws SQLException {
+ String attachmentsJson = null;
+ if (rs.next()) {
+ try {
+ byte[] attachments = rs.getBytes(1);
+ if (attachments != null) {
+ attachmentsJson = GZIPUtils.unGzipString(attachments, "UTF-8");
+ }
+ }
+ catch (IOException e) {
+ throw new SQLException("Error decoding job attachments", e);
+ }
+ }
+ return attachmentsJson;
+ }
+ }
private static class FetchExecutableJobPropsHandler
implements ResultSetHandler<Pair<Props, Props>> {
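
Note that fetchAttachments() and uploadAttachmentFile() both assume an attachments column on the execution_jobs table; the schema change itself is not part of this diff. A minimal round-trip sketch, assuming loader is a configured JdbcExecutorLoader and node is an ExecutableNode whose row already exists in execution_jobs (the upload is an UPDATE, so it only fills in that column):

import java.io.File;
import java.util.List;
import org.apache.commons.io.FileUtils;
import azkaban.executor.ExecutableNode;
import azkaban.executor.JdbcExecutorLoader;

public class AttachmentRoundTripExample {
  // Uploads an illustrative payload for a node and reads it back through the
  // new loader methods added in this change.
  public static List<Object> roundTrip(JdbcExecutorLoader loader,
      ExecutableNode node) throws Exception {
    File attachmentFile = File.createTempFile("attachments", ".json");
    FileUtils.writeStringToFile(attachmentFile, "[{\"type\": \"example-stat\"}]");

    loader.uploadAttachmentFile(node, attachmentFile);

    return loader.fetchAttachments(
        node.getExecutableFlow().getExecutionId(),
        node.getId(),
        node.getAttempt());
  }
}
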
diff --git a/src/java/azkaban/flow/CommonJobProperties.java b/src/java/azkaban/flow/CommonJobProperties.java
index 7324205..3873a56 100644
--- a/src/java/azkaban/flow/CommonJobProperties.java
+++ b/src/java/azkaban/flow/CommonJobProperties.java
@@ -75,6 +75,11 @@ public class CommonJobProperties {
* The attempt number of the executing job.
*/
public static final String JOB_METADATA_FILE = "azkaban.job.metadata.file";
+
+ /**
+ * The file to which the executing job can write attachment data (e.g. job stats) for upload after the run.
+ */
+ public static final String JOB_ATTACHMENT_FILE = "azkaban.job.attachment.file";
/**
* The executing flow id
src/java/azkaban/jobtype/JobTypeManager.java 22(+11 -11)
diff --git a/src/java/azkaban/jobtype/JobTypeManager.java b/src/java/azkaban/jobtype/JobTypeManager.java
index 1ef57ef..6c275af 100644
--- a/src/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/java/azkaban/jobtype/JobTypeManager.java
@@ -338,15 +338,16 @@ public class JobTypeManager
}
- public Job buildJobExecutor(String jobId, Props jobProps, Logger logger) throws JobTypeManagerException
- {
+ public Job buildJobExecutor(String jobId, Props jobProps, Logger logger)
+ throws JobTypeManagerException {
Job job = null;
try {
String jobType = jobProps.getString("type");
if (jobType == null || jobType.length() == 0) {
/*throw an exception when job name is null or empty*/
- throw new JobExecutionException (
- String.format("The 'type' parameter for job[%s] is null or empty", jobProps, logger));
+ throw new JobExecutionException(
+ String.format("The 'type' parameter for job[%s] is null or empty",
+ jobProps, logger));
}
logger.info("Building " + jobType + " job executor. ");
@@ -361,17 +362,16 @@ public class JobTypeManager
Props sysConf = jobtypeSysProps.get(jobType);
Props jobConf = jobProps;
- if(jobtypeJobProps.containsKey(jobType)) {
+ if (jobtypeJobProps.containsKey(jobType)) {
Props p = jobtypeJobProps.get(jobType);
- for(String k : p.getKeySet())
- {
- if(!jobConf.containsKey(k)) {
+ for (String k : p.getKeySet()) {
+ if (!jobConf.containsKey(k)) {
jobConf.put(k, p.get(k));
}
}
}
jobConf = PropsUtils.resolveProps(jobConf);
-
+
if (sysConf != null) {
sysConf = PropsUtils.resolveProps(sysConf);
}
@@ -379,11 +379,11 @@ public class JobTypeManager
sysConf = new Props();
}
-
// logger.info("sysConf is " + sysConf);
// logger.info("jobConf is " + jobConf);
//
- job = (Job)Utils.callConstructor(executorClass, jobId, sysConf, jobConf, logger);
+ job = (Job) Utils.callConstructor(
+ executorClass, jobId, sysConf, jobConf, logger);
}
catch (Exception e) {
//job = new InitErrorJob(jobId, e);
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index b6d961b..21460b2 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -197,12 +197,6 @@ public class AzkabanWebServer extends AzkabanServer {
tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
- File statsDir = new File(props.getString("azkaban.stats.dir", "stats"));
- if (!statsDir.exists()) {
- statsDir.mkdir();
- }
- props.put("azkaban.stats.dir", statsDir.getCanonicalPath());
-
// Setup time zone
if (props.containsKey(DEFAULT_TIMEZONE_ID)) {
String timezone = props.getString(DEFAULT_TIMEZONE_ID);
diff --git a/src/java/azkaban/webapp/plugin/ViewerPlugin.java b/src/java/azkaban/webapp/plugin/ViewerPlugin.java
index 1ea864b..9e167ff 100644
--- a/src/java/azkaban/webapp/plugin/ViewerPlugin.java
+++ b/src/java/azkaban/webapp/plugin/ViewerPlugin.java
@@ -29,7 +29,10 @@ public class ViewerPlugin {
new Comparator<ViewerPlugin>() {
@Override
public int compare(ViewerPlugin o1, ViewerPlugin o2) {
- return o1.getOrder() - o2.getOrder();
+ if (o1.getOrder() != o2.getOrder()) {
+ return o1.getOrder() - o2.getOrder();
+ }
+ return o1.getPluginName().compareTo(o2.getPluginName());
}
};
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index e154480..bcab22d 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -60,8 +60,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
private ScheduleManager scheduleManager;
private ExecutorVelocityHelper velocityHelper;
- private String statsDir;
-
@Override
public void init(ServletConfig config) throws ServletException {
super.init(config);
@@ -70,7 +68,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
executorManager = server.getExecutorManager();
scheduleManager = server.getScheduleManager();
velocityHelper = new ExecutorVelocityHelper();
- statsDir = server.getServerProps().getString("azkaban.stats.dir");
}
@Override
@@ -133,9 +130,9 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
else if (ajaxName.equals("fetchExecJobLogs")) {
ajaxFetchJobLogs(req, resp, ret, session.getUser(), exFlow);
}
- else if (ajaxName.equals("fetchExecJobStats")) {
- ajaxFetchJobStats(req, resp, ret, session.getUser(), exFlow);
- }
+ else if (ajaxName.equals("fetchExecJobStats")) {
+ ajaxFetchJobStats(req, resp, ret, session.getUser(), exFlow);
+ }
else if (ajaxName.equals("retryFailedJobs")) {
ajaxRestartFailed(req, resp, ret, session.getUser(), exFlow);
}
@@ -467,41 +464,37 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
}
- private void ajaxFetchJobStats(
- HttpServletRequest req,
- HttpServletResponse resp,
- HashMap<String, Object> ret,
- User user,
- ExecutableFlow exFlow) throws ServletException {
+ private void ajaxFetchJobStats(
+ HttpServletRequest req,
+ HttpServletResponse resp,
+ HashMap<String, Object> ret,
+ User user,
+ ExecutableFlow exFlow) throws ServletException {
Project project = getProjectAjaxByPermission(
- ret, exFlow.getProjectId(), user, Type.READ);
+ ret, exFlow.getProjectId(), user, Type.READ);
if (project == null) {
return;
}
String jobId = this.getParam(req, "jobid");
resp.setCharacterEncoding("utf-8");
- String statsFilePath = null;
try {
ExecutableNode node = exFlow.getExecutableNode(jobId);
if (node == null) {
ret.put("error", "Job " + jobId + " doesn't exist in " +
- exFlow.getExecutionId());
+ exFlow.getExecutionId());
return;
}
-
- statsFilePath = statsDir + "/" + exFlow.getExecutionId() + "-" +
- jobId + "-stats.json";
- File statsFile = new File(statsFilePath);
- List<Object> jsonObj =
- (ArrayList<Object>) JSONUtils.parseJSONFromFile(statsFile);
- ret.put("jobStats", jsonObj);
- }
- catch (IOException e) {
- ret.put("error", "Cannot open stats file: " + statsFilePath);
- return;
- }
- }
+
+ List<Object> jsonObj = executorManager.getExecutionJobStats(
+ exFlow, jobId, node.getAttempt());
+ ret.put("jobStats", jsonObj);
+ }
+ catch (ExecutorManagerException e) {
+ ret.put("error", "Error retrieving stats for job " + jobId);
+ return;
+ }
+ }
private void ajaxFetchFlowInfo(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, String projectName, String flowId) throws ServletException {
Project project = getProjectAjaxByPermission(ret, projectName, user, Type.READ);
@@ -729,7 +722,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
Map<String, Object> map = getExecutableFlowUpdateInfo(exFlow, lastUpdateTime);
map.put("status", exFlow.getStatus());
map.put("startTime", exFlow.getStartTime());
- map.put("endTime", exFlow.getEndTime());
+ map.put("endTime", exFlow.getEndTime());
map.put("updateTime", exFlow.getUpdateTime());
ret.putAll(map);
}
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
index d944f4c..678c288 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -122,6 +122,7 @@
<thead>
<tr>
<th>Name</th>
+ <th class="jobtype">Type</th>
<th class="timeline">Timeline</th>
<th class="date">Start Time</th>
<th class="date">End Time</th>
src/less/tables.less 18(+18 -0)
diff --git a/src/less/tables.less b/src/less/tables.less
index d96815f..ba34cb4 100644
--- a/src/less/tables.less
+++ b/src/less/tables.less
@@ -9,10 +9,24 @@ table.table-properties {
font-weight: bold;
}
+.property-value {
+
+}
+
.property-value-half {
width: 25%;
}
+.property-key,
+.property-value,
+.property-value-half {
+ pre {
+ background: transparent;
+ padding: 0;
+ border: 0;
+ }
+}
+
.editable {
.remove-btn {
visibility: hidden;
@@ -77,6 +91,10 @@ table.table-properties {
width: 160px;
}
+ &.jobtype {
+ width: 90px;
+ }
+
&.execid {
width: 100px;
}
diff --git a/src/package/execserver/conf/azkaban.properties b/src/package/execserver/conf/azkaban.properties
index 585e208..b54b0f5 100644
--- a/src/package/execserver/conf/azkaban.properties
+++ b/src/package/execserver/conf/azkaban.properties
@@ -8,8 +8,6 @@ azkaban.jobtype.plugin.dir=plugins/jobtypes
executor.global.properties=conf/global.properties
azkaban.project.dir=projects
-azkaban.stats.dir=stats
-
database.type=mysql
mysql.port=3306
mysql.host=localhost
diff --git a/src/package/soloserver/conf/azkaban.properties b/src/package/soloserver/conf/azkaban.properties
index 9ca591e..7524a14 100644
--- a/src/package/soloserver/conf/azkaban.properties
+++ b/src/package/soloserver/conf/azkaban.properties
@@ -20,9 +20,6 @@ database.type=h2
h2.path=data/azkaban
h2.create.tables=true
-# Stats
-azkaban.stats.dir=stats
-
# Velocity dev mode
velocity.dev.mode=false
diff --git a/src/package/webserver/conf/azkaban.properties b/src/package/webserver/conf/azkaban.properties
index 97e40ae..3ccb2f3 100644
--- a/src/package/webserver/conf/azkaban.properties
+++ b/src/package/webserver/conf/azkaban.properties
@@ -22,8 +22,6 @@ mysql.user=azkaban
mysql.password=azkaban
mysql.numconnections=100
-azkaban.stats.dir=stats
-
# Velocity dev mode
velocity.dev.mode=false
diff --git a/src/web/js/azkaban/view/flow-execution-list.js b/src/web/js/azkaban/view/flow-execution-list.js
index 69621a3..2a0ce22 100644
--- a/src/web/js/azkaban/view/flow-execution-list.js
+++ b/src/web/js/azkaban/view/flow-execution-list.js
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
var executionListView;
azkaban.ExecutionListView = Backbone.View.extend({
events: {
@@ -241,6 +257,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
var self = this;
var tr = document.createElement("tr");
var tdName = document.createElement("td");
+ var tdType = document.createElement("td");
var tdTimeline = document.createElement("td");
var tdStart = document.createElement("td");
var tdEnd = document.createElement("td");
@@ -252,6 +269,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
var padding = 15*$(body)[0].level;
$(tr).append(tdName);
+ $(tr).append(tdType);
$(tr).append(tdTimeline);
$(tr).append(tdStart);
$(tr).append(tdEnd);
@@ -261,6 +279,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
$(tr).addClass("jobListRow");
$(tdName).addClass("jobname");
+ $(tdType).addClass("jobtype");
if (padding) {
$(tdName).css("padding-left", padding);
}
@@ -270,6 +289,8 @@ azkaban.ExecutionListView = Backbone.View.extend({
$(tdElapse).addClass("elapsedTime");
$(tdStatus).addClass("statustd");
$(tdDetails).addClass("details");
+
+ $(tdType).text(node.type);
var outerProgressBar = document.createElement("div");
//$(outerProgressBar).attr("id", node.id + "-outerprogressbar");
@@ -318,7 +339,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
}
$(body).append(tr);
- if (node.type=="flow") {
+ if (node.type == "flow") {
var subFlowRow = document.createElement("tr");
var subFlowCell = document.createElement("td");
$(subFlowCell).addClass("subflowrow");