azkaban-uncached
Changes
src/java/azkaban/utils/LogSummary.java 285(+249 -36)
src/web/js/azkaban.jobdetails.view.js 128(+85 -43)
Details
src/java/azkaban/utils/LogSummary.java 285(+249 -36)
diff --git a/src/java/azkaban/utils/LogSummary.java b/src/java/azkaban/utils/LogSummary.java
index 9d67e53..c9e5495 100644
--- a/src/java/azkaban/utils/LogSummary.java
+++ b/src/java/azkaban/utils/LogSummary.java
@@ -4,20 +4,46 @@ import azkaban.utils.FileIOUtils.LogData;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class LogSummary {
- private String command = null;
- private List<String> classpath = new ArrayList<String>();
- private List<String> params = new ArrayList<String>();
+ private static final String HIVE_PARSING_START = "Parsing command: ";
+ private static final String HIVE_PARSING_END = "Parse Completed";
+ private static final String HIVE_NUM_MAP_REDUCE_JOBS_STRING = "Total MapReduce jobs = ";
+ private static final String HIVE_MAP_REDUCE_JOB_START = "Starting Job";
+ private static final String HIVE_MAP_REDUCE_JOBS_SUMMARY = "MapReduce Jobs Launched:";
- private String[] statTableHeaders = null;
- private List<String[]> statTableData = new ArrayList<String[]>();
+ // Regex to search for URLs to job details pages.
+ private static final Pattern jobTrackerUrl = Pattern.compile(
+ "https?://" + // http(s)://
+ "[-\\w\\.]+" + // domain
+ "(?::\\d+)?" + // port
+ "/[\\w/\\.]*" + // path
+ "\\?\\S+" + // query string
+ "(job_\\d{12}_\\d{4,})" + // job id
+ "\\S*"
+ );
- private String[] summaryTableHeaders = null;
- private List<String[]> summaryTableData = new ArrayList<String[]>();
+ private String jobType = null;
+ private List<Pair<String,String>> commandProperties = new ArrayList<Pair<String,String>>();
+
+ private String[] pigStatTableHeaders = null;
+ private List<String[]> pigStatTableData = new ArrayList<String[]>();
+
+ private String[] pigSummaryTableHeaders = null;
+ private List<String[]> pigSummaryTableData = new ArrayList<String[]>();
+
+ private List<String> hiveQueries = new ArrayList<String>();
+
+ // Each element in hiveQueryJobs contains a list of the jobs for a query.
+ // Each job contains a list of strings of the job summary values.
+ private List<List<List<String>>> hiveQueryJobs = new ArrayList<List<List<String>>>();
public LogSummary(LogData log) {
if (log != null) {
@@ -26,15 +52,36 @@ public class LogSummary {
}
private void parseLogData(String data) {
- data = data.replaceAll(".*? - ", "");
+ // Strip the leading timestamp/logger prefix from each line
+ data = data.replaceAll("(?m)^.*? - ", "");
String[] lines = data.split("\n");
- parseCommand(lines);
- parseJobSummary(lines);
- parseJobStats(lines);
+ if (parseCommand(lines)) {
+ jobType = parseJobType(lines);
+
+ if (jobType != null && jobType.contains("pig")) {
+ parsePigJobSummary(lines);
+ parsePigJobStats(lines);
+ } else if (jobType != null && jobType.contains("hive")) {
+ parseHiveQueries(lines);
+ }
+ }
}
- private void parseCommand(String[] lines) {
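+ /**
+ * Determines the job type from the "Building &lt;type&gt; job executor" log line.
+ * @return the job type, or null if no such line was found
+ */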
+ private String parseJobType(String[] lines) {
+ Pattern p = Pattern.compile("Building (\\S+) job executor");
+
+ for (String line : lines) {
+ Matcher m = p.matcher(line);
+ if (m.find()) {
+ return m.group(1);
+ }
+ }
+
+ return null;
+ }
+
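+ /**
+ * Parses the "Command: " line and derives the classpath, -D properties,
+ * memory settings, and Pig params, adding each to commandProperties.
+ * @return true if a "Command: " line was found in the log
+ */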
+ private boolean parseCommand(String[] lines) {
int commandStartIndex = -1;
for (int i = 0; i < lines.length; i++) {
if (lines[i].startsWith("Command: ")) {
@@ -44,25 +91,58 @@ public class LogSummary {
}
if (commandStartIndex != -1) {
- command = lines[commandStartIndex].substring(9);
+ String command = lines[commandStartIndex].substring(9);
+ commandProperties.add(new Pair<String,String>("Command", command));
// Parse classpath
Pattern p = Pattern.compile("(?:-cp|-classpath)\\s+(\\S+)");
Matcher m = p.matcher(command);
+ StringBuilder sb = new StringBuilder();
if (m.find()) {
- classpath = Arrays.asList(m.group(1).split(":"));
+ sb.append(StringUtils.join((Collection<String>)Arrays.asList(m.group(1).split(":")), "<br/>"));
+ commandProperties.add(new Pair<String,String>("Classpath", sb.toString()));
+ }
+
+ // Parse JVM system properties (-D flags)
+ p = Pattern.compile("-D(\\S+)");
+ m = p.matcher(command);
+ sb = new StringBuilder();
+ while (m.find()) {
+ sb.append(m.group(1) + "<br/>");
+ }
+ if (sb.length() > 0) {
+ commandProperties.add(new Pair<String,String>("-D", sb.toString()));
+ }
+
+ // Parse memory settings
+ p = Pattern.compile("(-Xm\\S+)");
+ m = p.matcher(command);
+ sb = new StringBuilder();
+ while (m.find()) {
+ sb.append(m.group(1) + "<br/>");
+ }
+ if (sb.length() > 0) {
+ commandProperties.add(new Pair<String,String>("Memory Settings", sb.toString()));
}
// Parse Pig params
p = Pattern.compile("-param\\s+(\\S+)");
m = p.matcher(command);
+ sb = new StringBuilder();
while (m.find()) {
- params.add(m.group(1));
+ sb.append(m.group(1) + "<br/>");
}
+ if (sb.length() > 0) {
+ commandProperties.add(new Pair<String,String>("Params", sb.toString()));
+ }
+
+ return true;
}
+
+ return false;
}
- private void parseJobSummary(String[] lines) {
+ private void parsePigJobSummary(String[] lines) {
int jobSummaryStartIndex = -1;
for (int i = 0; i < lines.length; i++) {
if (lines[i].startsWith("HadoopVersion")) {
@@ -73,21 +153,37 @@ public class LogSummary {
if (jobSummaryStartIndex != -1) {
String headerLine = lines[jobSummaryStartIndex];
- summaryTableHeaders = headerLine.split("\t");
+ pigSummaryTableHeaders = headerLine.split("\t");
int tableRowIndex = jobSummaryStartIndex + 1;
String line;
while (!(line = lines[tableRowIndex]).equals("")) {
- summaryTableData.add(line.split("\t"));
+ pigSummaryTableData.add(line.split("\t"));
tableRowIndex++;
}
}
}
- private void parseJobStats(String[] lines) {
+ /**
+ * Parses the Pig Job Stats table, which includes the max/min map and
+ * reduce times. Adds links to the job details pages on the job tracker.
+ * @param lines the log lines to parse
+ */
+ private void parsePigJobStats(String[] lines) {
int jobStatsStartIndex = -1;
+
+ Map<String, String> jobDetailUrls = new HashMap<String, String>();
+
for (int i = 0; i < lines.length; i++) {
- if (lines[i].startsWith("Job Stats (time in seconds):")) {
+ String line = lines[i];
+ Matcher m = jobTrackerUrl.matcher(line);
+
+ if (m.find()) {
+ jobDetailUrls.put(m.group(1), m.group(0));
+ }
+ else if (line.startsWith("Job Stats (time in seconds):")) {
jobStatsStartIndex = i+1;
break;
}
@@ -95,42 +191,159 @@ public class LogSummary {
if (jobStatsStartIndex != -1) {
String headerLine = lines[jobStatsStartIndex];
- statTableHeaders = headerLine.split("\t");
+ pigStatTableHeaders = headerLine.split("\t");
int tableRowIndex = jobStatsStartIndex + 1;
String line;
while (!(line = lines[tableRowIndex]).equals("")) {
- statTableData.add(line.split("\t"));
+ String[] stats = line.split("\t");
+ if (jobDetailUrls.containsKey(stats[0])) {
+ stats[0] = "<a href=\"" + jobDetailUrls.get(stats[0]) + "\">" + stats[0] + "</a>";
+ }
+ pigStatTableData.add(stats);
tableRowIndex++;
}
}
}
- public String[] getStatTableHeaders() {
- return statTableHeaders;
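+ /**
+ * Parses each Hive query in the log together with the summary table of
+ * the MapReduce jobs it launched, keeping hiveQueries and hiveQueryJobs
+ * aligned by index.
+ */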
+ private void parseHiveQueries(String[] lines) {
+ for (int i = 0; i < lines.length;) { // i is advanced inside the loop body
+ String line = lines[i];
+ int parsingCommandIndex = line.indexOf(HIVE_PARSING_START);
+ if (parsingCommandIndex != -1) {
+ // parse query text
+ int queryStartIndex = parsingCommandIndex + HIVE_PARSING_START.length();
+ StringBuilder query = new StringBuilder(line.substring(queryStartIndex) + "\n");
+
+ i++;
+ while (i < lines.length && !(line = lines[i]).contains(HIVE_PARSING_END)) {
+ query.append(line + "\n");
+ i++;
+ }
+ String queryString = query.toString().trim().replaceAll("\n","<br/>");
+ hiveQueries.add(queryString);
+ i++;
+
+ // parse the query's Map-Reduce jobs, if any.
+ int numMRJobs = 0;
+ List<String> jobTrackerUrls = new ArrayList<String>();
+ while (i < lines.length) {
+ line = lines[i];
+ if (line.contains(HIVE_NUM_MAP_REDUCE_JOBS_STRING)) {
+ // query involves map reduce jobs
+ numMRJobs = Integer.parseInt(line.substring(line.indexOf(HIVE_NUM_MAP_REDUCE_JOBS_STRING) + HIVE_NUM_MAP_REDUCE_JOBS_STRING.length()).trim());
+ i++;
+
+ // get the job tracker URLs
+ String lastJobId = "";
+ int numJobsSeen = 0;
+ while (numJobsSeen < numMRJobs && i < lines.length) {
+ line = lines[i];
+ if (line.contains(HIVE_MAP_REDUCE_JOB_START)) {
+ Matcher m = jobTrackerUrl.matcher(line);
+ if (m.find() && !lastJobId.equals(m.group(1))) {
+ jobTrackerUrls.add(m.group(0));
+ lastJobId = m.group(1);
+ numJobsSeen++;
+ }
+ }
+ i++;
+ }
+
+ // get the map reduce jobs summary
+ while (i < lines.length) {
+ line = lines[i];
+ if (line.contains(HIVE_MAP_REDUCE_JOBS_SUMMARY)) {
+ // job summary table found
+ i++;
+
+ List<List<String>> queryJobs = new ArrayList<List<String>>();
+
+ Pattern p = Pattern.compile(
+ "Job (\\d+): Map: (\\d+) Reduce: (\\d+) HDFS Read: (\\d+) HDFS Write: (\\d+)"
+ );
+
+ int previousJob = -1;
+ numJobsSeen = 0;
+ while (numJobsSeen < numMRJobs && i < lines.length) {
+ line = lines[i];
+ Matcher m = p.matcher(line);
+ if (m.find()) {
+ int currJob = Integer.parseInt(m.group(1));
+ if (currJob == previousJob) {
+ i++;
+ continue;
+ }
+
+ List<String> job = new ArrayList<String>();
+ job.add("<a href=\"" + jobTrackerUrls.get(currJob) +
+ "\">" + currJob + "</a>");
+ job.add(m.group(2));
+ job.add(m.group(3));
+ job.add(m.group(4));
+ job.add(m.group(5));
+ queryJobs.add(job);
+ previousJob = currJob;
+ numJobsSeen++;
+ }
+ i++;
+ }
+
+ if (numJobsSeen == numMRJobs) {
+ hiveQueryJobs.add(queryJobs);
+ }
+
+ break;
+ }
+ i++;
+ }
+ break;
+ }
+ else if (line.contains(HIVE_PARSING_START)) {
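+ // The next query has started; if the current query never reported any
+ // MapReduce jobs, record a null placeholder to keep the lists aligned.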
+ if (numMRJobs == 0) {
+ hiveQueryJobs.add(null);
+ }
+ break;
+ }
+ i++;
+ }
+ continue;
+ }
+
+ i++;
+ }
+ }
+
+ public String[] getPigStatTableHeaders() {
+ return pigStatTableHeaders;
}
- public List<String[]> getStatTableData() {
- return statTableData;
+ public List<String[]> getPigStatTableData() {
+ return pigStatTableData;
}
- public String[] getSummaryTableHeaders() {
- return summaryTableHeaders;
+ public String[] getPigSummaryTableHeaders() {
+ return pigSummaryTableHeaders;
}
- public List<String[]> getSummaryTableData() {
- return summaryTableData;
+ public List<String[]> getPigSummaryTableData() {
+ return pigSummaryTableData;
+ }
+
+ public String getJobType() {
+ return jobType;
}
- public String getCommand() {
- return command;
+ public List<Pair<String,String>> getCommandProperties() {
+ return commandProperties;
}
- public List<String> getClasspath() {
- return classpath;
+ public List<String> getHiveQueries() {
+ return hiveQueries;
}
- public List<String> getParams() {
- return params;
+ public List<List<List<String>>> getHiveQueryJobs() {
+ return hiveQueryJobs;
}
}
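
For reference, here is a minimal standalone sketch of how the two new patterns in LogSummary behave. The log lines, host, and job ids below are hypothetical; real Pig/Hive output can vary (newer Hive versions add extra columns, for instance), but this is the format the code above matches.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class LogSummaryPatternDemo {
      // Same patterns as declared in LogSummary above.
      private static final Pattern JOB_TRACKER_URL = Pattern.compile(
          "https?://[-\\w\\.]+(?::\\d+)?/[\\w/\\.]*\\?\\S+(job_\\d{12}_\\d{4,})\\S*");
      private static final Pattern HIVE_JOB_SUMMARY = Pattern.compile(
          "Job (\\d+): Map: (\\d+) Reduce: (\\d+) HDFS Read: (\\d+) HDFS Write: (\\d+)");

      public static void main(String[] args) {
        // Hypothetical job tracker line: group(0) is the link target,
        // group(1) the job id used as link text / map key.
        String urlLine = "More information at: http://jobtracker.example.com:50030/jobdetails.jsp?jobid=job_201301010000_0001";
        Matcher m = JOB_TRACKER_URL.matcher(urlLine);
        if (m.find()) {
          System.out.println(m.group(0)); // full URL
          System.out.println(m.group(1)); // job_201301010000_0001
        }

        // Hypothetical "MapReduce Jobs Launched" summary row: groups 2-5
        // become the Map / Reduce / HDFS Read / HDFS Write table columns.
        String summaryLine = "Job 0: Map: 10 Reduce: 2 HDFS Read: 12345 HDFS Write: 678";
        m = HIVE_JOB_SUMMARY.matcher(summaryLine);
        if (m.find()) {
          System.out.println(m.group(1) + ": maps=" + m.group(2) + ", reduces=" + m.group(3));
        }
      }
    }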
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 1ba4029..ad572ed 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -476,13 +476,19 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
LogData data = executorManager.getExecutionJobLog(exFlow, jobId, 0, Integer.MAX_VALUE, attempt);
LogSummary summary = new LogSummary(data);
- ret.put("command", summary.getCommand());
- ret.put("classpath", summary.getClasspath());
- ret.put("params", summary.getParams());
- ret.put("summaryTableHeaders", summary.getSummaryTableHeaders());
- ret.put("summaryTableData", summary.getSummaryTableData());
- ret.put("statTableHeaders", summary.getStatTableHeaders());
- ret.put("statTableData", summary.getStatTableData());
+ ret.put("commandProperties", summary.getCommandProperties());
+
+ String jobType = summary.getJobType();
+
+ if (jobType != null && jobType.contains("pig")) {
+ ret.put("summaryTableHeaders", summary.getPigSummaryTableHeaders());
+ ret.put("summaryTableData", summary.getPigSummaryTableData());
+ ret.put("statTableHeaders", summary.getPigStatTableHeaders());
+ ret.put("statTableData", summary.getPigStatTableData());
+ } else if (jobType != null && jobType.contains("hive")) {
+ ret.put("hiveQueries", summary.getHiveQueries());
+ ret.put("hiveQueryJobs", summary.getHiveQueryJobs());
+ }
} catch (ExecutorManagerException e) {
throw new ServletException(e);
}
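
Note that the jobdetails view reads prop.first and prop.second off each commandProperties entry, so the JSON serialization of azkaban.utils.Pair is assumed to expose first/second fields. A minimal sketch of that assumed shape (the real class may differ):

    public class Pair<F, S> {
      // Field names matter here: prop.first / prop.second in
      // azkaban.jobdetails.view.js resolve against the serialized JSON.
      private final F first;
      private final S second;

      public Pair(F first, S second) {
        this.first = first;
        this.second = second;
      }

      public F getFirst() { return first; }
      public S getSecond() { return second; }
    }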
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm b/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
index 0ab9e56..df18893 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
@@ -94,22 +94,34 @@
</table>
<br/>
- <h4>Job Summary</h4>
- <table>
- <thead id="summaryHeader">
- </thead>
- <tbody id="summaryBody">
- </tbody>
- </table>
+ <div id="jobsummary">Job Summary
+ <table>
+ <thead id="summaryHeader">
+ </thead>
+ <tbody id="summaryBody">
+ </tbody>
+ </table>
+ </div>
<br/>
- <h4>Job Stats</h4>
- <table>
- <thead id="statsHeader">
- </thead>
- <tbody id="statsBody">
- </tbody>
- </table>
+ <div id="jobstats">Job Stats
+ <table>
+ <thead id="statsHeader">
+ </thead>
+ <tbody id="statsBody">
+ </tbody>
+ </table>
+ </div>
+
+ <br/>
+ <div id="hiveTable">Job Summary
+ <table>
+ <thead id="hiveTableHeader">
+ </thead>
+ <tbody id="hiveTableBody">
+ </tbody>
+ </table>
+ </div>
</div>
#end
diff --git a/src/sql/create.project_properties.sql b/src/sql/create.project_properties.sql
index 9ed6267..7adf25e 100644
--- a/src/sql/create.project_properties.sql
+++ b/src/sql/create.project_properties.sql
@@ -1,7 +1,7 @@
CREATE TABLE project_properties (
project_id INT NOT NULL,
version INT NOT NULL,
- name VARCHAR(128),
+ name VARCHAR(255),
modified_time BIGINT NOT NULL,
encoding_type TINYINT,
property BLOB,
src/web/js/azkaban.jobdetails.view.js 128(+85 -43)
diff --git a/src/web/js/azkaban.jobdetails.view.js b/src/web/js/azkaban.jobdetails.view.js
index aba399c..a24943d 100644
--- a/src/web/js/azkaban.jobdetails.view.js
+++ b/src/web/js/azkaban.jobdetails.view.js
@@ -136,55 +136,27 @@ azkaban.JobSummaryView = Backbone.View.extend({
console.log(data.error);
}
else {
- self.renderCommandTable(data.command, data.classpath, data.params);
+ self.renderCommandTable(data.commandProperties);
self.renderJobTable(data.summaryTableHeaders, data.summaryTableData, "summary");
self.renderJobTable(data.statTableHeaders, data.statTableData, "stats");
+ self.renderHiveTable(data.hiveQueries, data.hiveQueryJobs);
}
}
});
},
- renderCommandTable: function(command, classpath, params) {
- if (command) {
+ renderCommandTable: function(commandProperties) {
+ if (commandProperties) {
var commandTable = $("#commandTable");
- var i;
-
- // Add row for command
- var tr = document.createElement("tr");
- var td = document.createElement("td");
- $(td).append("<b>Command</b>");
- $(tr).append(td);
- td = document.createElement("td");
- $(td).text(command);
- $(tr).append(td);
- commandTable.append(tr);
- // Add row for classpath
- if (classpath && classpath.length > 0) {
- tr = document.createElement("tr");
- td = document.createElement("td");
- $(td).append("<b>Classpath</b>");
- $(tr).append(td);
- td = document.createElement("td");
- $(td).append(classpath[0]);
- for (i = 1; i < classpath.length; i++) {
- $(td).append("<br/>" + classpath[i]);
- }
- $(tr).append(td);
- commandTable.append(tr);
- }
-
- // Add row for params
- if (params && params.length > 0) {
- tr = document.createElement("tr");
- td = document.createElement("td");
- $(td).append("<b>Params</b>");
- $(tr).append(td);
- td = document.createElement("td");
- $(td).append(params[0]);
- for (i = 1; i < params.length; i++) {
- $(td).append("<br/>" + params[i]);
- }
- $(tr).append(td);
+ for (var i = 0; i < commandProperties.length; i++) {
+ var prop = commandProperties[i];
+ var tr = document.createElement("tr");
+ var name = document.createElement("td");
+ var value = document.createElement("td");
+ $(name).html("<b>" + prop.first + "</b>");
+ $(value).html(prop.second);
+ $(tr).append(name);
+ $(tr).append(value);
commandTable.append(tr);
}
}
@@ -207,13 +179,83 @@ azkaban.JobSummaryView = Backbone.View.extend({
for (i = 0; i < data.length; i++) {
tr = document.createElement("tr");
var row = data[i];
- for (var j = 0; j < headers.length; j++) {
+ for (var j = 0; j < row.length; j++) {
var td = document.createElement("td");
- $(td).text(row[j]);
+ if (j == 0) {
+ // first column is a link to job details page
+ $(td).html(row[j]);
+ } else {
+ $(td).text(row[j]);
+ }
$(tr).append(td);
}
body.append(tr);
}
+ } else {
+ $("#job" + prefix).hide();
+ }
+ },
+ renderHiveTable: function(queries, queryJobs) {
+ if (queries) {
+ // Set up table column headers
+ var header = $("#hiveTableHeader");
+ var tr = document.createElement("tr");
+ var headers = ["Query","Job","Map","Reduce","HDFS Read","HDFS Write"];
+ var i;
+
+ for (i = 0; i < headers.length; i++) {
+ var th = document.createElement("th");
+ $(th).text(headers[i]);
+ $(tr).append(th);
+ }
+ header.append(tr);
+
+ // Construct table body
+ var body = $("#hiveTableBody");
+ for (i = 0; i < queries.length; i++) {
+ // new query
+ tr = document.createElement("tr");
+ var td = document.createElement("td");
+ $(td).html("<b>" + queries[i] + "</b>");
+ $(tr).append(td);
+
+ var jobs = queryJobs[i];
+ if (jobs != null) {
+ // add first job for this query
+ var jobValues = jobs[0];
+ var j;
+ for (j = 0; j < jobValues.length; j++) {
+ td = document.createElement("td");
+ $(td).html(jobValues[j]);
+ $(tr).append(td);
+ }
+ body.append(tr);
+
+ // add remaining jobs for this query
+ for (j = 1; j < jobs.length; j++) {
+ jobValues = jobs[j];
+ tr = document.createElement("tr");
+
+ // add empty cell for query column
+ td = document.createElement("td");
+ $(td).html(" ");
+ $(tr).append(td);
+
+ // add job values
+ for (var k = 0; k < jobValues.length; k++) {
+ td = document.createElement("td");
+ $(td).html(jobValues[k]);
+ $(tr).append(td);
+ }
+ body.append(tr);
+ }
+
+ } else {
+ body.append(tr);
+ }
+ }
+ } else {
+ $("#hiveTable").hide();
}
}
});
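
To make the Hive rendering contract concrete, here is a sketch (with hypothetical values) of the server-side structures that renderHiveTable(queries, queryJobs) consumes: queries[i] and queryJobs[i] line up by index, each job is a list of row cells, and a null slot means the query launched no MapReduce jobs.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class HiveTableShapeDemo {
      public static void main(String[] args) {
        // Hypothetical queries; in practice these come from getHiveQueries().
        List<String> queries = Arrays.asList(
            "SELECT COUNT(*) FROM example_table",
            "SHOW TABLES");

        // Shape of getHiveQueryJobs(): one slot per query.
        List<List<List<String>>> queryJobs = new ArrayList<List<List<String>>>();

        // The first query launched one MR job; each inner list holds the row
        // cells: [job link, Map, Reduce, HDFS Read, HDFS Write].
        List<List<String>> firstQueryJobs = new ArrayList<List<String>>();
        firstQueryJobs.add(Arrays.asList(
            "<a href=\"http://jobtracker.example.com:50030/jobdetails.jsp?jobid=job_201301010000_0001\">0</a>",
            "10", "2", "12345", "678"));
        queryJobs.add(firstQueryJobs);

        // The second query ran no MapReduce jobs, so its slot is null and
        // the UI renders only the query row.
        queryJobs.add(null);
      }
    }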