azkaban-uncached

diff --git a/src/java/azkaban/utils/LogSummary.java b/src/java/azkaban/utils/LogSummary.java
index 9d67e53..c9e5495 100644
--- a/src/java/azkaban/utils/LogSummary.java
+++ b/src/java/azkaban/utils/LogSummary.java
@@ -4,20 +4,46 @@ import azkaban.utils.FileIOUtils.LogData;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 public class LogSummary {
-	private String command = null;
-	private List<String> classpath = new ArrayList<String>();
-	private List<String> params = new ArrayList<String>();
+	private static final String HIVE_PARSING_START = "Parsing command: ";
+	private static final String HIVE_PARSING_END = "Parse Completed";
+	private static final String HIVE_NUM_MAP_REDUCE_JOBS_STRING = "Total MapReduce jobs = ";
+	private static final String HIVE_MAP_REDUCE_JOB_START = "Starting Job";
+	private static final String HIVE_MAP_REDUCE_JOBS_SUMMARY = "MapReduce Jobs Launched:";
 	
-	private String[] statTableHeaders = null;
-	private List<String[]> statTableData = new ArrayList<String[]>();
+	// Matches job tracker job-details URLs; group 1 captures the job id.
+	private static final Pattern jobTrackerUrl = Pattern.compile(
+			"https?://" + // http(s)://
+			"[-\\w\\.]+" + // domain
+			"(?::\\d+)?" + // port
+			"/[\\w/\\.]*" + // path
+			"\\?\\S+" + // query string
+			"(job_\\d{12}_\\d{4,})" + // job id
+			"\\S*"
+	);
 	
-	private String[] summaryTableHeaders = null;
-	private List<String[]> summaryTableData = new ArrayList<String[]>();
+	private String jobType = null;
+	private List<Pair<String,String>> commandProperties = new ArrayList<Pair<String,String>>();
+	
+	private String[] pigStatTableHeaders = null;
+	private List<String[]> pigStatTableData = new ArrayList<String[]>();
+	
+	private String[] pigSummaryTableHeaders = null;
+	private List<String[]> pigSummaryTableData = new ArrayList<String[]>();
+	
+	private List<String> hiveQueries = new ArrayList<String>();
+	
+	// Each element of hiveQueryJobs is the list of MapReduce jobs launched by
+	// one query; each job is a list of its summary values as strings.
+	private List<List<List<String>>> hiveQueryJobs = new ArrayList<List<List<String>>>();
 	
 	public LogSummary(LogData log) {
 		if (log != null) {
@@ -26,15 +52,36 @@ public class LogSummary {
 	}
 	
 	private void parseLogData(String data) {
-		data = data.replaceAll(".*? - ", "");
+		// Strip the leading log prefix (timestamp etc.) from each line
+		data = data.replaceAll("(?m)^.*? - ", "");
 		String[] lines = data.split("\n");
 		
-		parseCommand(lines);
-		parseJobSummary(lines);
-		parseJobStats(lines);
+		if (parseCommand(lines)) {
+			jobType = parseJobType(lines);
+			
+			// parseJobType returns null if no "Building ... job executor" line exists.
+			if (jobType == null) {
+				return;
+			}
+			
+			if (jobType.contains("pig")) {
+				parsePigJobSummary(lines);
+				parsePigJobStats(lines);
+			} else if (jobType.contains("hive")) {
+				parseHiveQueries(lines);
+			}
+		}
 	}
 
-	private void parseCommand(String[] lines) {
+	private String parseJobType(String[] lines) {
+		Pattern p = Pattern.compile("Building (\\S+) job executor");
+		
+		for (String line : lines) {
+			Matcher m = p.matcher(line);
+			if (m.find()) {
+				return m.group(1);
+			}
+		}
+		
+		return null;
+	}
+	
+	private boolean parseCommand(String[] lines) {
 		int commandStartIndex = -1;
 		for (int i = 0; i < lines.length; i++) {
 			if (lines[i].startsWith("Command: ")) {
@@ -44,25 +91,58 @@ public class LogSummary {
 		}
 		
 		if (commandStartIndex != -1) {
-			command = lines[commandStartIndex].substring(9);
+			String command = lines[commandStartIndex].substring(9);
+			commandProperties.add(new Pair<String,String>("Command", command));
 			
 			// Parse classpath
 			Pattern p = Pattern.compile("(?:-cp|-classpath)\\s+(\\S+)");
 			Matcher m = p.matcher(command);
+			StringBuilder sb = new StringBuilder();
 			if (m.find()) {
-				classpath = Arrays.asList(m.group(1).split(":"));
+				sb.append(StringUtils.join((Collection<String>)Arrays.asList(m.group(1).split(":")), "<br/>"));
+				commandProperties.add(new Pair<String,String>("Classpath", sb.toString()));
+			}
+			
+			// Parse JVM system properties (-D flags)
+			p = Pattern.compile("-D(\\S+)");
+			m = p.matcher(command);
+			sb = new StringBuilder();
+			while (m.find()) {
+				sb.append(m.group(1) + "<br/>");
+			}
+			if (sb.length() > 0) {
+				commandProperties.add(new Pair<String,String>("-D", sb.toString()));
+			}
+			
+			// Parse memory settings
+			p = Pattern.compile("(-Xm\\S+)");
+			m = p.matcher(command);
+			sb = new StringBuilder();
+			while (m.find()) {
+				sb.append(m.group(1) + "<br/>");
+			}
+			if (sb.length() > 0) {
+				commandProperties.add(new Pair<String,String>("Memory Settings", sb.toString()));
 			}
 			
 			// Parse Pig params
 			p = Pattern.compile("-param\\s+(\\S+)");
 			m = p.matcher(command);
+			sb = new StringBuilder();
 			while (m.find()) {
-				params.add(m.group(1));
+				sb.append(m.group(1) + "<br/>");
 			}
+			if (sb.length() > 0) {
+				commandProperties.add(new Pair<String,String>("Params", sb.toString()));
+			}
+			
+			return true;
 		}
+		
+		return false;
 	}
 	
-	private void parseJobSummary(String[] lines) {
+	private void parsePigJobSummary(String[] lines) {
 		int jobSummaryStartIndex = -1;
 		for (int i = 0; i < lines.length; i++) {
 			if (lines[i].startsWith("HadoopVersion")) {
@@ -73,21 +153,37 @@ public class LogSummary {
 		
 		if (jobSummaryStartIndex != -1) {
 			String headerLine = lines[jobSummaryStartIndex];
-			summaryTableHeaders = headerLine.split("\t");
+			pigSummaryTableHeaders = headerLine.split("\t");
 			
 			int tableRowIndex = jobSummaryStartIndex + 1;
 			String line;
 			while (!(line = lines[tableRowIndex]).equals("")) {
-				summaryTableData.add(line.split("\t"));
+				pigSummaryTableData.add(line.split("\t"));
 				tableRowIndex++;
 			}
 		}
 	}
 	
-	private void parseJobStats(String[] lines) {
+	/**
+	 * Parses the Pig job stats table, which includes the max/min mapper and
+	 * reducer times, and links each job id to its details page on the job
+	 * tracker.
+	 * @param lines the log lines, with their log prefixes already stripped
+	 */
+	private void parsePigJobStats(String[] lines) {
 		int jobStatsStartIndex = -1;
+		
+		Map<String, String> jobDetailUrls = new HashMap<String, String>();
+		
 		for (int i = 0; i < lines.length; i++) {
-			if (lines[i].startsWith("Job Stats (time in seconds):")) {
+			String line = lines[i];
+			Matcher m = jobTrackerUrl.matcher(line);
+			
+			if (m.find()) {
+				jobDetailUrls.put(m.group(1), m.group(0));
+			}
+			else if (line.startsWith("Job Stats (time in seconds):")) {
 				jobStatsStartIndex = i+1;
 				break;
 			}
@@ -95,42 +191,159 @@ public class LogSummary {
 		
 		if (jobStatsStartIndex != -1) {
 			String headerLine = lines[jobStatsStartIndex];
-			statTableHeaders = headerLine.split("\t");
+			pigStatTableHeaders = headerLine.split("\t");
 			
 			int tableRowIndex = jobStatsStartIndex + 1;
 			String line;
 			while (!(line = lines[tableRowIndex]).equals("")) {
-				statTableData.add(line.split("\t"));
+				String[] stats = line.split("\t");
+				if (jobDetailUrls.containsKey(stats[0])) {
+					stats[0] = "<a href=\"" + jobDetailUrls.get(stats[0]) + "\">" + stats[0] + "</a>";
+				}
+				pigStatTableData.add(stats);
 				tableRowIndex++;
 			}
 		}
 	}
 	
-	public String[] getStatTableHeaders() {
-		return statTableHeaders;
+	private void parseHiveQueries(String[] lines) {
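+		// NOTE: i is advanced manually inside the loop bodies below.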
+		for (int i = 0; i < lines.length;) {
+			String line = lines[i];
+			int parsingCommandIndex = line.indexOf(HIVE_PARSING_START);
+			if (parsingCommandIndex != -1) {
+				// parse query text
+				int queryStartIndex = parsingCommandIndex + HIVE_PARSING_START.length();
+				StringBuilder query = new StringBuilder(line.substring(queryStartIndex) + "\n");
+				
+				i++;
+				while (i < lines.length && !(line = lines[i]).contains(HIVE_PARSING_END)) {
+					query.append(line + "\n");
+					i++;
+				}
+				String queryString = query.toString().trim().replaceAll("\n","<br/>");
+				hiveQueries.add(queryString);
+				i++;
+				
+				// parse the query's Map-Reduce jobs, if any.
+				int numMRJobs = 0;
+				List<String> jobTrackerUrls = new ArrayList<String>();
+				while (i < lines.length) {
+					line = lines[i];
+					if (line.contains(HIVE_NUM_MAP_REDUCE_JOBS_STRING)) {
+						// query involves map reduce jobs
+						// The count follows HIVE_NUM_MAP_REDUCE_JOBS_STRING, which
+						// need not start the line.
+						numMRJobs = Integer.parseInt(line.substring(
+								line.indexOf(HIVE_NUM_MAP_REDUCE_JOBS_STRING)
+								+ HIVE_NUM_MAP_REDUCE_JOBS_STRING.length()).trim());
+						i++;
+						
+						// get the job tracker URLs
+						String lastUrl = "";
+						int numJobsSeen = 0;
+						while (numJobsSeen < numMRJobs && i < lines.length) {
+							line = lines[i];
+							if (line.contains(HIVE_MAP_REDUCE_JOB_START)) {
+								Matcher m = jobTrackerUrl.matcher(line);
+								if (m.find() && !lastUrl.equals(m.group(1))) {
+									jobTrackerUrls.add(m.group(0));
+									lastUrl = m.group(1);
+									numJobsSeen++;
+								}
+							}
+							i++;
+						}
+						
+						// get the map reduce jobs summary
+						while (i < lines.length) {
+							line = lines[i];
+							if (line.contains(HIVE_MAP_REDUCE_JOBS_SUMMARY)) {
+								// job summary table found
+								i++;
+								
+								List<List<String>> queryJobs = new ArrayList<List<String>>();
+								
+								Pattern p = Pattern.compile(
+									"Job (\\d+): Map: (\\d+)  Reduce: (\\d+)   HDFS Read: (\\d+) HDFS Write: (\\d+)"
+								);
+								
+								int previousJob = -1;
+								numJobsSeen = 0;
+								while (numJobsSeen < numMRJobs && i < lines.length) {
+									line = lines[i];
+									Matcher m = p.matcher(line);
+									if (m.find()) {
+										int currJob = Integer.parseInt(m.group(1));
+										if (currJob == previousJob) {
+											i++;
+											continue;
+										}
+										
+										List<String> job = new ArrayList<String>();
+										job.add("<a href=\"" + jobTrackerUrls.get(currJob) +
+												"\">" + currJob + "</a>");
+										job.add(m.group(2));
+										job.add(m.group(3));
+										job.add(m.group(4));
+										job.add(m.group(5));
+										queryJobs.add(job);
+										previousJob = currJob;
+										numJobsSeen++;
+									}
+									i++;
+								}
+								
+								// Add whatever jobs were parsed so hiveQueryJobs
+								// stays index-aligned with hiveQueries.
+								hiveQueryJobs.add(queryJobs);
+								
+								break;
+							}
+							i++;
+						} 
+						break;
+					}
+					else if (line.contains(HIVE_PARSING_START)) {
+						if (numMRJobs == 0) {
+							hiveQueryJobs.add(null);
+						}
+						break;
+					}
+					i++;
+				}
+				continue;
+			}
+			
+			i++;
+		}
+	}
+	
+	public String[] getPigStatTableHeaders() {
+		return pigStatTableHeaders;
 	}
 
-	public List<String[]> getStatTableData() {
-		return statTableData;
+	public List<String[]> getPigStatTableData() {
+		return pigStatTableData;
 	}
 
-	public String[] getSummaryTableHeaders() {
-		return summaryTableHeaders;
+	public String[] getPigSummaryTableHeaders() {
+		return pigSummaryTableHeaders;
 	}
 
-	public List<String[]> getSummaryTableData() {
-		return summaryTableData;
+	public List<String[]> getPigSummaryTableData() {
+		return pigSummaryTableData;
+	}
+	
+	public String getJobType() {
+		return jobType;
 	}
 	
-	public String getCommand() {
-		return command;
+	public List<Pair<String,String>> getCommandProperties() {
+		return commandProperties;
 	}
 
-	public List<String> getClasspath() {
-		return classpath;
+	public List<String> getHiveQueries() {
+		return hiveQueries;
 	}
 
-	public List<String> getParams() {
-		return params;
+	public List<List<List<String>>> getHiveQueryJobs() {
+		return hiveQueryJobs;
 	}
 }
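
For reference, here is a minimal standalone sketch of the two regular expressions this class relies on. The log lines, host name, port, and job ids below are invented for illustration; only the patterns themselves come from LogSummary.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class LogSummaryRegexDemo {
        // Same patterns as LogSummary's jobTrackerUrl and Hive job summary line.
        private static final Pattern JOB_TRACKER_URL = Pattern.compile(
                "https?://[-\\w\\.]+(?::\\d+)?/[\\w/\\.]*\\?\\S+(job_\\d{12}_\\d{4,})\\S*");
        private static final Pattern HIVE_JOB_SUMMARY = Pattern.compile(
                "Job (\\d+): Map: (\\d+)  Reduce: (\\d+)   HDFS Read: (\\d+) HDFS Write: (\\d+)");

        public static void main(String[] args) {
            // Hypothetical log line containing a job tracker URL.
            String urlLine = "Starting Job = job_201301011234_0001, Tracking URL = "
                    + "http://jobtracker.example.com:50030/jobdetails.jsp?jobid=job_201301011234_0001";
            Matcher m = JOB_TRACKER_URL.matcher(urlLine);
            if (m.find()) {
                System.out.println(m.group(0)); // full job details URL
                System.out.println(m.group(1)); // job_201301011234_0001
            }

            // Hypothetical line from Hive's "MapReduce Jobs Launched:" summary.
            String summaryLine = "Job 0: Map: 1  Reduce: 1   HDFS Read: 12345 HDFS Write: 678 SUCCESS";
            m = HIVE_JOB_SUMMARY.matcher(summaryLine);
            if (m.find()) {
                // Groups: job number, map tasks, reduce tasks, HDFS bytes read/written.
                System.out.printf("job %s: %s mappers, %s reducers%n",
                        m.group(1), m.group(2), m.group(3));
            }
        }
    }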
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 1ba4029..ad572ed 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -476,13 +476,19 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 			LogData data = executorManager.getExecutionJobLog(exFlow, jobId, 0, Integer.MAX_VALUE, attempt);
 			
 			LogSummary summary = new LogSummary(data);
-			ret.put("command", summary.getCommand());
-			ret.put("classpath", summary.getClasspath());
-			ret.put("params", summary.getParams());
-			ret.put("summaryTableHeaders", summary.getSummaryTableHeaders());
-			ret.put("summaryTableData", summary.getSummaryTableData());
-			ret.put("statTableHeaders", summary.getStatTableHeaders());
-			ret.put("statTableData", summary.getStatTableData());
+			ret.put("commandProperties", summary.getCommandProperties());
+			
+			String jobType = summary.getJobType();
+			
+			if (jobType != null && jobType.contains("pig")) {
+				ret.put("summaryTableHeaders", summary.getPigSummaryTableHeaders());
+				ret.put("summaryTableData", summary.getPigSummaryTableData());
+				ret.put("statTableHeaders", summary.getPigStatTableHeaders());
+				ret.put("statTableData", summary.getPigStatTableData());
+			} else if (jobType != null && jobType.contains("hive")) {
+				ret.put("hiveQueries", summary.getHiveQueries());
+				ret.put("hiveQueryJobs", summary.getHiveQueryJobs());
+			}
 		} catch (ExecutorManagerException e) {
 			throw new ServletException(e);
 		}
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm b/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
index 0ab9e56..df18893 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
@@ -94,22 +94,34 @@
 				</table>
 				
 				<br/>
-				<h4>Job Summary</h4>
-				<table>
-					<thead id="summaryHeader">
-					</thead>
-					<tbody id="summaryBody">
-					</tbody>
-				</table>
+				<div id="jobsummary">
+					<h4>Job Summary</h4>
+					<table>
+						<thead id="summaryHeader">
+						</thead>
+						<tbody id="summaryBody">
+						</tbody>
+					</table>
+				</div>
 				
 				<br/>
-				<h4>Job Stats</h4>
-				<table>
-					<thead id="statsHeader">
-					</thead>
-					<tbody id="statsBody">
-					</tbody>
-				</table>
+				<div id="jobstats">
+					<h4>Job Stats</h4>
+					<table>
+						<thead id="statsHeader">
+						</thead>
+						<tbody id="statsBody">
+						</tbody>
+					</table>
+				</div>
+				
+				<br/>
+				<div id="hiveTable">
+					<h4>Job Summary</h4>
+					<table>
+						<thead id="hiveTableHeader">
+						</thead>
+						<tbody id="hiveTableBody">
+						</tbody>
+					</table>
+				</div>
 			</div>
 
 #end
diff --git a/src/sql/create.project_properties.sql b/src/sql/create.project_properties.sql
index 9ed6267..7adf25e 100644
--- a/src/sql/create.project_properties.sql
+++ b/src/sql/create.project_properties.sql
@@ -1,7 +1,7 @@
 CREATE TABLE project_properties (
 	project_id INT NOT NULL,
 	version INT NOT NULL,
-	name VARCHAR(128),
+	name VARCHAR(255),
 	modified_time BIGINT NOT NULL,
 	encoding_type TINYINT,
 	property BLOB,
diff --git a/src/web/js/azkaban.jobdetails.view.js b/src/web/js/azkaban.jobdetails.view.js
index aba399c..a24943d 100644
--- a/src/web/js/azkaban.jobdetails.view.js
+++ b/src/web/js/azkaban.jobdetails.view.js
@@ -136,55 +136,27 @@ azkaban.JobSummaryView = Backbone.View.extend({
 					console.log(data.error);
 				}
 				else {
-					self.renderCommandTable(data.command, data.classpath, data.params);
+					self.renderCommandTable(data.commandProperties);
 					self.renderJobTable(data.summaryTableHeaders, data.summaryTableData, "summary");
 					self.renderJobTable(data.statTableHeaders, data.statTableData, "stats");
+					self.renderHiveTable(data.hiveQueries, data.hiveQueryJobs);
 				}
 			}
 		});
 	},
-	renderCommandTable: function(command, classpath, params) {
-		if (command) {
+	renderCommandTable: function(commandProperties) {
+		if (commandProperties) {
 			var commandTable = $("#commandTable");
-			var i;
-			
-			// Add row for command
-			var tr = document.createElement("tr");
-			var td = document.createElement("td");
-			$(td).append("<b>Command</b>");
-			$(tr).append(td);
-			td = document.createElement("td");
-			$(td).text(command);
-			$(tr).append(td);
-			commandTable.append(tr);
 			
-			// Add row for classpath
-			if (classpath && classpath.length > 0) {
-				tr = document.createElement("tr");
-				td = document.createElement("td");
-				$(td).append("<b>Classpath</b>");
-				$(tr).append(td);
-				td = document.createElement("td");
-				$(td).append(classpath[0]);
-				for (i = 1; i < classpath.length; i++) {
-					$(td).append("<br/>" + classpath[i]);
-				}
-				$(tr).append(td);
-				commandTable.append(tr);
-			}
-			
-			// Add row for params
-			if (params && params.length > 0) {
-				tr = document.createElement("tr");
-				td = document.createElement("td");
-				$(td).append("<b>Params</b>");
-				$(tr).append(td);
-				td = document.createElement("td");
-				$(td).append(params[0]);
-				for (i = 1; i < params.length; i++) {
-					$(td).append("<br/>" + params[i]);
-				}
-				$(tr).append(td);
+			for (var i = 0; i < commandProperties.length; i++) {
+				var prop = commandProperties[i];
+				var tr = document.createElement("tr");
+				var name = document.createElement("td");
+				var value = document.createElement("td");
+				$(name).html("<b>" + prop.first + "</b>");
+				$(value).html(prop.second);
+				$(tr).append(name);
+				$(tr).append(value);
 				commandTable.append(tr);
 			}
 		}
@@ -207,13 +179,83 @@ azkaban.JobSummaryView = Backbone.View.extend({
 			for (i = 0; i < data.length; i++) {
 				tr = document.createElement("tr");
 				var row = data[i];
-				for (var j = 0; j < headers.length; j++) {
+				for (var j = 0; j < row.length; j++) {
 					var td = document.createElement("td");
-					$(td).text(row[j]);
+					if (j == 0) {
+						// first column may contain a link to the job details page
+						$(td).html(row[j]);
+					} else {
+						$(td).text(row[j]);
+					}
 					$(tr).append(td);
 				}
 				body.append(tr);
 			}
+		} else {
+			$("#job" + prefix).hide();
+		}
+	},
+	renderHiveTable: function(queries, queryJobs) {
+		if (queries) {
+			// Set up table column headers
+			var header = $("#hiveTableHeader");
+			var tr = document.createElement("tr");
+			var headers = ["Query","Job","Map","Reduce","HDFS Read","HDFS Write"];
+			var i;
+			
+			for (i = 0; i < headers.length; i++) {
+				var th = document.createElement("th");
+				$(th).text(headers[i]);
+				$(tr).append(th);
+			}
+			header.append(tr);
+			
+			// Construct table body
+			var body = $("#hiveTableBody");
+			for (i = 0; i < queries.length; i++) {
+				// new query
+				tr = document.createElement("tr");
+				var td = document.createElement("td");
+				$(td).html("<b>" + queries[i] + "</b>");
+				$(tr).append(td);
+				
+				var jobs = queryJobs[i];
+				if (jobs != null) {
+					// add first job for this query
+					var jobValues = jobs[0];
+					var j;
+					for (j = 0; j < jobValues.length; j++) {
+						td = document.createElement("td");
+						$(td).html(jobValues[j]);
+						$(tr).append(td);
+					}
+					body.append(tr);
+					
+					// add remaining jobs for this query
+					for (j = 1; j < jobs.length; j++) {
+						jobValues = jobs[j];
+						tr = document.createElement("tr");
+						
+						// add empty cell for query column
+						td = document.createElement("td");
+						$(td).html("&nbsp;");
+						$(tr).append(td);
+						
+						// add job values
+						for (var k = 0; k < jobValues.length; k++) {
+							td = document.createElement("td");
+							$(td).html(jobValues[k]);
+							$(tr).append(td);
+						}
+						body.append(tr);
+					}
+					
+				} else {
+					body.append(tr);
+				}
+			}
+		} else {
+			$("#hiveTable").hide();
 		}
 	}
 });
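
To make the data contract between LogSummary and renderHiveTable concrete, here is a hedged sketch of the parallel hiveQueries/hiveQueryJobs structure; the query text and numbers are invented.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class HiveQueryJobsShapeDemo {
        public static void main(String[] args) {
            // hiveQueries and hiveQueryJobs are parallel lists; each job row is
            // [job id link, map tasks, reduce tasks, HDFS read, HDFS write].
            List<String> hiveQueries = new ArrayList<String>();
            List<List<List<String>>> hiveQueryJobs = new ArrayList<List<List<String>>>();

            hiveQueries.add("SELECT COUNT(*) FROM events");
            List<List<String>> jobsForQuery = new ArrayList<List<String>>();
            jobsForQuery.add(Arrays.asList("0", "1", "1", "12345", "678"));
            hiveQueryJobs.add(jobsForQuery);

            // A query that launched no MapReduce jobs gets a null placeholder,
            // which renderHiveTable renders as a query-only row.
            hiveQueries.add("USE my_db");
            hiveQueryJobs.add(null);

            System.out.println(hiveQueries.get(0) + " -> " + hiveQueryJobs.get(0));
        }
    }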