azkaban-uncached
Changes
src/java/azkaban/utils/LogSummary.java 228(+194 -34)
src/java/azkaban/webapp/servlet/ExecutorServlet.java 16(+12 -4)
src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm 40(+26 -14)
src/web/js/azkaban.jobdetails.view.js 66(+66 -0)
Details
src/java/azkaban/utils/LogSummary.java 228(+194 -34)
diff --git a/src/java/azkaban/utils/LogSummary.java b/src/java/azkaban/utils/LogSummary.java
index 83e1942..8a561b0 100644
--- a/src/java/azkaban/utils/LogSummary.java
+++ b/src/java/azkaban/utils/LogSummary.java
@@ -11,15 +11,40 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class LogSummary {
+ private static final String HIVE_PARSING_START = "Parsing command: ";
+ private static final String HIVE_PARSING_END = "Parse Completed";
+ private static final String HIVE_NUM_MAP_REDUCE_JOBS_STRING = "Total MapReduce jobs = ";
+ private static final String HIVE_MAP_REDUCE_JOB_START = "Starting Job";
+ private static final String HIVE_MAP_REDUCE_JOBS_SUMMARY = "MapReduce Jobs Launched:";
+
+ // Regex to search for URLs to job details pages.
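+ // For example (URL purely illustrative), it should match
+ //   http://jobtracker.example.com:50030/jobdetails.jsp?jobid=job_201301010000_0001
+ // with group(0) = the whole URL and group(1) = the job id.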
+ private static final Pattern jobTrackerUrl = Pattern.compile(
+ "https?://" + // http(s)://
+ "[-\\w\\.]+" + // domain
+ "(?::\\d+)?" + // port
+ "/[\\w/\\.]*" + // path
+ // query string
+ "\\?\\S+" +
+ "(job_\\d{12}_\\d{4,})" + // job id
+ "\\S*"
+ );
+
+ private String jobType = null;
private String command = null;
private List<String> classpath = new ArrayList<String>();
private List<String> params = new ArrayList<String>();
- private String[] statTableHeaders = null;
- private List<String[]> statTableData = new ArrayList<String[]>();
+ private String[] pigStatTableHeaders = null;
+ private List<String[]> pigStatTableData = new ArrayList<String[]>();
+
+ private String[] pigSummaryTableHeaders = null;
+ private List<String[]> pigSummaryTableData = new ArrayList<String[]>();
- private String[] summaryTableHeaders = null;
- private List<String[]> summaryTableData = new ArrayList<String[]>();
+ private List<String> hiveQueries = new ArrayList<String>();
+
+ // Each element of hiveQueryJobs holds the MapReduce jobs launched by the
+ // corresponding query in hiveQueries; each job is a list of summary values.
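+ // e.g. one query that launched a single job (values illustrative):
+ //   [ [ ["<a href=...>0</a>", "2", "1", "12345", "678"] ] ]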
+ private List<List<List<String>>> hiveQueryJobs = new ArrayList<List<List<String>>>();
public LogSummary(LogData log) {
if (log != null) {
@@ -32,12 +57,32 @@ public class LogSummary {
data = data.replaceAll("(?m)^.*? - ", "");
String[] lines = data.split("\n");
- parseCommand(lines);
- parseJobSummary(lines);
- parseJobStats(lines);
+ if (parseCommand(lines)) {
+ jobType = parseJobType(lines);
+
+ if (jobType.contains("pig")) {
+ parsePigJobSummary(lines);
+ parsePigJobStats(lines);
+ } else if (jobType.contains("hive")) {
+ parseHiveQueries(lines);
+ }
+ }
}
- private void parseCommand(String[] lines) {
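+ /**
+ * Determines the job type from a log line of the form
+ * "Building <type> job executor" (e.g. "pig" or "hive").
+ */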
+ private String parseJobType(String[] lines) {
+ Pattern p = Pattern.compile("Building (\\S+) job executor");
+
+ for (String line : lines) {
+ Matcher m = p.matcher(line);
+ if (m.find()) {
+ return m.group(1);
+ }
+ }
+
+ return null;
+ }
+
+ private boolean parseCommand(String[] lines) {
int commandStartIndex = -1;
for (int i = 0; i < lines.length; i++) {
if (lines[i].startsWith("Command: ")) {
@@ -62,10 +107,14 @@ public class LogSummary {
while (m.find()) {
params.add(m.group(1));
}
+
+ return true;
}
+
+ return false;
}
- private void parseJobSummary(String[] lines) {
+ private void parsePigJobSummary(String[] lines) {
int jobSummaryStartIndex = -1;
for (int i = 0; i < lines.length; i++) {
if (lines[i].startsWith("HadoopVersion")) {
@@ -76,12 +125,12 @@ public class LogSummary {
if (jobSummaryStartIndex != -1) {
String headerLine = lines[jobSummaryStartIndex];
- summaryTableHeaders = headerLine.split("\t");
+ pigSummaryTableHeaders = headerLine.split("\t");
int tableRowIndex = jobSummaryStartIndex + 1;
String line;
while (!(line = lines[tableRowIndex]).equals("")) {
- summaryTableData.add(line.split("\t"));
+ pigSummaryTableData.add(line.split("\t"));
tableRowIndex++;
}
}
@@ -92,26 +141,16 @@ public class LogSummary {
* Adds links to the job details pages on the job tracker.
* @param lines
*/
- private void parseJobStats(String[] lines) {
+ private void parsePigJobStats(String[] lines) {
int jobStatsStartIndex = -1;
Map<String, String> jobDetailUrls = new HashMap<String, String>();
- // Regex to search for URLs to job details pages.
- Pattern p = Pattern.compile(
- "https?://" + // http(s)://
- "[-\\w\\.]+" + // domain
- "(?::\\d+)?" + // port
- "/[\\w/\\.]*" + // path
- // query string
- "\\?\\S+" +
- "(job_\\d{12}_\\d{4,})" + // job id
- "\\S*"
- );
+
for (int i = 0; i < lines.length; i++) {
String line = lines[i];
- Matcher m = p.matcher(line);
+ Matcher m = jobTrackerUrl.matcher(line);
if (m.find()) {
jobDetailUrls.put(m.group(1), m.group(0));
@@ -124,7 +163,7 @@ public class LogSummary {
if (jobStatsStartIndex != -1) {
String headerLine = lines[jobStatsStartIndex];
- statTableHeaders = headerLine.split("\t");
+ pigStatTableHeaders = headerLine.split("\t");
int tableRowIndex = jobStatsStartIndex + 1;
String line;
@@ -133,26 +172,139 @@ public class LogSummary {
if (jobDetailUrls.containsKey(stats[0])) {
stats[0] = "<a href=\"" + jobDetailUrls.get(stats[0]) + "\">" + stats[0] + "</a>";
}
- statTableData.add(stats);
+ pigStatTableData.add(stats);
tableRowIndex++;
}
}
}
- public String[] getStatTableHeaders() {
- return statTableHeaders;
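+ /**
+ * Parses each Hive query and its MapReduce job summary out of the log,
+ * populating hiveQueries and, in parallel, hiveQueryJobs.
+ */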
+ private void parseHiveQueries(String[] lines) {
+ for (int i = 0; i < lines.length;) {
+ String line = lines[i];
+ int parsingCommandIndex = line.indexOf(HIVE_PARSING_START);
+ if (parsingCommandIndex != -1) {
+ // parse query text
+ int queryStartIndex = parsingCommandIndex + HIVE_PARSING_START.length();
+ StringBuilder query = new StringBuilder(line.substring(queryStartIndex) + "\n");
+
+ i++;
+ while (i < lines.length && !(line = lines[i]).contains(HIVE_PARSING_END)) {
+ query.append(line + "\n");
+ i++;
+ }
+ String queryString = query.toString().trim().replaceAll("\n","<br/>");
+ hiveQueries.add(queryString);
+ i++;
+
+ // parse the query's Map-Reduce jobs, if any.
+ int numMRJobs = 0;
+ List<String> jobTrackerUrls = new ArrayList<String>();
+ while (i < lines.length) {
+ line = lines[i];
+ if (line.contains(HIVE_NUM_MAP_REDUCE_JOBS_STRING)) {
+ // query involves map reduce jobs
+ numMRJobs = Integer.parseInt(line.substring(line.indexOf(HIVE_NUM_MAP_REDUCE_JOBS_STRING) + HIVE_NUM_MAP_REDUCE_JOBS_STRING.length()).trim());
+ i++;
+
+ // get the job tracker URLs
+ String lastUrl = "";
+ int numJobsSeen = 0;
+ while (numJobsSeen < numMRJobs && i < lines.length) {
+ line = lines[i];
+ if (line.contains(HIVE_MAP_REDUCE_JOB_START)) {
+ Matcher m = jobTrackerUrl.matcher(line);
+ if (m.find() && !lastUrl.equals(m.group(1))) {
+ jobTrackerUrls.add(m.group(0));
+ lastUrl = m.group(1);
+ numJobsSeen++;
+ }
+ }
+ i++;
+ }
+
+ // get the map reduce jobs summary
+ while (i < lines.length) {
+ line = lines[i];
+ if (line.contains(HIVE_MAP_REDUCE_JOBS_SUMMARY)) {
+ // job summary table found
+ i++;
+
+ List<List<String>> queryJobs = new ArrayList<List<String>>();
+
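+ // Matches the per-job summary lines, e.g. (illustrative):
+ //   "Job 0: Map: 2 Reduce: 1 HDFS Read: 12345 HDFS Write: 678 SUCCESS"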
+ Pattern p = Pattern.compile(
+ "Job (\\d+): Map: (\\d+) Reduce: (\\d+) HDFS Read: (\\d+) HDFS Write: (\\d+)"
+ );
+
+ int previousJob = -1;
+ numJobsSeen = 0;
+ while (numJobsSeen < numMRJobs && i < lines.length) {
+ line = lines[i];
+ Matcher m = p.matcher(line);
+ if (m.find()) {
+ int currJob = Integer.parseInt(m.group(1));
+ if (currJob == previousJob) {
+ i++;
+ continue;
+ }
+
+ List<String> job = new ArrayList<String>();
+ job.add("<a href=\"" + jobTrackerUrls.get(currJob) +
+ "\">" + currJob + "</a>");
+ job.add(m.group(2));
+ job.add(m.group(3));
+ job.add(m.group(4));
+ job.add(m.group(5));
+ queryJobs.add(job);
+ previousJob = currJob;
+ numJobsSeen++;
+ }
+ i++;
+ }
+
+ if (numJobsSeen == numMRJobs) {
+ hiveQueryJobs.add(queryJobs);
+ }
+
+ break;
+ }
+ i++;
+ }
+ break;
+ }
+ else if (line.contains(HIVE_PARSING_START)) {
+ if (numMRJobs == 0) {
+ hiveQueryJobs.add(null);
+ }
+ break;
+ }
+ i++;
+ }
+
+ // Keep hiveQueryJobs aligned with hiveQueries if the log ended before
+ // this query's MapReduce jobs were fully parsed.
+ if (hiveQueryJobs.size() < hiveQueries.size()) {
+ hiveQueryJobs.add(null);
+ }
+ continue;
+ }
+
+ i++;
+ }
+ return;
+ }
+
+ public String[] getPigStatTableHeaders() {
+ return pigStatTableHeaders;
}
- public List<String[]> getStatTableData() {
- return statTableData;
+ public List<String[]> getPigStatTableData() {
+ return pigStatTableData;
}
- public String[] getSummaryTableHeaders() {
- return summaryTableHeaders;
+ public String[] getPigSummaryTableHeaders() {
+ return pigSummaryTableHeaders;
}
- public List<String[]> getSummaryTableData() {
- return summaryTableData;
+ public List<String[]> getPigSummaryTableData() {
+ return pigSummaryTableData;
+ }
+
+ public String getJobType() {
+ return jobType;
}
public String getCommand() {
@@ -166,4 +318,12 @@ public class LogSummary {
public List<String> getParams() {
return params;
}
+
+ public List<String> getHiveQueries() {
+ return hiveQueries;
+ }
+
+ public List<List<List<String>>> getHiveQueryJobs() {
+ return hiveQueryJobs;
+ }
}
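
For context, here is a minimal, self-contained sketch of what the new Hive parsing expects and produces. The class name, log excerpt, URL, and values are invented for illustration (real Hive output may differ, e.g. in field spacing); the markers and the per-job summary regex mirror the ones added above.

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HiveLogParseDemo {
    // Same per-job summary pattern as in parseHiveQueries().
    private static final Pattern JOB_SUMMARY = Pattern.compile(
        "Job (\\d+): Map: (\\d+) Reduce: (\\d+) HDFS Read: (\\d+) HDFS Write: (\\d+)");

    public static void main(String[] args) {
        // Synthetic log excerpt, already stripped of its "<logger> - " prefixes.
        String[] lines = {
            "Parsing command: SELECT COUNT(1) FROM foo",
            "Parse Completed",
            "Total MapReduce jobs = 1",
            "Starting Job = job_201301010000_0001, Tracking URL = " +
                "http://jobtracker.example.com:50030/jobdetails.jsp?jobid=job_201301010000_0001",
            "MapReduce Jobs Launched:",
            "Job 0: Map: 2 Reduce: 1 HDFS Read: 12345 HDFS Write: 678 SUCCESS"
        };

        List<String> queries = new ArrayList<String>();
        List<List<String>> jobs = new ArrayList<List<String>>();
        for (String line : lines) {
            if (line.startsWith("Parsing command: ")) {
                queries.add(line.substring("Parsing command: ".length()));
            }
            Matcher m = JOB_SUMMARY.matcher(line);
            if (m.find()) {
                List<String> job = new ArrayList<String>();
                for (int g = 1; g <= 5; g++) {
                    job.add(m.group(g));
                }
                jobs.add(job);
            }
        }
        System.out.println(queries); // [SELECT COUNT(1) FROM foo]
        System.out.println(jobs);    // [[0, 2, 1, 12345, 678]]
    }
}
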
src/java/azkaban/webapp/servlet/ExecutorServlet.java 16(+12 -4)
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 1ba4029..e45dcc7 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -479,10 +479,18 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ret.put("command", summary.getCommand());
ret.put("classpath", summary.getClasspath());
ret.put("params", summary.getParams());
- ret.put("summaryTableHeaders", summary.getSummaryTableHeaders());
- ret.put("summaryTableData", summary.getSummaryTableData());
- ret.put("statTableHeaders", summary.getStatTableHeaders());
- ret.put("statTableData", summary.getStatTableData());
+
+ String jobType = summary.getJobType();
+
+ if (jobType.contains("pig")) {
+ ret.put("summaryTableHeaders", summary.getPigSummaryTableHeaders());
+ ret.put("summaryTableData", summary.getPigSummaryTableData());
+ ret.put("statTableHeaders", summary.getPigStatTableHeaders());
+ ret.put("statTableData", summary.getPigStatTableData());
+ } else if (jobType.contains("hive")) {
+ ret.put("hiveQueries", summary.getHiveQueries());
+ ret.put("hiveQueryJobs", summary.getHiveQueryJobs());
+ }
} catch (ExecutorManagerException e) {
throw new ServletException(e);
}
src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm 40(+26 -14)
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm b/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
index 0ab9e56..df18893 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
@@ -94,22 +94,34 @@
</table>
<br/>
- <h4>Job Summary</h4>
- <table>
- <thead id="summaryHeader">
- </thead>
- <tbody id="summaryBody">
- </tbody>
- </table>
+ <div id="jobsummary">Job Summary
+ <table>
+ <thead id="summaryHeader">
+ </thead>
+ <tbody id="summaryBody">
+ </tbody>
+ </table>
+ </div>
<br/>
- <h4>Job Stats</h4>
- <table>
- <thead id="statsHeader">
- </thead>
- <tbody id="statsBody">
- </tbody>
- </table>
+ <div id="jobstats">Job Stats
+ <table>
+ <thead id="statsHeader">
+ </thead>
+ <tbody id="statsBody">
+ </tbody>
+ </table>
+ </div>
+
+ <br/>
+ <div id="hiveTable">Job Summary
+ <table>
+ <thead id="hiveTableHeader">
+ </thead>
+ <tbody id="hiveTableBody">
+ </tbody>
+ </table>
+ </div>
</div>
#end
src/web/js/azkaban.jobdetails.view.js 66(+66 -0)
diff --git a/src/web/js/azkaban.jobdetails.view.js b/src/web/js/azkaban.jobdetails.view.js
index a809e4a..a47487c 100644
--- a/src/web/js/azkaban.jobdetails.view.js
+++ b/src/web/js/azkaban.jobdetails.view.js
@@ -139,6 +139,7 @@ azkaban.JobSummaryView = Backbone.View.extend({
self.renderCommandTable(data.command, data.classpath, data.params);
self.renderJobTable(data.summaryTableHeaders, data.summaryTableData, "summary");
self.renderJobTable(data.statTableHeaders, data.statTableData, "stats");
+ self.renderHiveTable(data.hiveQueries, data.hiveQueryJobs);
}
}
});
@@ -219,6 +220,71 @@ azkaban.JobSummaryView = Backbone.View.extend({
}
body.append(tr);
}
+ } else {
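+ // Hide the whole section (label and table) when there is no data for it.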
+ $("#job" + prefix).hide();
+ }
+ },
+ renderHiveTable: function(queries, queryJobs) {
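+ // queries[i] is an HTML string (the query text, with newlines already
+ // replaced by <br/> in LogSummary). queryJobs[i] is either null (the query
+ // launched no MapReduce jobs) or an array of rows of the form
+ // [jobLink, maps, reduces, hdfsRead, hdfsWrite].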
+ if (queries) {
+ // Set up table column headers
+ var header = $("#hiveTableHeader");
+ var tr = document.createElement("tr");
+ var headers = ["Query","Job","Map","Reduce","HDFS Read","HDFS Write"];
+ var i;
+
+ for (i = 0; i < headers.length; i++) {
+ var th = document.createElement("th");
+ $(th).text(headers[i]);
+ $(tr).append(th);
+ }
+ header.append(tr);
+
+ // Construct table body
+ var body = $("#hiveTableBody");
+ for (i = 0; i < queries.length; i++) {
+ // new query
+ tr = document.createElement("tr");
+ var td = document.createElement("td");
+ $(td).html(queries[i]);
+ $(tr).append(td);
+
+ var jobs = queryJobs[i];
+ if (jobs != null) {
+ // add first job for this query
+ var jobValues = jobs[0];
+ var j;
+ for (j = 0; j < jobValues.length; j++) {
+ td = document.createElement("td");
+ $(td).html(jobValues[j]);
+ $(tr).append(td);
+ }
+ body.append(tr);
+
+ // add remaining jobs for this query
+ for (j = 1; j < jobs.length; j++) {
+ jobValues = jobs[j];
+ tr = document.createElement("tr");
+
+ // add empty cell for query column
+ td = document.createElement("td");
+ $(td).html(" ");
+ $(tr).append(td);
+
+ // add job values
+ for (var k = 0; k < jobValues.length; k++) {
+ td = document.createElement("td");
+ $(td).html(jobValues[k]);
+ $(tr).append(td);
+ }
+ body.append(tr);
+ }
+
+ } else {
+ body.append(tr);
+ }
+ }
+ } else {
+ $("#hiveTable").hide();
}
}
});