azkaban-uncached

Hive log parsing support

12/4/2013 6:41:56 PM

Details

diff --git a/src/java/azkaban/utils/LogSummary.java b/src/java/azkaban/utils/LogSummary.java
index 83e1942..8a561b0 100644
--- a/src/java/azkaban/utils/LogSummary.java
+++ b/src/java/azkaban/utils/LogSummary.java
@@ -11,15 +11,40 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 public class LogSummary {
+	private static final String HIVE_PARSING_START = "Parsing command: ";
+	private static final String HIVE_PARSING_END = "Parse Completed";
+	private static final String HIVE_NUM_MAP_REDUCE_JOBS_STRING = "Total MapReduce jobs = ";
+	private static final String HIVE_MAP_REDUCE_JOB_START = "Starting Job";
+	private static final String HIVE_MAP_REDUCE_JOBS_SUMMARY = "MapReduce Jobs Launched:";
+	
+	// Regex to search for URLs to job details pages.
+	private static final Pattern jobTrackerUrl = Pattern.compile(
+			"https?://" + // http(s)://
+			"[-\\w\\.]+" + // domain
+			"(?::\\d+)?" + // port
+			"/[\\w/\\.]*" + // path
+			// query string
+			"\\?\\S+" + 
+			"(job_\\d{12}_\\d{4,})" + // job id
+			"\\S*"
+	);
+	
+	private String jobType = null;
 	private String command = null;
 	private List<String> classpath = new ArrayList<String>();
 	private List<String> params = new ArrayList<String>();
 	
-	private String[] statTableHeaders = null;
-	private List<String[]> statTableData = new ArrayList<String[]>();
+	private String[] pigStatTableHeaders = null;
+	private List<String[]> pigStatTableData = new ArrayList<String[]>();
+	
+	private String[] pigSummaryTableHeaders = null;
+	private List<String[]> pigSummaryTableData = new ArrayList<String[]>();
 	
-	private String[] summaryTableHeaders = null;
-	private List<String[]> summaryTableData = new ArrayList<String[]>();
+	private List<String> hiveQueries = new ArrayList<String>();
+	
+	// Each element in hiveQueryJobs contains a list of the jobs for a query.
+	// Each job contains a list of strings of the job summary values.
+	private List<List<List<String>>> hiveQueryJobs = new ArrayList<List<List<String>>>();
 	
 	public LogSummary(LogData log) {
 		if (log != null) {
@@ -32,12 +57,32 @@ public class LogSummary {
 		data = data.replaceAll("(?m)^.*? - ", "");
 		String[] lines = data.split("\n");
 		
-		parseCommand(lines);
-		parseJobSummary(lines);
-		parseJobStats(lines);
+		if (parseCommand(lines)) {
+			jobType = parseJobType(lines);
+			
+			if (jobType.contains("pig")) {
+				parsePigJobSummary(lines);
+				parsePigJobStats(lines);
+			} else if (jobType.contains("hive")) {
+				parseHiveQueries(lines);
+			}
+		}
 	}
 
-	private void parseCommand(String[] lines) {
+	private String parseJobType(String[] lines) {
+		Pattern p = Pattern.compile("Building (\\S+) job executor");
+		
+		for (String line : lines) {
+			Matcher m = p.matcher(line);
+			if (m.find()) {
+				return m.group(1);
+			}
+		}
+		
+		return null;
+	}
+	
+	private boolean parseCommand(String[] lines) {
 		int commandStartIndex = -1;
 		for (int i = 0; i < lines.length; i++) {
 			if (lines[i].startsWith("Command: ")) {
@@ -62,10 +107,14 @@ public class LogSummary {
 			while (m.find()) {
 				params.add(m.group(1));
 			}
+			
+			return true;
 		}
+		
+		return false;
 	}
 	
-	private void parseJobSummary(String[] lines) {
+	private void parsePigJobSummary(String[] lines) {
 		int jobSummaryStartIndex = -1;
 		for (int i = 0; i < lines.length; i++) {
 			if (lines[i].startsWith("HadoopVersion")) {
@@ -76,12 +125,12 @@ public class LogSummary {
 		
 		if (jobSummaryStartIndex != -1) {
 			String headerLine = lines[jobSummaryStartIndex];
-			summaryTableHeaders = headerLine.split("\t");
+			pigSummaryTableHeaders = headerLine.split("\t");
 			
 			int tableRowIndex = jobSummaryStartIndex + 1;
 			String line;
 			while (!(line = lines[tableRowIndex]).equals("")) {
-				summaryTableData.add(line.split("\t"));
+				pigSummaryTableData.add(line.split("\t"));
 				tableRowIndex++;
 			}
 		}
@@ -92,26 +141,16 @@ public class LogSummary {
 	 * Adds links to the job details pages on the job tracker.
 	 * @param lines
 	 */
-	private void parseJobStats(String[] lines) {
+	private void parsePigJobStats(String[] lines) {
 		int jobStatsStartIndex = -1;
 		
 		Map<String, String> jobDetailUrls = new HashMap<String, String>();
 
-		// Regex to search for URLs to job details pages.
-		Pattern p = Pattern.compile(
-				"https?://" + // http(s)://
-				"[-\\w\\.]+" + // domain
-				"(?::\\d+)?" + // port
-				"/[\\w/\\.]*" + // path
-				// query string
-				"\\?\\S+" + 
-				"(job_\\d{12}_\\d{4,})" + // job id
-				"\\S*"
-		);
+		
 		
 		for (int i = 0; i < lines.length; i++) {
 			String line = lines[i];
-			Matcher m = p.matcher(line);
+			Matcher m = jobTrackerUrl.matcher(line);
 			
 			if (m.find()) {
 				jobDetailUrls.put(m.group(1), m.group(0));
@@ -124,7 +163,7 @@ public class LogSummary {
 		
 		if (jobStatsStartIndex != -1) {
 			String headerLine = lines[jobStatsStartIndex];
-			statTableHeaders = headerLine.split("\t");
+			pigStatTableHeaders = headerLine.split("\t");
 			
 			int tableRowIndex = jobStatsStartIndex + 1;
 			String line;
@@ -133,26 +172,139 @@ public class LogSummary {
 				if (jobDetailUrls.containsKey(stats[0])) {
 					stats[0] = "<a href=\"" + jobDetailUrls.get(stats[0]) + "\">" + stats[0] + "</a>";
 				}
-				statTableData.add(stats);
+				pigStatTableData.add(stats);
 				tableRowIndex++;
 			}
 		}
 	}
 	
-	public String[] getStatTableHeaders() {
-		return statTableHeaders;
+	private void parseHiveQueries(String[] lines) {
+		for (int i = 0; i < lines.length;) {
+			String line = lines[i];
+			int parsingCommandIndex = line.indexOf(HIVE_PARSING_START);
+			if (parsingCommandIndex != -1) {
+				// parse query text
+				int queryStartIndex = parsingCommandIndex + HIVE_PARSING_START.length();
+				StringBuilder query = new StringBuilder(line.substring(queryStartIndex) + "\n");
+				
+				i++;
+				while (i < lines.length && !(line = lines[i]).contains(HIVE_PARSING_END)) {
+					query.append(line + "\n");
+					i++;
+				}
+				String queryString = query.toString().trim().replaceAll("\n","<br/>");
+				hiveQueries.add(queryString);
+				i++;
+				
+				// parse the query's Map-Reduce jobs, if any.
+				int numMRJobs = 0;
+				List<String> jobTrackerUrls = new ArrayList<String>();
+				while (i < lines.length) {
+					line = lines[i];
+					if (line.contains(HIVE_NUM_MAP_REDUCE_JOBS_STRING)) {
+						// query involves map reduce jobs
+						numMRJobs = Integer.parseInt(line.substring(HIVE_NUM_MAP_REDUCE_JOBS_STRING.length()));
+						i++;
+						
+						// get the job tracker URLs
+						String lastUrl = "";
+						int numJobsSeen = 0;
+						while (numJobsSeen < numMRJobs && i < lines.length) {
+							line = lines[i];
+							if (line.contains(HIVE_MAP_REDUCE_JOB_START)) {
+								Matcher m = jobTrackerUrl.matcher(line);
+								if (m.find() && !lastUrl.equals(m.group(1))) {
+									jobTrackerUrls.add(m.group(0));
+									lastUrl = m.group(1);
+									numJobsSeen++;
+								}
+							}
+							i++;
+						}
+						
+						// get the map reduce jobs summary
+						while (i < lines.length) {
+							line = lines[i];
+							if (line.contains(HIVE_MAP_REDUCE_JOBS_SUMMARY)) {
+								// job summary table found
+								i++;
+								
+								List<List<String>> queryJobs = new ArrayList<List<String>>();
+								
+								Pattern p = Pattern.compile(
+									"Job (\\d+): Map: (\\d+)  Reduce: (\\d+)   HDFS Read: (\\d+) HDFS Write: (\\d+)"
+								);
+								
+								int previousJob = -1;
+								numJobsSeen = 0;
+								while (numJobsSeen < numMRJobs && i < lines.length) {
+									line = lines[i];
+									Matcher m = p.matcher(line);
+									if (m.find()) {
+										int currJob = Integer.parseInt(m.group(1));
+										if (currJob == previousJob) {
+											i++;
+											continue;
+										}
+										
+										List<String> job = new ArrayList<String>();
+										job.add("<a href=\"" + jobTrackerUrls.get(currJob) +
+												"\">" + currJob + "</a>");
+										job.add(m.group(2));
+										job.add(m.group(3));
+										job.add(m.group(4));
+										job.add(m.group(5));
+										queryJobs.add(job);
+										previousJob = currJob;
+										numJobsSeen++;
+									}
+									i++;
+								}
+								
+								if (numJobsSeen == numMRJobs) {
+									hiveQueryJobs.add(queryJobs);
+								}
+								
+								break;
+							}
+							i++;
+						} 
+						break;
+					}
+					else if (line.contains(HIVE_PARSING_START)) {
+						if (numMRJobs == 0) {
+							hiveQueryJobs.add(null);
+						}
+						break;
+					}
+					i++;
+				}
+				continue;
+			}
+			
+			i++;
+		}
+		return;
+	}
+	
+	public String[] getPigStatTableHeaders() {
+		return pigStatTableHeaders;
 	}
 
-	public List<String[]> getStatTableData() {
-		return statTableData;
+	public List<String[]> getPigStatTableData() {
+		return pigStatTableData;
 	}
 
-	public String[] getSummaryTableHeaders() {
-		return summaryTableHeaders;
+	public String[] getPigSummaryTableHeaders() {
+		return pigSummaryTableHeaders;
 	}
 
-	public List<String[]> getSummaryTableData() {
-		return summaryTableData;
+	public List<String[]> getPigSummaryTableData() {
+		return pigSummaryTableData;
+	}
+	
+	public String getJobType() {
+		return jobType;
 	}
 	
 	public String getCommand() {
@@ -166,4 +318,12 @@ public class LogSummary {
 	public List<String> getParams() {
 		return params;
 	}
+
+	public List<String> getHiveQueries() {
+		return hiveQueries;
+	}
+
+	public List<List<List<String>>> getHiveQueryJobs() {
+		return hiveQueryJobs;
+	}
 }
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 1ba4029..e45dcc7 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -479,10 +479,18 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 			ret.put("command", summary.getCommand());
 			ret.put("classpath", summary.getClasspath());
 			ret.put("params", summary.getParams());
-			ret.put("summaryTableHeaders", summary.getSummaryTableHeaders());
-			ret.put("summaryTableData", summary.getSummaryTableData());
-			ret.put("statTableHeaders", summary.getStatTableHeaders());
-			ret.put("statTableData", summary.getStatTableData());
+			
+			String jobType = summary.getJobType();
+			
+			if (jobType.contains("pig")) {
+				ret.put("summaryTableHeaders", summary.getPigSummaryTableHeaders());
+				ret.put("summaryTableData", summary.getPigSummaryTableData());
+				ret.put("statTableHeaders", summary.getPigStatTableHeaders());
+				ret.put("statTableData", summary.getPigStatTableData());
+			} else if (jobType.contains("hive")) {
+				ret.put("hiveQueries", summary.getHiveQueries());
+				ret.put("hiveQueryJobs", summary.getHiveQueryJobs());
+			}
 		} catch (ExecutorManagerException e) {
 			throw new ServletException(e);
 		}
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm b/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
index 0ab9e56..df18893 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
@@ -94,22 +94,34 @@
 				</table>
 				
 				<br/>
-				<h4>Job Summary</h4>
-				<table>
-					<thead id="summaryHeader">
-					</thead>
-					<tbody id="summaryBody">
-					</tbody>
-				</table>
+				<div id="jobsummary">Job Summary
+					<table>
+						<thead id="summaryHeader">
+						</thead>
+						<tbody id="summaryBody">
+						</tbody>
+					</table>
+				</div>
 				
 				<br/>
-				<h4>Job Stats</h4>
-				<table>
-					<thead id="statsHeader">
-					</thead>
-					<tbody id="statsBody">
-					</tbody>
-				</table>
+				<div id="jobstats">Job Stats
+					<table>
+						<thead id="statsHeader">
+						</thead>
+						<tbody id="statsBody">
+						</tbody>
+					</table>
+				</div>
+				
+				<br/>
+				<div id="hiveTable">Job Summary
+					<table>
+						<thead id="hiveTableHeader">
+						</thead>
+						<tbody id="hiveTableBody">
+						</tbody>
+					</table>
+				</div>
 			</div>
 
 #end
diff --git a/src/web/js/azkaban.jobdetails.view.js b/src/web/js/azkaban.jobdetails.view.js
index a809e4a..a47487c 100644
--- a/src/web/js/azkaban.jobdetails.view.js
+++ b/src/web/js/azkaban.jobdetails.view.js
@@ -139,6 +139,7 @@ azkaban.JobSummaryView = Backbone.View.extend({
 					self.renderCommandTable(data.command, data.classpath, data.params);
 					self.renderJobTable(data.summaryTableHeaders, data.summaryTableData, "summary");
 					self.renderJobTable(data.statTableHeaders, data.statTableData, "stats");
+					self.renderHiveTable(data.hiveQueries, data.hiveQueryJobs);
 				}
 			}
 		});
@@ -219,6 +220,71 @@ azkaban.JobSummaryView = Backbone.View.extend({
 				}
 				body.append(tr);
 			}
+		} else {
+			$("#job" + prefix).hide();
+		}
+	},
+	renderHiveTable: function(queries, queryJobs) {
+		if (queries) {
+			// Set up table column headers
+			var header = $("#hiveTableHeader");
+			var tr = document.createElement("tr");
+			var headers = ["Query","Job","Map","Reduce","HDFS Read","HDFS Write"];
+			var i;
+			
+			for (i = 0; i < headers.length; i++) {
+				var th = document.createElement("th");
+				$(th).text(headers[i]);
+				$(tr).append(th);
+			}
+			header.append(tr);
+			
+			// Construct table body
+			var body = $("#hiveTableBody");
+			for (i = 0; i < queries.length; i++) {
+				// new query
+				tr = document.createElement("tr");
+				var td = document.createElement("td");
+				$(td).html(queries[i]);
+				$(tr).append(td);
+				
+				var jobs = queryJobs[i];
+				if (jobs != null) {
+					// add first job for this query
+					var jobValues = jobs[0];
+					var j;
+					for (j = 0; j < jobValues.length; j++) {
+						td = document.createElement("td");
+						$(td).html(jobValues[j]);
+						$(tr).append(td);
+					}
+					body.append(tr);
+					
+					// add remaining jobs for this query
+					for (j = 1; j < jobs.length; j++) {
+						jobValues = jobs[j];
+						tr = document.createElement("tr");
+						
+						// add empty cell for query column
+						td = document.createElement("td");
+						$(td).html("&nbsp;");
+						$(tr).append(td);
+						
+						// add job values
+						for (var k = 0; k < jobValues.length; k++) {
+							td = document.createElement("td");
+							$(td).html(jobValues[k]);
+							$(tr).append(td);
+						}
+						body.append(tr);
+					}
+					
+				} else {
+					body.append(tr);
+				}
+			}
+		} else {
+			$("#hiveTable").hide();
 		}
 	}
 });