azkaban-developers

Details

diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 52e8d7c..b6d961b 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -93,8 +93,9 @@ import azkaban.webapp.servlet.HistoryServlet;
 import azkaban.webapp.servlet.ProjectServlet;
 import azkaban.webapp.servlet.ProjectManagerServlet;
 import azkaban.webapp.servlet.TriggerManagerServlet;
-import azkaban.webapp.servlet.TriggerPlugin;
-import azkaban.webapp.servlet.ViewerPlugin;
+import azkaban.webapp.plugin.TriggerPlugin;
+import azkaban.webapp.plugin.ViewerPlugin;
+import azkaban.webapp.plugin.PluginRegistry;
 import azkaban.webapp.session.SessionCache;
 
 /**
@@ -153,7 +154,6 @@ public class AzkabanWebServer extends AzkabanServer {
 	private Props props;
 	private SessionCache sessionCache;
 	private File tempDir;
-	private List<ViewerPlugin> viewerPlugins;
 	private Map<String, TriggerPlugin> triggerPlugins;
 	
 	private MBeanServer mbeanServer;
@@ -215,10 +215,6 @@ public class AzkabanWebServer extends AzkabanServer {
 		configureMBeanServer();
 	}
 
-	private void setViewerPlugins(List<ViewerPlugin> viewerPlugins) {
-		this.viewerPlugins = viewerPlugins;
-	}
-	
 	private void setTriggerPlugins(Map<String, TriggerPlugin> triggerPlugins) {
 		this.triggerPlugins = triggerPlugins;
 	}
@@ -775,7 +771,7 @@ public class AzkabanWebServer extends AzkabanServer {
 		root.addServlet(new ServletHolder(new TriggerManagerServlet()),"/triggers");
 		
 		String viewerPluginDir = azkabanSettings.getString("viewer.plugin.dir", "plugins/viewer");
-		app.setViewerPlugins(loadViewerPlugins(root, viewerPluginDir, app.getVelocityEngine()));
+		loadViewerPlugins(root, viewerPluginDir, app.getVelocityEngine());
 		
 		// triggerplugin
 		String triggerPluginDir = azkabanSettings.getString("trigger.plugin.dir", "plugins/triggers");
@@ -963,13 +959,12 @@ public class AzkabanWebServer extends AzkabanServer {
 		return triggerPlugins;
 	}
 	
-	private static List<ViewerPlugin> loadViewerPlugins(Context root, String pluginPath, VelocityEngine ve) {
+	private static void loadViewerPlugins(Context root, String pluginPath, VelocityEngine ve) {
 		File viewerPluginPath = new File(pluginPath);
 		if (!viewerPluginPath.exists()) {
-			return Collections.<ViewerPlugin>emptyList();
+			return;
 		}
 			
-		ArrayList<ViewerPlugin> installedViewerPlugins = new ArrayList<ViewerPlugin>();
 		ClassLoader parentLoader = AzkabanWebServer.class.getClassLoader();
 		File[] pluginDirs = viewerPluginPath.listFiles();
 		ArrayList<String> jarPaths = new ArrayList<String>();
@@ -1011,6 +1006,7 @@ public class AzkabanWebServer extends AzkabanServer {
 			
 			String pluginName = pluginProps.getString("viewer.name");
 			String pluginWebPath = pluginProps.getString("viewer.path");
+			String pluginJobType = pluginProps.getString("viewer.jobtype", null);
 			int pluginOrder = pluginProps.getInt("viewer.order", 0);
 			boolean pluginHidden = pluginProps.getBoolean("viewer.hidden", false);
 			List<String> extLibClasspath = pluginProps.getStringList("viewer.external.classpaths", (List<String>)null);
@@ -1114,27 +1110,18 @@ public class AzkabanWebServer extends AzkabanServer {
 			
 			AbstractAzkabanServlet avServlet = (AbstractAzkabanServlet)obj;
 			root.addServlet(new ServletHolder(avServlet), "/" + pluginWebPath + "/*");
-			installedViewerPlugins.add(new ViewerPlugin(pluginName, pluginWebPath, pluginOrder, pluginHidden));
+			PluginRegistry.getRegistry().register(new ViewerPlugin(
+						pluginName, 
+						pluginWebPath, 
+						pluginOrder, 
+						pluginHidden,
+						pluginJobType));
 		}
 		
 		// Velocity needs the jar resource paths to be set.
 		String jarResourcePath = StringUtils.join(jarPaths, ", ");
 		logger.info("Setting jar resource path " + jarResourcePath);
 		ve.addProperty("jar.resource.loader.path", jarResourcePath);
-		
-		// Sort plugins based on order
-		Collections.sort(installedViewerPlugins, new Comparator<ViewerPlugin>() {
-			@Override
-			public int compare(ViewerPlugin o1, ViewerPlugin o2) {
-				return o1.getOrder() - o2.getOrder();
-			}
-		});
-		
-		return installedViewerPlugins;
-	}
-	
-	public List<ViewerPlugin> getViewerPlugins() {
-		return viewerPlugins;
 	}
 	
 	/**
diff --git a/src/java/azkaban/webapp/plugin/PluginRegistry.java b/src/java/azkaban/webapp/plugin/PluginRegistry.java
new file mode 100644
index 0000000..d49db54
--- /dev/null
+++ b/src/java/azkaban/webapp/plugin/PluginRegistry.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.webapp.plugin;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.Set;
+
+public class PluginRegistry {
+
+  private static PluginRegistry registry;
+
+  public TreeSet<ViewerPlugin> viewerPlugins;
+
+	public Map<String, TreeSet<ViewerPlugin>> jobTypeViewerPlugins;
+
+  private PluginRegistry() {
+		viewerPlugins = new TreeSet<ViewerPlugin>(ViewerPlugin.COMPARATOR);
+		jobTypeViewerPlugins = new HashMap<String, TreeSet<ViewerPlugin>>();
+  }
+
+  public void register(ViewerPlugin plugin) {
+		viewerPlugins.add(plugin);
+		String jobType = plugin.getJobType();
+		if (jobType == null) {
+			return;
+		}
+		TreeSet<ViewerPlugin> plugins = null;
+		if (!jobTypeViewerPlugins.containsKey(jobType)) {
+			plugins = new TreeSet<ViewerPlugin>(ViewerPlugin.COMPARATOR);
+			plugins.add(plugin);
+			jobTypeViewerPlugins.put(jobType, plugins);
+		}
+		else {
+			plugins = jobTypeViewerPlugins.get(jobType);
+			plugins.add(plugin);
+		}
+  }
+
+	public List<ViewerPlugin> getViewerPlugins() {
+		return new ArrayList<ViewerPlugin>(viewerPlugins);
+	}
+
+	public List<ViewerPlugin> getViewerPluginsForJobType(String jobType) {
+		TreeSet<ViewerPlugin> plugins = jobTypeViewerPlugins.get(jobType);
+		if (plugins == null) {
+			return null;
+		}
+		return new ArrayList<ViewerPlugin>(plugins);
+	}
+
+  public static PluginRegistry getRegistry() {
+    if (registry == null) {
+      registry = new PluginRegistry();
+    }
+    return registry;
+  }
+}
diff --git a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
index 8ac470e..2fc6ecc 100644
--- a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
@@ -39,6 +39,9 @@ import azkaban.utils.Props;
 import azkaban.webapp.AzkabanServer;
 import azkaban.webapp.AzkabanWebServer;
 import azkaban.webapp.session.Session;
+import azkaban.webapp.plugin.ViewerPlugin;
+import azkaban.webapp.plugin.TriggerPlugin;
+import azkaban.webapp.plugin.PluginRegistry;
 
 /**
  * Base Servlet for pages
@@ -91,7 +94,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 		
 		if (application instanceof AzkabanWebServer) {
 			AzkabanWebServer server = (AzkabanWebServer)application;
-			viewerPlugins = server.getViewerPlugins();
+			viewerPlugins = PluginRegistry.getRegistry().getViewerPlugins();
 			triggerPlugins = new ArrayList<TriggerPlugin>(server.getTriggerPlugins().values());
 		}
 	}
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 309ffc8..e154480 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -50,6 +50,8 @@ import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.JSONUtils;
 import azkaban.webapp.AzkabanWebServer;
 import azkaban.webapp.session.Session;
+import azkaban.webapp.plugin.PluginRegistry;
+import azkaban.webapp.plugin.ViewerPlugin;
 
 public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 	private static final long serialVersionUID = 1L;
@@ -197,7 +199,18 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 				page.render();
 				return;
 			}
-		} catch (ExecutorManagerException e) {
+
+			ExecutableNode node = flow.getExecutableNode(jobId);
+			if (node == null) {
+				page.add("errorMsg", "Job " + jobId + " doesn't exist in " + flow.getExecutionId());
+				return;
+			}
+		
+			List<ViewerPlugin> jobViewerPlugins = PluginRegistry.getRegistry()
+					.getViewerPluginsForJobType(node.getType());
+			page.add("jobViewerPlugins", jobViewerPlugins);
+		}
+		catch (ExecutorManagerException e) {
 			page.add("errorMsg", "Error loading executing flow: " + e.getMessage());
 			page.render();
 			return;
@@ -209,7 +222,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 			page.render();
 			return;
 		}
-		
+
 		page.add("projectName", project.getName());
 		page.add("flowid", flow.getFlowId());
 		
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobdetailsheader.vm b/src/java/azkaban/webapp/servlet/velocity/jobdetailsheader.vm
new file mode 100644
index 0000000..4bf06c3
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/velocity/jobdetailsheader.vm
@@ -0,0 +1,61 @@
+#*
+ * Copyright 2014 LinkedIn Corp.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+*#
+
+	## Page header.
+
+		<div class="az-page-header">
+			<div class="container-full">
+				<div class="row">
+					<div class="col-xs-6">
+						<h1><a href="${context}/executor?execid=${execid}&job=${jobid}">Job Execution <small>$jobid</small></a></h1>
+					</div>
+					<div class="col-xs-6">
+						<div class="pull-right az-page-header-form">
+							<a href="${context}/manager?project=${projectName}&flow=${flowid}&job=$jobid" class="btn btn-info">Job Properties</a>
+						</div>
+					</div>
+				</div>
+			</div>
+		</div>
+	
+    <div class="container-full">
+
+  #parse ("azkaban/webapp/servlet/velocity/alerts.vm")
+
+  ## Breadcrumb
+
+			<ol class="breadcrumb">
+				<li><a href="${context}/manager?project=${projectName}"><strong>Project</strong> $projectName</a></li>
+				<li><a href="${context}/manager?project=${projectName}&flow=${flowid}"><strong>Flow</strong> $flowid</a></li>
+				<li><a href="${context}/executor?execid=${execid}#jobslist"><strong>Execution</strong> $execid</a></li>
+        <li class="active"><strong>Job</strong> $jobid</li>
+			</ol>
+
+  ## Tabs
+
+			<ul class="nav nav-tabs" id="headertabs">
+	#if ($current_page == "executing")
+				<li id="jobLogViewLink"><a href="#logs">Job Logs</a></li>
+				<li id="jobSummaryViewLink"><a href="#summary">Summary</a></li>
+	#else
+				<li id="jobLogViewLink"><a href="${context}/executor?execid=${execid}&job=${jobid}#logs">Job Logs</a></li>
+				<li id="jobSummaryViewLink"><a href="${context}/executor?execid=${execid}&job=${jobid}#summary">Summary</a></li>
+	#end
+	#foreach ($jobViewerPlugin in $jobViewerPlugins)
+				<li#if($current_page == $jobViewerPlugin.pluginName) class="active"#end><a href="$!context/${jobViewerPlugin.pluginPath}?execid=${execid}&jobid=${jobid}">$jobViewerPlugin.pluginName</a></li>
+	#end
+			</ul>
+    </div>
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm b/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
index ad579ce..f551245 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
@@ -47,44 +47,8 @@
   #parse ("azkaban/webapp/servlet/velocity/errormsg.vm")
 #else
 
-	## Page header.
 
-		<div class="az-page-header">
-			<div class="container-full">
-				<div class="row">
-					<div class="col-xs-6">
-						<h1><a href="${context}/executor?execid=${execid}&job=${jobid}">Job Execution <small>$jobid</small></a></h1>
-					</div>
-					<div class="col-xs-6">
-						<div class="pull-right az-page-header-form">
-							<a href="${context}/manager?project=${projectName}&flow=${flowid}&job=$jobid" class="btn btn-info">Job Properties</a>
-						</div>
-					</div>
-				</div>
-			</div>
-		</div>
-	
-    <div class="container-full">
-
-  #parse ("azkaban/webapp/servlet/velocity/alerts.vm")
-
-  ## Breadcrumb
-
-			<ol class="breadcrumb">
-				<li><a href="${context}/manager?project=${projectName}"><strong>Project</strong> $projectName</a></li>
-				<li><a href="${context}/manager?project=${projectName}&flow=${flowid}"><strong>Flow</strong> $flowid</a></li>
-				<li><a href="${context}/executor?execid=${execid}#jobslist"><strong>Execution</strong> $execid</a></li>
-        <li class="active"><strong>Job</strong> $jobid</li>
-			</ol>
-
-  ## Tabs
-
-			<ul class="nav nav-tabs" id="headertabs">
-				<li id="jobLogViewLink"><a href="#logs">Job Logs</a></li>
-				<li id="jobSummaryViewLink"><a href="#summary">Summary</a></li>
-				<li><a href="${context}/pigvisualizer?execid=${execid}&jobid=${jobid}">Visualization</a></li>
-			</ul>
-    </div>
+  #parse ("azkaban/webapp/servlet/velocity/jobdetailsheader.vm")
 
 	## Log content.