azkaban-aplcache
Changes
src/java/azkaban/execapp/FlowRunner.java 14(+13 -1)
src/java/azkaban/execapp/JobRunner.java 111(+82 -29)
src/java/azkaban/executor/ExecutorManager.java 94(+64 -30)
src/java/azkaban/jobtype/JobTypeManager.java 22(+11 -11)
src/java/azkaban/webapp/AzkabanWebServer.java 45(+13 -32)
src/less/tables.less 18(+18 -0)
src/web/js/azkaban/model/log-data.js 53(+37 -16)
src/web/js/azkaban/view/job-details.js 44(+44 -0)
Details
diff --git a/src/java/azkaban/execapp/AzkabanExecutorServer.java b/src/java/azkaban/execapp/AzkabanExecutorServer.java
index 50be4f6..a6ee307 100644
--- a/src/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -110,12 +110,6 @@ public class AzkabanExecutorServer {
configureMBeanServer();
- File statsDir = new File(props.getString("azkaban.stats.dir", "stats"));
- if (!statsDir.exists()) {
- statsDir.mkdir();
- }
- props.put("azkaban.stats.dir", statsDir.getCanonicalPath());
-
try {
server.start();
}
diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index 94fb4c2..75a1a08 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import javax.servlet.ServletConfig;
@@ -47,7 +48,6 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
private AzkabanExecutorServer application;
private FlowRunnerManager flowRunnerManager;
-
public ExecutorServlet() {
super();
}
@@ -63,7 +63,6 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
flowRunnerManager = application.getFlowRunnerManager();
}
-
protected void writeJSON(HttpServletResponse resp, Object obj) throws IOException {
resp.setContentType(JSON_MIME_TYPE);
ObjectMapper mapper = new ObjectMapper();
@@ -100,6 +99,9 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
else if (action.equals(LOG_ACTION)) {
handleFetchLogEvent(execid, req, resp, respMap);
}
+ else if (action.equals(ATTACHMENTS_ACTION)) {
+ handleFetchAttachmentsEvent(execid, req, resp, respMap);
+ }
else if (action.equals(EXECUTE_ACTION)) {
handleAjaxExecute(req, respMap, execid);
}
@@ -170,7 +172,11 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
}
}
- private void handleFetchLogEvent(int execId, HttpServletRequest req, HttpServletResponse resp, Map<String, Object> respMap) throws ServletException {
+ private void handleFetchLogEvent(
+ int execId,
+ HttpServletRequest req,
+ HttpServletResponse resp,
+ Map<String, Object> respMap) throws ServletException {
String type = getParam(req, "type");
int startByte = getIntParam(req, "offset");
int length = getIntParam(req, "length");
@@ -200,6 +206,25 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
}
}
}
+
+ private void handleFetchAttachmentsEvent(
+ int execId,
+ HttpServletRequest req,
+ HttpServletResponse resp,
+ Map<String, Object> respMap) throws ServletException {
+
+ String jobId = getParam(req, "jobId");
+ int attempt = getIntParam(req, "attempt", 0);
+ try {
+ List<Object> result = flowRunnerManager.readJobAttachments(
+ execId, jobId, attempt);
+ respMap.put("attachments", result);
+ }
+ catch (Exception e) {
+ logger.error(e);
+ respMap.put("error", e.getMessage());
+ }
+ }
private void handleFetchMetaDataEvent(int execId, HttpServletRequest req, HttpServletResponse resp, Map<String, Object> respMap) throws ServletException {
int startByte = getIntParam(req, "offset");
@@ -217,7 +242,6 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
logger.error(e);
respMap.put("error", e.getMessage());
}
-
}
@SuppressWarnings("unchecked")
src/java/azkaban/execapp/FlowRunner.java 14(+13 -1)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index ba207c1..cab5f58 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -1013,9 +1013,21 @@ public class FlowRunner extends EventHandler implements Runnable {
return logFile;
}
+
+ public File getJobAttachmentFile(String jobId, int attempt) {
+ ExecutableNode node = flow.getExecutableNodePath(jobId);
+ File path = new File(execDir, node.getJobSource());
+
+ String attachmentFileName = JobRunner.createAttachmentFileName(node, attempt);
+ File attachmentFile = new File(path.getParentFile(), attachmentFileName);
+ if (!attachmentFile.exists()) {
+ return null;
+ }
+ return attachmentFile;
+ }
public File getJobMetaDataFile(String jobId, int attempt) {
- ExecutableNode node = flow.getExecutableNode(jobId);
+ ExecutableNode node = flow.getExecutableNodePath(jobId);
File path = new File(execDir, node.getJobSource());
String metaDataFileName = JobRunner.createMetaDataFileName(node, attempt);
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index ecc0847..17d3ae3 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -50,6 +51,7 @@ import azkaban.jobtype.JobTypeManager;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
@@ -137,7 +139,6 @@ public class FlowRunnerManager implements EventListener {
cleanerThread.start();
jobtypeManager = new JobTypeManager(props.getString(AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR, JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR), parentClassLoader);
-
}
private Map<Pair<Integer, Integer>, ProjectVersion> loadExistingProjects() {
@@ -555,7 +556,7 @@ public class FlowRunnerManager implements EventListener {
File dir = runner.getExecutionDir();
if (dir != null && dir.exists()) {
try {
- synchronized(executionDirDeletionSync) {
+ synchronized (executionDirDeletionSync) {
if (!dir.exists()) {
throw new ExecutorManagerException("Execution dir file doesn't exist. Probably has beend deleted");
}
@@ -574,6 +575,39 @@ public class FlowRunnerManager implements EventListener {
throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
}
+
+ public List<Object> readJobAttachments(int execId, String jobId, int attempt)
+ throws ExecutorManagerException {
+ FlowRunner runner = runningFlows.get(execId);
+ if (runner == null) {
+ throw new ExecutorManagerException(
+ "Running flow " + execId + " not found.");
+ }
+
+ File dir = runner.getExecutionDir();
+ if (dir == null || !dir.exists()) {
+ throw new ExecutorManagerException(
+ "Error reading file. Log directory doesn't exist.");
+ }
+
+ try {
+ synchronized (executionDirDeletionSync) {
+ if (!dir.exists()) {
+ throw new ExecutorManagerException(
+ "Execution dir file doesn't exist. Probably has beend deleted");
+ }
+
+ File attachmentFile = runner.getJobAttachmentFile(jobId, attempt);
+ if (attachmentFile == null || !attachmentFile.exists()) {
+ return null;
+ }
+ return (ArrayList<Object>) JSONUtils.parseJSONFromFile(attachmentFile);
+ }
+ }
+ catch (IOException e) {
+ throw new ExecutorManagerException(e);
+ }
+ }
public JobMetaData readJobMetaData(int execId, String jobId, int attempt, int startByte, int length) throws ExecutorManagerException {
FlowRunner runner = runningFlows.get(execId);
src/java/azkaban/execapp/JobRunner.java 111(+82 -29)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 4fec88f..3de3d23 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -65,6 +65,7 @@ public class JobRunner extends EventHandler implements Runnable {
private Appender jobAppender;
private File logFile;
+ private String attachmentFileName;
private Job job;
private int executionId = -1;
@@ -217,12 +218,19 @@ public class JobRunner extends EventHandler implements Runnable {
jobAppender = fileAppender;
logger.addAppender(jobAppender);
logger.setAdditivity(false);
- } catch (IOException e) {
+ }
+ catch (IOException e) {
flowLogger.error("Could not open log file in " + workingDir + " for job " + this.jobId, e);
}
}
}
+ private void createAttachmentFile() {
+ String fileName = createAttachmentFileName(node);
+ File file = new File(workingDir, fileName);
+ attachmentFileName = file.getAbsolutePath();
+ }
+
private void closeLogger() {
if (jobAppender != null) {
logger.removeAppender(jobAppender);
@@ -234,7 +242,8 @@ public class JobRunner extends EventHandler implements Runnable {
try {
node.setUpdateTime(System.currentTimeMillis());
loader.updateExecutableNode(node);
- } catch (ExecutorManagerException e) {
+ }
+ catch (ExecutorManagerException e) {
flowLogger.error("Could not update job properties in db for " + this.jobId, e);
}
}
@@ -296,7 +305,7 @@ public class JobRunner extends EventHandler implements Runnable {
if (!blockingStatus.isEmpty()) {
logger.info("Pipeline job " + this.jobId + " waiting on " + blockedList + " in execution " + watcher.getExecId());
- for(BlockingStatus bStatus: blockingStatus) {
+ for (BlockingStatus bStatus: blockingStatus) {
logger.info("Waiting on pipelined job " + bStatus.getJobId());
currentBlockStatus = bStatus;
bStatus.blockOnFinishedStatus();
@@ -323,11 +332,12 @@ public class JobRunner extends EventHandler implements Runnable {
long currentTime = System.currentTimeMillis();
if (delayStartMs > 0) {
logger.info("Delaying start of execution for " + delayStartMs + " milliseconds.");
- synchronized(this) {
+ synchronized (this) {
try {
this.wait(delayStartMs);
logger.info("Execution has been delayed for " + delayStartMs + " ms. Continuing with execution.");
- } catch (InterruptedException e) {
+ }
+ catch (InterruptedException e) {
logger.error("Job " + this.jobId + " was to be delayed for " + delayStartMs + ". Interrupted after " + (System.currentTimeMillis() - currentTime));
}
}
@@ -343,26 +353,45 @@ public class JobRunner extends EventHandler implements Runnable {
private void finalizeLogFile() {
closeLogger();
+ if (logFile == null) {
+ flowLogger.info("Log file for job " + this.jobId + " is null");
+ return;
+ }
- if (logFile != null) {
- try {
- File[] files = logFile.getParentFile().listFiles(new FilenameFilter() {
-
- @Override
- public boolean accept(File dir, String name) {
- return name.startsWith(logFile.getName());
- }
- }
- );
- Arrays.sort(files, Collections.reverseOrder());
-
- loader.uploadLogFile(executionId, this.node.getNestedId(), node.getAttempt(), files);
- } catch (ExecutorManagerException e) {
- flowLogger.error("Error writing out logs for job " + this.node.getNestedId(), e);
+ try {
+ File[] files = logFile.getParentFile().listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.startsWith(logFile.getName());
+ }
+ });
+ Arrays.sort(files, Collections.reverseOrder());
+
+ loader.uploadLogFile(executionId, this.node.getNestedId(), node.getAttempt(), files);
+ }
+ catch (ExecutorManagerException e) {
+ flowLogger.error("Error writing out logs for job " + this.node.getNestedId(), e);
+ }
+ }
+
+ private void finalizeAttachmentFile() {
+ if (attachmentFileName == null) {
+ flowLogger.info("Attachment file for job " + this.jobId + " is null");
+ return;
+ }
+
+ try {
+ File file = new File(attachmentFileName);
+ if (!file.exists()) {
+ flowLogger.info("No attachment file for job " + this.jobId +
+ " written.");
+ return;
}
+ loader.uploadAttachmentFile(node, file);
}
- else {
- flowLogger.info("Log file for job " + this.jobId + " is null");
+ catch (ExecutorManagerException e) {
+ flowLogger.error("Error writing out attachment for job " +
+ this.node.getNestedId(), e);
}
}
@@ -379,12 +408,14 @@ public class JobRunner extends EventHandler implements Runnable {
return;
}
+ createAttachmentFile();
createLogger();
boolean errorFound = false;
// Delay execution if necessary. Will return a true if something went wrong.
errorFound |= delayExecution();
- // For pipelining of jobs. Will watch other jobs. Will return true if something went wrong.
+ // For pipelining of jobs. Will watch other jobs. Will return true if
+ // something went wrong.
errorFound |= blockOnPipeLine();
// Start the node.
@@ -393,7 +424,8 @@ public class JobRunner extends EventHandler implements Runnable {
fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
try {
loader.uploadExecutableNode(node, props);
- } catch (ExecutorManagerException e1) {
+ }
+ catch (ExecutorManagerException e1) {
logger.error("Error writing initial node properties");
}
@@ -418,6 +450,7 @@ public class JobRunner extends EventHandler implements Runnable {
fireEvent(Event.create(this, Type.JOB_FINISHED), false);
finalizeLogFile();
+ finalizeAttachmentFile();
}
private boolean prepareJob() throws RuntimeException {
@@ -427,7 +460,7 @@ public class JobRunner extends EventHandler implements Runnable {
return false;
}
- synchronized(syncObject) {
+ synchronized (syncObject) {
if (node.getStatus() == Status.FAILED || cancelled) {
return false;
}
@@ -447,6 +480,7 @@ public class JobRunner extends EventHandler implements Runnable {
props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
props.put(CommonJobProperties.JOB_METADATA_FILE, createMetaDataFileName(node));
+ props.put(CommonJobProperties.JOB_ATTACHMENT_FILE, attachmentFileName);
changeStatus(Status.RUNNING);
// Ability to specify working directory
@@ -454,9 +488,9 @@ public class JobRunner extends EventHandler implements Runnable {
props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
}
- if(props.containsKey("user.to.proxy")) {
+ if (props.containsKey("user.to.proxy")) {
String jobProxyUser = props.getString("user.to.proxy");
- if(proxyUsers != null && !proxyUsers.contains(jobProxyUser)) {
+ if (proxyUsers != null && !proxyUsers.contains(jobProxyUser)) {
logger.error("User " + jobProxyUser + " has no permission to execute this job " + this.jobId + "!");
return false;
}
@@ -477,7 +511,8 @@ public class JobRunner extends EventHandler implements Runnable {
private void runJob() {
try {
job.run();
- } catch (Exception e) {
+ }
+ catch (Exception e) {
e.printStackTrace();
if (props.getBoolean("job.succeed.on.failure", false)) {
@@ -544,7 +579,8 @@ public class JobRunner extends EventHandler implements Runnable {
try {
job.cancel();
- } catch (Exception e) {
+ }
+ catch (Exception e) {
logError(e.getMessage());
logError("Failed trying to cancel job. Maybe it hasn't started running yet or just finished.");
}
@@ -603,4 +639,21 @@ public class JobRunner extends EventHandler implements Runnable {
public static String createMetaDataFileName(ExecutableNode node) {
return JobRunner.createMetaDataFileName(node, node.getAttempt());
}
+
+ public static String createAttachmentFileName(ExecutableNode node) {
+
+ return JobRunner.createAttachmentFileName(node, node.getAttempt());
+ }
+
+ public static String createAttachmentFileName(ExecutableNode node, int attempt) {
+ int executionId = node.getExecutableFlow().getExecutionId();
+ String jobId = node.getId();
+ if (node.getExecutableFlow() != node.getParentFlow()) {
+ // Posix safe file delimiter
+ jobId = node.getPrintableId("._.");
+ }
+
+ return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".attach" : "_job." + executionId + "." + jobId + ".attach";
+
+ }
}
diff --git a/src/java/azkaban/executor/ConnectorParams.java b/src/java/azkaban/executor/ConnectorParams.java
index 0c0c7e0..c84436e 100644
--- a/src/java/azkaban/executor/ConnectorParams.java
+++ b/src/java/azkaban/executor/ConnectorParams.java
@@ -30,6 +30,7 @@ public interface ConnectorParams {
public static final String RESUME_ACTION = "resume";
public static final String PING_ACTION = "ping";
public static final String LOG_ACTION = "log";
+ public static final String ATTACHMENTS_ACTION = "attachments";
public static final String METADATA_ACTION = "metadata";
public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index a6ccea5..bc33c0e 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -89,7 +89,8 @@ public class ExecutableFlow extends ExecutableFlowBase {
protected void setFlow(Project project, Flow flow) {
super.setFlow(project, flow);
executionOptions = new ExecutionOptions();
-
+ executionOptions.setMailCreator(flow.getMailCreator());
+
if (flow.getSuccessEmails() != null) {
executionOptions.setSuccessEmails(flow.getSuccessEmails());
}
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index bb896a2..bacad6c 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -61,7 +61,7 @@ public class ExecutableNode {
private Props inputProps;
private Props outputProps;
-
+
public static final String ATTEMPT_PARAM = "attempt";
public static final String PASTATTEMPTS_PARAM = "pastAttempts";
@@ -204,7 +204,7 @@ public class ExecutableNode {
public Props getOutputProps() {
return outputProps;
}
-
+
public long getDelayedExecution() {
return delayExecution;
}
@@ -438,4 +438,5 @@ public class ExecutableNode {
public long getRetryBackoff() {
return inputProps.getLong("retry.backoff", 0);
}
-}
\ No newline at end of file
+}
+
diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
index 04e0e44..4763ee5 100644
--- a/src/java/azkaban/executor/ExecutorLoader.java
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -47,7 +47,11 @@ public interface ExecutorLoader {
public LogData fetchLogs(int execId, String name, int attempt, int startByte, int endByte) throws ExecutorManagerException;
+ public List<Object> fetchAttachments(int execId, String name, int attempt) throws ExecutorManagerException;
+
public void uploadLogFile(int execId, String name, int attempt, File ... files) throws ExecutorManagerException;
+
+ public void uploadAttachmentFile(ExecutableNode node, File file) throws ExecutorManagerException;
public void updateExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException;
src/java/azkaban/executor/ExecutorManager.java 94(+64 -30)
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index fbeadb5..f2e979b 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -200,41 +200,41 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
@Override
public List<ExecutableFlow> getExecutableFlows(
- Project project, String flowId, int skip, int size)
- throws ExecutorManagerException {
+ Project project, String flowId, int skip, int size)
+ throws ExecutorManagerException {
List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
- project.getId(), flowId, skip, size);
+ project.getId(), flowId, skip, size);
return flows;
}
@Override
public List<ExecutableFlow> getExecutableFlows(int skip, int size)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(skip, size);
return flows;
}
@Override
public List<ExecutableFlow> getExecutableFlows(
- String flowIdContains, int skip, int size)
- throws ExecutorManagerException {
+ String flowIdContains, int skip, int size)
+ throws ExecutorManagerException {
List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
- null, '%'+flowIdContains+'%', null, 0, -1, -1 , skip, size);
+ null, '%'+flowIdContains+'%', null, 0, -1, -1 , skip, size);
return flows;
}
@Override
public List<ExecutableFlow> getExecutableFlows(
- String projContain,
- String flowContain,
- String userContain,
- int status,
- long begin,
- long end,
- int skip,
- int size) throws ExecutorManagerException {
+ String projContain,
+ String flowContain,
+ String userContain,
+ int status,
+ long begin,
+ long end,
+ int skip,
+ int size) throws ExecutorManagerException {
List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
- projContain, flowContain, userContain, status, begin, end , skip, size);
+ projContain, flowContain, userContain, status, begin, end , skip, size);
return flows;
}
@@ -273,8 +273,11 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
}
@Override
- public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
- Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
+ public LogData getExecutionJobLog(
+ ExecutableFlow exFlow, String jobId, int offset, int length, int attempt)
+ throws ExecutorManagerException {
+ Pair<ExecutionReference, ExecutableFlow> pair =
+ runningFlows.get(exFlow.getExecutionId());
if (pair != null) {
Pair<String,String> typeParam = new Pair<String,String>("type", "job");
Pair<String,String> jobIdParam = new Pair<String,String>("jobId", jobId);
@@ -283,14 +286,45 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
@SuppressWarnings("unchecked")
- Map<String, Object> result = callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION, typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
+ Map<String, Object> result = callExecutorServer(
+ pair.getFirst(),
+ ConnectorParams.LOG_ACTION,
+ typeParam,
+ jobIdParam,
+ offsetParam,
+ lengthParam,
+ attemptParam);
return LogData.createLogDataFromObject(result);
}
else {
- LogData value = executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt, offset, length);
+ LogData value = executorLoader.fetchLogs(
+ exFlow.getExecutionId(), jobId, attempt, offset, length);
return value;
}
}
+
+ @Override
+ public List<Object> getExecutionJobStats(
+ ExecutableFlow exFlow, String jobId, int attempt)
+ throws ExecutorManagerException {
+ Pair<ExecutionReference, ExecutableFlow> pair =
+ runningFlows.get(exFlow.getExecutionId());
+ if (pair == null) {
+ return executorLoader.fetchAttachments(
+ exFlow.getExecutionId(), jobId, attempt);
+ }
+
+ Pair<String, String> jobIdParam = new Pair<String, String>("jobId", jobId);
+ Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> result = callExecutorServer(
+ pair.getFirst(),
+ ConnectorParams.ATTACHMENTS_ACTION,
+ jobIdParam,
+ attemptParam);
+ return (List<Object>) result.get("attachments");
+ }
@Override
public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
@@ -492,7 +526,7 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
ExecutionReference reference = new ExecutionReference(exflow.getExecutionId(), executorHost, executorPort);
executorLoader.addActiveExecutableReference(reference);
try {
- callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
+ callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
runningFlows.put(exflow.getExecutionId(), new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
message += "Execution submitted successfully with exec id " + exflow.getExecutionId();
@@ -1114,23 +1148,23 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
@Override
public int getExecutableFlows(
- int projectId,
- String flowId,
- int from,
- int length,
- List<ExecutableFlow> outputList) throws ExecutorManagerException {
+ int projectId,
+ String flowId,
+ int from,
+ int length,
+ List<ExecutableFlow> outputList) throws ExecutorManagerException {
List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
- projectId, flowId, from, length);
+ projectId, flowId, from, length);
outputList.addAll(flows);
return executorLoader.fetchNumExecutableFlows(projectId, flowId);
}
@Override
public List<ExecutableFlow> getExecutableFlows(
- int projectId, String flowId, int from, int length, Status status)
- throws ExecutorManagerException {
+ int projectId, String flowId, int from, int length, Status status)
+ throws ExecutorManagerException {
return executorLoader.fetchFlowHistory(
- projectId, flowId, from, length, status);
+ projectId, flowId, from, length, status);
}
/*
diff --git a/src/java/azkaban/executor/ExecutorManagerAdapter.java b/src/java/azkaban/executor/ExecutorManagerAdapter.java
index cc81589..af64ec4 100644
--- a/src/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/src/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -90,6 +90,8 @@ public interface ExecutorManagerAdapter{
public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset, int length) throws ExecutorManagerException;
public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException;
+
+ public List<Object> getExecutionJobStats(ExecutableFlow exflow, String jobId, int attempt) throws ExecutorManagerException;
public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException;
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 4736421..8c7a8ed 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -35,6 +35,7 @@ import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
@@ -702,6 +703,32 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
"Error fetching logs " + execId + " : " + name, e);
}
}
+
+ @Override
+ public List<Object> fetchAttachments(int execId, String jobId, int attempt)
+ throws ExecutorManagerException {
+ QueryRunner runner = createQueryRunner();
+
+ try {
+ String attachments = runner.query(
+ FetchExecutableJobAttachmentsHandler.FETCH_ATTACHMENTS_EXECUTABLE_NODE,
+ new FetchExecutableJobAttachmentsHandler(),
+ execId,
+ jobId);
+ if (attachments == null) {
+ return null;
+ }
+ return (List<Object>) JSONUtils.parseJSONFromString(attachments);
+ }
+ catch (IOException e) {
+ throw new ExecutorManagerException(
+ "Error converting job attachments to JSON " + jobId, e);
+ }
+ catch (SQLException e) {
+ throw new ExecutorManagerException(
+ "Error query job attachments " + jobId, e);
+ }
+ }
@Override
public void uploadLogFile(
@@ -833,6 +860,50 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
DateTime.now().getMillis());
}
+ @Override
+ public void uploadAttachmentFile(ExecutableNode node, File file)
+ throws ExecutorManagerException {
+ Connection connection = getConnection();
+ try {
+ uploadAttachmentFile(connection, node, file, defaultEncodingType);
+ connection.commit();
+ }
+ catch (SQLException e) {
+ throw new ExecutorManagerException("Error committing attachments ", e);
+ }
+ catch (IOException e) {
+ throw new ExecutorManagerException("Error uploading attachments ", e);
+ }
+ finally {
+ DbUtils.closeQuietly(connection);
+ }
+ }
+
+ private void uploadAttachmentFile(
+ Connection connection,
+ ExecutableNode node,
+ File file,
+ EncodingType encType) throws SQLException, IOException {
+
+ String jsonString = FileUtils.readFileToString(file);
+ byte[] attachments = GZIPUtils.gzipString(jsonString, "UTF-8");
+
+ final String UPDATE_EXECUTION_NODE_ATTACHMENTS =
+ "UPDATE execution_jobs " +
+ "SET attachments=? " +
+ "WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
+
+ QueryRunner runner = new QueryRunner();
+ runner.update(
+ connection,
+ UPDATE_EXECUTION_NODE_ATTACHMENTS,
+ attachments,
+ node.getExecutableFlow().getExecutionId(),
+ node.getParentFlow().getNestedId(),
+ node.getId(),
+ node.getAttempt());
+ }
+
private Connection getConnection() throws ExecutorManagerException {
Connection connection = null;
try {
@@ -972,6 +1043,30 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
return execNodes;
}
}
+
+ private static class FetchExecutableJobAttachmentsHandler
+ implements ResultSetHandler<String> {
+ private static String FETCH_ATTACHMENTS_EXECUTABLE_NODE =
+ "SELECT attachments FROM execution_jobs WHERE exec_id=? AND job_id=?";
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public String handle(ResultSet rs) throws SQLException {
+ String attachmentsJson = null;
+ if (rs.next()) {
+ try {
+ byte[] attachments = rs.getBytes(1);
+ if (attachments != null) {
+ attachmentsJson = GZIPUtils.unGzipString(attachments, "UTF-8");
+ }
+ }
+ catch (IOException e) {
+ throw new SQLException("Error decoding job attachments", e);
+ }
+ }
+ return attachmentsJson;
+ }
+ }
private static class FetchExecutableJobPropsHandler
implements ResultSetHandler<Pair<Props, Props>> {
diff --git a/src/java/azkaban/flow/CommonJobProperties.java b/src/java/azkaban/flow/CommonJobProperties.java
index 7324205..3873a56 100644
--- a/src/java/azkaban/flow/CommonJobProperties.java
+++ b/src/java/azkaban/flow/CommonJobProperties.java
@@ -75,6 +75,11 @@ public class CommonJobProperties {
* The attempt number of the executing job.
*/
public static final String JOB_METADATA_FILE = "azkaban.job.metadata.file";
+
+ /**
+ * The attempt number of the executing job.
+ */
+ public static final String JOB_ATTACHMENT_FILE = "azkaban.job.attachment.file";
/**
* The executing flow id
src/java/azkaban/jobtype/JobTypeManager.java 22(+11 -11)
diff --git a/src/java/azkaban/jobtype/JobTypeManager.java b/src/java/azkaban/jobtype/JobTypeManager.java
index 1ef57ef..6c275af 100644
--- a/src/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/java/azkaban/jobtype/JobTypeManager.java
@@ -338,15 +338,16 @@ public class JobTypeManager
}
- public Job buildJobExecutor(String jobId, Props jobProps, Logger logger) throws JobTypeManagerException
- {
+ public Job buildJobExecutor(String jobId, Props jobProps, Logger logger)
+ throws JobTypeManagerException {
Job job = null;
try {
String jobType = jobProps.getString("type");
if (jobType == null || jobType.length() == 0) {
/*throw an exception when job name is null or empty*/
- throw new JobExecutionException (
- String.format("The 'type' parameter for job[%s] is null or empty", jobProps, logger));
+ throw new JobExecutionException(
+ String.format("The 'type' parameter for job[%s] is null or empty",
+ jobProps, logger));
}
logger.info("Building " + jobType + " job executor. ");
@@ -361,17 +362,16 @@ public class JobTypeManager
Props sysConf = jobtypeSysProps.get(jobType);
Props jobConf = jobProps;
- if(jobtypeJobProps.containsKey(jobType)) {
+ if (jobtypeJobProps.containsKey(jobType)) {
Props p = jobtypeJobProps.get(jobType);
- for(String k : p.getKeySet())
- {
- if(!jobConf.containsKey(k)) {
+ for (String k : p.getKeySet()) {
+ if (!jobConf.containsKey(k)) {
jobConf.put(k, p.get(k));
}
}
}
jobConf = PropsUtils.resolveProps(jobConf);
-
+
if (sysConf != null) {
sysConf = PropsUtils.resolveProps(sysConf);
}
@@ -379,11 +379,11 @@ public class JobTypeManager
sysConf = new Props();
}
-
// logger.info("sysConf is " + sysConf);
// logger.info("jobConf is " + jobConf);
//
- job = (Job)Utils.callConstructor(executorClass, jobId, sysConf, jobConf, logger);
+ job = (Job) Utils.callConstructor(
+ executorClass, jobId, sysConf, jobConf, logger);
}
catch (Exception e) {
//job = new InitErrorJob(jobId, e);
src/java/azkaban/webapp/AzkabanWebServer.java 45(+13 -32)
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 52e8d7c..21460b2 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -93,8 +93,9 @@ import azkaban.webapp.servlet.HistoryServlet;
import azkaban.webapp.servlet.ProjectServlet;
import azkaban.webapp.servlet.ProjectManagerServlet;
import azkaban.webapp.servlet.TriggerManagerServlet;
-import azkaban.webapp.servlet.TriggerPlugin;
-import azkaban.webapp.servlet.ViewerPlugin;
+import azkaban.webapp.plugin.TriggerPlugin;
+import azkaban.webapp.plugin.ViewerPlugin;
+import azkaban.webapp.plugin.PluginRegistry;
import azkaban.webapp.session.SessionCache;
/**
@@ -153,7 +154,6 @@ public class AzkabanWebServer extends AzkabanServer {
private Props props;
private SessionCache sessionCache;
private File tempDir;
- private List<ViewerPlugin> viewerPlugins;
private Map<String, TriggerPlugin> triggerPlugins;
private MBeanServer mbeanServer;
@@ -197,12 +197,6 @@ public class AzkabanWebServer extends AzkabanServer {
tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
- File statsDir = new File(props.getString("azkaban.stats.dir", "stats"));
- if (!statsDir.exists()) {
- statsDir.mkdir();
- }
- props.put("azkaban.stats.dir", statsDir.getCanonicalPath());
-
// Setup time zone
if (props.containsKey(DEFAULT_TIMEZONE_ID)) {
String timezone = props.getString(DEFAULT_TIMEZONE_ID);
@@ -215,10 +209,6 @@ public class AzkabanWebServer extends AzkabanServer {
configureMBeanServer();
}
- private void setViewerPlugins(List<ViewerPlugin> viewerPlugins) {
- this.viewerPlugins = viewerPlugins;
- }
-
private void setTriggerPlugins(Map<String, TriggerPlugin> triggerPlugins) {
this.triggerPlugins = triggerPlugins;
}
@@ -775,7 +765,7 @@ public class AzkabanWebServer extends AzkabanServer {
root.addServlet(new ServletHolder(new TriggerManagerServlet()),"/triggers");
String viewerPluginDir = azkabanSettings.getString("viewer.plugin.dir", "plugins/viewer");
- app.setViewerPlugins(loadViewerPlugins(root, viewerPluginDir, app.getVelocityEngine()));
+ loadViewerPlugins(root, viewerPluginDir, app.getVelocityEngine());
// triggerplugin
String triggerPluginDir = azkabanSettings.getString("trigger.plugin.dir", "plugins/triggers");
@@ -963,13 +953,12 @@ public class AzkabanWebServer extends AzkabanServer {
return triggerPlugins;
}
- private static List<ViewerPlugin> loadViewerPlugins(Context root, String pluginPath, VelocityEngine ve) {
+ private static void loadViewerPlugins(Context root, String pluginPath, VelocityEngine ve) {
File viewerPluginPath = new File(pluginPath);
if (!viewerPluginPath.exists()) {
- return Collections.<ViewerPlugin>emptyList();
+ return;
}
- ArrayList<ViewerPlugin> installedViewerPlugins = new ArrayList<ViewerPlugin>();
ClassLoader parentLoader = AzkabanWebServer.class.getClassLoader();
File[] pluginDirs = viewerPluginPath.listFiles();
ArrayList<String> jarPaths = new ArrayList<String>();
@@ -1011,6 +1000,7 @@ public class AzkabanWebServer extends AzkabanServer {
String pluginName = pluginProps.getString("viewer.name");
String pluginWebPath = pluginProps.getString("viewer.path");
+ String pluginJobType = pluginProps.getString("viewer.jobtype", null);
int pluginOrder = pluginProps.getInt("viewer.order", 0);
boolean pluginHidden = pluginProps.getBoolean("viewer.hidden", false);
List<String> extLibClasspath = pluginProps.getStringList("viewer.external.classpaths", (List<String>)null);
@@ -1114,27 +1104,18 @@ public class AzkabanWebServer extends AzkabanServer {
AbstractAzkabanServlet avServlet = (AbstractAzkabanServlet)obj;
root.addServlet(new ServletHolder(avServlet), "/" + pluginWebPath + "/*");
- installedViewerPlugins.add(new ViewerPlugin(pluginName, pluginWebPath, pluginOrder, pluginHidden));
+ PluginRegistry.getRegistry().register(new ViewerPlugin(
+ pluginName,
+ pluginWebPath,
+ pluginOrder,
+ pluginHidden,
+ pluginJobType));
}
// Velocity needs the jar resource paths to be set.
String jarResourcePath = StringUtils.join(jarPaths, ", ");
logger.info("Setting jar resource path " + jarResourcePath);
ve.addProperty("jar.resource.loader.path", jarResourcePath);
-
- // Sort plugins based on order
- Collections.sort(installedViewerPlugins, new Comparator<ViewerPlugin>() {
- @Override
- public int compare(ViewerPlugin o1, ViewerPlugin o2) {
- return o1.getOrder() - o2.getOrder();
- }
- });
-
- return installedViewerPlugins;
- }
-
- public List<ViewerPlugin> getViewerPlugins() {
- return viewerPlugins;
}
/**
diff --git a/src/java/azkaban/webapp/plugin/PluginRegistry.java b/src/java/azkaban/webapp/plugin/PluginRegistry.java
new file mode 100644
index 0000000..d49db54
--- /dev/null
+++ b/src/java/azkaban/webapp/plugin/PluginRegistry.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.webapp.plugin;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.Set;
+
+public class PluginRegistry {
+
+ private static PluginRegistry registry;
+
+ public TreeSet<ViewerPlugin> viewerPlugins;
+
+ public Map<String, TreeSet<ViewerPlugin>> jobTypeViewerPlugins;
+
+ private PluginRegistry() {
+ viewerPlugins = new TreeSet<ViewerPlugin>(ViewerPlugin.COMPARATOR);
+ jobTypeViewerPlugins = new HashMap<String, TreeSet<ViewerPlugin>>();
+ }
+
+ public void register(ViewerPlugin plugin) {
+ viewerPlugins.add(plugin);
+ String jobType = plugin.getJobType();
+ if (jobType == null) {
+ return;
+ }
+ TreeSet<ViewerPlugin> plugins = null;
+ if (!jobTypeViewerPlugins.containsKey(jobType)) {
+ plugins = new TreeSet<ViewerPlugin>(ViewerPlugin.COMPARATOR);
+ plugins.add(plugin);
+ jobTypeViewerPlugins.put(jobType, plugins);
+ }
+ else {
+ plugins = jobTypeViewerPlugins.get(jobType);
+ plugins.add(plugin);
+ }
+ }
+
+ public List<ViewerPlugin> getViewerPlugins() {
+ return new ArrayList<ViewerPlugin>(viewerPlugins);
+ }
+
+ public List<ViewerPlugin> getViewerPluginsForJobType(String jobType) {
+ TreeSet<ViewerPlugin> plugins = jobTypeViewerPlugins.get(jobType);
+ if (plugins == null) {
+ return null;
+ }
+ return new ArrayList<ViewerPlugin>(plugins);
+ }
+
+ public static PluginRegistry getRegistry() {
+ if (registry == null) {
+ registry = new PluginRegistry();
+ }
+ return registry;
+ }
+}
diff --git a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
index 8ac470e..2fc6ecc 100644
--- a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
@@ -39,6 +39,9 @@ import azkaban.utils.Props;
import azkaban.webapp.AzkabanServer;
import azkaban.webapp.AzkabanWebServer;
import azkaban.webapp.session.Session;
+import azkaban.webapp.plugin.ViewerPlugin;
+import azkaban.webapp.plugin.TriggerPlugin;
+import azkaban.webapp.plugin.PluginRegistry;
/**
* Base Servlet for pages
@@ -91,7 +94,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
if (application instanceof AzkabanWebServer) {
AzkabanWebServer server = (AzkabanWebServer)application;
- viewerPlugins = server.getViewerPlugins();
+ viewerPlugins = PluginRegistry.getRegistry().getViewerPlugins();
triggerPlugins = new ArrayList<TriggerPlugin>(server.getTriggerPlugins().values());
}
}
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index c875428..6f78ac8 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -17,7 +17,6 @@
package azkaban.webapp.servlet;
import java.io.IOException;
-import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -46,9 +45,10 @@ import azkaban.user.Permission;
import azkaban.user.User;
import azkaban.user.Permission.Type;
import azkaban.utils.FileIOUtils.LogData;
-import azkaban.utils.JSONUtils;
import azkaban.webapp.AzkabanWebServer;
import azkaban.webapp.session.Session;
+import azkaban.webapp.plugin.PluginRegistry;
+import azkaban.webapp.plugin.ViewerPlugin;
public class ExecutorServlet extends LoginAbstractAzkabanServlet {
private static final long serialVersionUID = 1L;
@@ -57,8 +57,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
private ScheduleManager scheduleManager;
private ExecutorVelocityHelper velocityHelper;
- private String statsDir;
-
@Override
public void init(ServletConfig config) throws ServletException {
super.init(config);
@@ -67,7 +65,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
executorManager = server.getExecutorManager();
scheduleManager = server.getScheduleManager();
velocityHelper = new ExecutorVelocityHelper();
- statsDir = server.getServerProps().getString("azkaban.stats.dir");
}
@Override
@@ -130,9 +127,9 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
else if (ajaxName.equals("fetchExecJobLogs")) {
ajaxFetchJobLogs(req, resp, ret, session.getUser(), exFlow);
}
- else if (ajaxName.equals("fetchExecJobStats")) {
- ajaxFetchJobStats(req, resp, ret, session.getUser(), exFlow);
- }
+ else if (ajaxName.equals("fetchExecJobStats")) {
+ ajaxFetchJobStats(req, resp, ret, session.getUser(), exFlow);
+ }
else if (ajaxName.equals("retryFailedJobs")) {
ajaxRestartFailed(req, resp, ret, session.getUser(), exFlow);
}
@@ -189,6 +186,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
page.add("attempt", attempt);
ExecutableFlow flow = null;
+ ExecutableNode node = null;
try {
flow = executorManager.getExecutableFlow(execId);
if (flow == null) {
@@ -196,7 +194,18 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
page.render();
return;
}
- } catch (ExecutorManagerException e) {
+
+ node = flow.getExecutableNodePath(jobId);
+ if (node == null) {
+ page.add("errorMsg", "Job " + jobId + " doesn't exist in " + flow.getExecutionId());
+ return;
+ }
+
+ List<ViewerPlugin> jobViewerPlugins = PluginRegistry.getRegistry()
+ .getViewerPluginsForJobType(node.getType());
+ page.add("jobViewerPlugins", jobViewerPlugins);
+ }
+ catch (ExecutorManagerException e) {
page.add("errorMsg", "Error loading executing flow: " + e.getMessage());
page.render();
return;
@@ -209,7 +218,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
return;
}
- ExecutableNode node = flow.getExecutableNodePath(jobId);
page.add("projectName", project.getName());
page.add("flowid", flow.getId());
page.add("parentflowid", node.getParentFlow().getFlowId());
@@ -456,35 +464,35 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
}
- private void ajaxFetchJobStats(HttpServletRequest req,
- HttpServletResponse resp, HashMap<String, Object> ret, User user,
+ private void ajaxFetchJobStats(
+ HttpServletRequest req,
+ HttpServletResponse resp,
+ HashMap<String, Object> ret,
+ User user,
ExecutableFlow exFlow) throws ServletException {
- Project project = getProjectAjaxByPermission(ret,
- exFlow.getProjectId(), user, Type.READ);
+ Project project = getProjectAjaxByPermission(
+ ret, exFlow.getProjectId(), user, Type.READ);
if (project == null) {
return;
}
String jobId = this.getParam(req, "jobid");
resp.setCharacterEncoding("utf-8");
- String statsFilePath = null;
+
try {
- ExecutableNode node = exFlow.getExecutableNode(jobId);
+ ExecutableNode node = exFlow.getExecutableNodePath(jobId);
if (node == null) {
- ret.put("error",
- "Job " + jobId + " doesn't exist in "
- + exFlow.getExecutionId());
+ ret.put("error", "Job " + jobId + " doesn't exist in " +
+ exFlow.getExecutionId());
return;
}
- statsFilePath = statsDir + "/" + exFlow.getExecutionId() + "-"
- + jobId + "-stats.json";
- File statsFile = new File(statsFilePath);
- @SuppressWarnings("unchecked")
- List<Object> jsonObj = (ArrayList<Object>) JSONUtils.parseJSONFromFile(statsFile);
+ List<Object> jsonObj = executorManager.getExecutionJobStats(
+ exFlow, jobId, node.getAttempt());
ret.put("jobStats", jsonObj);
- } catch (IOException e) {
- ret.put("error", "Cannot open stats file: " + statsFilePath);
+ }
+ catch (ExecutorManagerException e) {
+ ret.put("error", "Error retrieving stats for job " + jobId);
return;
}
}
@@ -715,7 +723,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
Map<String, Object> map = getExecutableFlowUpdateInfo(exFlow, lastUpdateTime);
map.put("status", exFlow.getStatus());
map.put("startTime", exFlow.getStartTime());
- map.put("endTime", exFlow.getEndTime());
+ map.put("endTime", exFlow.getEndTime());
map.put("updateTime", exFlow.getUpdateTime());
ret.putAll(map);
}
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
index d944f4c..678c288 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -122,6 +122,7 @@
<thead>
<tr>
<th>Name</th>
+ <th class="jobtype">Type</th>
<th class="timeline">Timeline</th>
<th class="date">Start Time</th>
<th class="date">End Time</th>
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobdetailsheader.vm b/src/java/azkaban/webapp/servlet/velocity/jobdetailsheader.vm
new file mode 100644
index 0000000..5b172c8
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/velocity/jobdetailsheader.vm
@@ -0,0 +1,62 @@
+#*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+*#
+
+ ## Page header.
+
+ <div class="az-page-header">
+ <div class="container-full">
+ <div class="row">
+ <div class="col-xs-6">
+ <h1><a href="${context}/executor?execid=${execid}&job=${jobid}">Job Execution <small>$jobid</small></a></h1>
+ </div>
+ <div class="col-xs-6">
+ <div class="pull-right az-page-header-form">
+ <a href="${context}/manager?project=${projectName}&flow=${parentflowid}&job=$jobname" class="btn btn-info">Job Properties</a>
+ </div>
+ </div>
+ </div>
+ </div>
+ </div>
+
+
+ <div class="container-full">
+
+ #parse ("azkaban/webapp/servlet/velocity/alerts.vm")
+
+ ## Breadcrumb
+
+ <ol class="breadcrumb">
+ <li><a href="${context}/manager?project=${projectName}"><strong>Project</strong> $projectName</a></li>
+ <li><a href="${context}/manager?project=${projectName}&flow=${flowid}"><strong>Flow</strong> $flowid</a></li>
+ <li><a href="${context}/executor?execid=${execid}#jobslist"><strong>Execution</strong> $execid</a></li>
+ <li class="active"><strong>Job</strong> $jobid</li>
+ </ol>
+
+ ## Tabs
+
+ <ul class="nav nav-tabs" id="headertabs">
+ #if ($current_page == "executing")
+ <li id="jobLogViewLink"><a href="#logs">Job Logs</a></li>
+ <li id="jobSummaryViewLink"><a href="#summary">Summary</a></li>
+ #else
+ <li id="jobLogViewLink"><a href="${context}/executor?execid=${execid}&job=${jobid}#logs">Job Logs</a></li>
+ <li id="jobSummaryViewLink"><a href="${context}/executor?execid=${execid}&job=${jobid}#summary">Summary</a></li>
+ #end
+ #foreach ($jobViewerPlugin in $jobViewerPlugins)
+ <li#if($current_page == $jobViewerPlugin.pluginName) class="active"#end><a href="$!context/${jobViewerPlugin.pluginPath}?execid=${execid}&jobid=${jobid}">$jobViewerPlugin.pluginName</a></li>
+ #end
+ </ul>
+ </div>
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm b/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
index 0d15964..2f377e5 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
@@ -47,8 +47,8 @@
#parse ("azkaban/webapp/servlet/velocity/errormsg.vm")
#else
- ## Page header.
+<<<<<<< HEAD
<div class="az-page-header">
<div class="container-full">
<div class="row">
@@ -85,6 +85,9 @@
<li><a href="${context}/pigvisualizer?execid=${execid}&jobid=${jobid}">Visualization</a></li>
</ul>
</div>
+=======
+ #parse ("azkaban/webapp/servlet/velocity/jobdetailsheader.vm")
+>>>>>>> 00fce49bbc08509c17de9efbd475c9ca92311dfd
## Log content.
@@ -120,6 +123,11 @@
</div>
</h3>
+ <div id="jobType">
+ <table id="jobTypeTable" class="table table-striped table-bordered table-hover">
+ </table>
+ </div>
+
<div id="command-summary">
<h4>Command Summary</h4>
<table id="commandTable" class="table table-striped table-bordered table-hover">
@@ -157,6 +165,13 @@
</tbody>
</table>
</div>
+
+ <div id="jobIds">
+ <h4>Map Reduce Jobs</h4>
+ <table class="table table-striped table-bordered table-hover">
+ <tbody id="jobIdsTableBody"></tbody>
+ </table>
+ </div>
</div>
</div>
</div>
src/less/tables.less 18(+18 -0)
diff --git a/src/less/tables.less b/src/less/tables.less
index 8cc5764..5d5aeea 100644
--- a/src/less/tables.less
+++ b/src/less/tables.less
@@ -9,10 +9,24 @@ table.table-properties {
font-weight: bold;
}
+.property-value {
+
+}
+
.property-value-half {
width: 25%;
}
+.property-key,
+.property-value,
+.property-value-half {
+ pre {
+ background: transparent;
+ padding: 0;
+ border: 0;
+ }
+}
+
.editable {
.remove-btn {
visibility: hidden;
@@ -77,6 +91,10 @@ table.table-properties {
width: 160px;
}
+ &.jobtype {
+ width: 90px;
+ }
+
&.execid {
width: 100px;
}
diff --git a/src/package/execserver/conf/azkaban.properties b/src/package/execserver/conf/azkaban.properties
index 585e208..b54b0f5 100644
--- a/src/package/execserver/conf/azkaban.properties
+++ b/src/package/execserver/conf/azkaban.properties
@@ -8,8 +8,6 @@ azkaban.jobtype.plugin.dir=plugins/jobtypes
executor.global.properties=conf/global.properties
azkaban.project.dir=projects
-azkaban.stats.dir=stats
-
database.type=mysql
mysql.port=3306
mysql.host=localhost
diff --git a/src/package/soloserver/conf/azkaban.properties b/src/package/soloserver/conf/azkaban.properties
index 9ca591e..7524a14 100644
--- a/src/package/soloserver/conf/azkaban.properties
+++ b/src/package/soloserver/conf/azkaban.properties
@@ -20,9 +20,6 @@ database.type=h2
h2.path=data/azkaban
h2.create.tables=true
-# Stats
-azkaban.stats.dir=stats
-
# Velocity dev mode
velocity.dev.mode=false
diff --git a/src/package/webserver/conf/azkaban.properties b/src/package/webserver/conf/azkaban.properties
index 97e40ae..3ccb2f3 100644
--- a/src/package/webserver/conf/azkaban.properties
+++ b/src/package/webserver/conf/azkaban.properties
@@ -22,8 +22,6 @@ mysql.user=azkaban
mysql.password=azkaban
mysql.numconnections=100
-azkaban.stats.dir=stats
-
# Velocity dev mode
velocity.dev.mode=false
src/web/js/azkaban/model/log-data.js 53(+37 -16)
diff --git a/src/web/js/azkaban/model/log-data.js b/src/web/js/azkaban/model/log-data.js
index 2fe3ac5..90a958f 100644
--- a/src/web/js/azkaban/model/log-data.js
+++ b/src/web/js/azkaban/model/log-data.js
@@ -24,6 +24,8 @@ azkaban.LogDataModel = Backbone.Model.extend({
HIVE_MAP_REDUCE_JOBS_SUMMARY: "MapReduce Jobs Launched:",
HIVE_MAP_REDUCE_SUMMARY_REGEX: /Job (\d+):\s+Map: (\d+)\s+Reduce: (\d+)\s+(?:Cumulative CPU: (.+?))?\s+HDFS Read: (\d+)\s+HDFS Write: (\d+)/,
+ JOB_ID_REGEX: /job_\d{12}_\d{4,}/,
+
initialize: function() {
this.set("offset", 0 );
this.set("logData", "");
@@ -91,14 +93,19 @@ azkaban.LogDataModel = Backbone.Model.extend({
var lines = data.split("\n");
if (this.parseCommand(lines)) {
+ this.parseJobType(lines);
this.parseJobTrackerUrls(lines);
- var jobType = this.parseJobType(lines);
- if (jobType.indexOf("pig") !== -1) {
- this.parsePigTable(lines, "pigSummary", this.PIG_JOB_SUMMARY_START, "", 0);
- this.parsePigTable(lines, "pigStats", this.PIG_JOB_STATS_START, "", 1);
- } else if (jobType.indexOf("hive") !== -1) {
- this.parseHiveQueries(lines);
+ var jobType = this.get("jobType");
+ if (jobType) {
+ if (jobType.indexOf("pig") !== -1) {
+ this.parsePigTable(lines, "pigSummary", this.PIG_JOB_SUMMARY_START, "", 0);
+ this.parsePigTable(lines, "pigStats", this.PIG_JOB_STATS_START, "", 1);
+ } else if (jobType.indexOf("hive") !== -1) {
+ this.parseHiveQueries(lines);
+ } else {
+ this.parseJobIds(lines);
+ }
}
}
},
@@ -163,16 +170,32 @@ azkaban.LogDataModel = Backbone.Model.extend({
this.set("jobTrackerUrlsOrdered", jobTrackerUrlsOrdered);
},
+ parseJobIds: function(lines) {
+ var seenJobIds = {};
+ var jobIds = [];
+ var numLines = lines.length;
+ var match;
+ for (var i = 0; i < numLines; i++) {
+ if ((match = this.JOB_ID_REGEX.exec(lines[i])) && !seenJobIds[match[0]]) {
+ seenJobIds[match[0]] = true;
+ jobIds.push(match[0]);
+ }
+ }
+
+ if (jobIds.length > 0) {
+ this.set("jobIds", jobIds);
+ }
+ },
+
parseJobType: function(lines) {
var numLines = lines.length;
var match;
- for (var i = 0; numLines; i++) {
+ for (var i = 0; i < numLines; i++) {
if (match = this.JOB_TYPE_REGEX.exec(lines[i])) {
- return match[1];
+ this.set("jobType", match[1]);
+ break;
}
}
-
- return null;
},
parsePigTable: function(lines, tableName, startPattern, endPattern, linesToSkipAfterStart) {
@@ -262,14 +285,12 @@ azkaban.LogDataModel = Backbone.Model.extend({
job.push("<a href='" + this.get("jobTrackerUrlsOrdered")[currMapReduceJob++] + "'>" + currJob + "</a>");
job.push(match[2]);
job.push(match[3]);
- job.push(match[4]);
- job.push(match[5]);
- job.push(match[6]);
-
- if (match[7]) {
+ if (match[4]) {
this.set("hasCumulativeCPU", true);
- job.push(match[7]);
+ job.push(match[4]);
}
+ job.push(match[5]);
+ job.push(match[6]);
queryJobs.push(job);
previousJob = currJob;
diff --git a/src/web/js/azkaban/view/flow-execution-list.js b/src/web/js/azkaban/view/flow-execution-list.js
index 0f37929..0ec15a9 100644
--- a/src/web/js/azkaban/view/flow-execution-list.js
+++ b/src/web/js/azkaban/view/flow-execution-list.js
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
var executionListView;
azkaban.ExecutionListView = Backbone.View.extend({
events: {
@@ -244,6 +260,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
var self = this;
var tr = document.createElement("tr");
var tdName = document.createElement("td");
+ var tdType = document.createElement("td");
var tdTimeline = document.createElement("td");
var tdStart = document.createElement("td");
var tdEnd = document.createElement("td");
@@ -255,6 +272,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
var padding = 15*$(body)[0].level;
$(tr).append(tdName);
+ $(tr).append(tdType);
$(tr).append(tdTimeline);
$(tr).append(tdStart);
$(tr).append(tdEnd);
@@ -264,6 +282,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
$(tr).addClass("jobListRow");
$(tdName).addClass("jobname");
+ $(tdType).addClass("jobtype");
if (padding) {
$(tdName).css("padding-left", padding);
}
@@ -273,6 +292,8 @@ azkaban.ExecutionListView = Backbone.View.extend({
$(tdElapse).addClass("elapsedTime");
$(tdStatus).addClass("statustd");
$(tdDetails).addClass("details");
+
+ $(tdType).text(node.type);
var outerProgressBar = document.createElement("div");
//$(outerProgressBar).attr("id", node.id + "-outerprogressbar");
@@ -321,7 +342,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
}
$(body).append(tr);
- if (node.type=="flow") {
+ if (node.type == "flow") {
var subFlowRow = document.createElement("tr");
var subFlowCell = document.createElement("td");
$(subFlowCell).addClass("subflowrow");
src/web/js/azkaban/view/job-details.js 44(+44 -0)
diff --git a/src/web/js/azkaban/view/job-details.js b/src/web/js/azkaban/view/job-details.js
index ab77045..07b823e 100644
--- a/src/web/js/azkaban/view/job-details.js
+++ b/src/web/js/azkaban/view/job-details.js
@@ -45,15 +45,19 @@ azkaban.JobSummaryView = Backbone.View.extend({
},
initialize: function(settings) {
+ $("#jobType").hide();
$("#commandSummary").hide();
$("#pigJobSummary").hide();
$("#pigJobStats").hide();
$("#hiveJobSummary").hide();
+ $("#jobIds").hide();
+ this.listenTo(this.model, "change:jobType", this.renderJobTypeTable);
this.listenTo(this.model, "change:commandProperties", this.renderCommandTable);
this.listenTo(this.model, "change:pigSummary", this.renderPigSummaryTable);
this.listenTo(this.model, "change:pigStats", this.renderPigStatsTable);
this.listenTo(this.model, "change:hiveSummary", this.renderHiveTable);
+ this.listenTo(this.model, "change:jobIds", this.renderJobIdsTable);
},
refresh: function() {
@@ -65,6 +69,46 @@ azkaban.JobSummaryView = Backbone.View.extend({
renderJobTable(jobSummary.statTableHeaders, jobSummary.statTableData, "stats");
renderHiveTable(jobSummary.hiveQueries, jobSummary.hiveQueryJobs);
},
+
+ renderJobTypeTable: function() {
+ var jobTypeTable = $("#jobTypeTable");
+ var jobType = this.model.get("jobType");
+
+ var tr = document.createElement("tr");
+ var td = document.createElement("td");
+ $(td).html("<b>Job Type</b>");
+ $(tr).append(td);
+ td = document.createElement("td");
+ $(td).html(jobType);
+ $(tr).append(td);
+
+ jobTypeTable.append(tr);
+
+ $("#jobType").show();
+ },
+
+ renderJobIdsTable: function() {
+ var oldBody = $("#jobIdsTableBody");
+ var newBody = $(document.createElement("tbody")).attr("id", "jobIdsTableBody");
+
+ var jobIds = this.model.get("jobIds");
+ var jobUrls = this.model.get("jobTrackerUrls");
+ var numJobs = jobIds.length;
+ for (var i = 0; i < numJobs; i++) {
+ var job = jobIds[i];
+ var tr = document.createElement("tr");
+ var td = document.createElement("td");
+ var html = jobUrls[job] ? "<a href='" + jobUrls[job] + "'>" + job + "</a>" : job;
+ $(td).html(html);
+ $(tr).append(td);
+ newBody.append(tr);
+ }
+
+ oldBody.replaceWith(newBody);
+
+ $("#jobIds").show();
+ },
+
renderCommandTable: function() {
var commandTable = $("#commandTable");
var commandProperties = this.model.get("commandProperties");
diff --git a/unit/java/azkaban/test/execapp/MockExecutorLoader.java b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
index 05641ec..6c1246d 100644
--- a/unit/java/azkaban/test/execapp/MockExecutorLoader.java
+++ b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
@@ -202,5 +202,19 @@ public class MockExecutorLoader implements ExecutorLoader {
return null;
}
+ @Override
+ public List<Object> fetchAttachments(int execId, String name, int attempt)
+ throws ExecutorManagerException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void uploadAttachmentFile(ExecutableNode node, File file)
+ throws ExecutorManagerException {
+ // TODO Auto-generated method stub
+
+ }
+
}
\ No newline at end of file