azkaban-uncached

Stripped executor and the hdfs browser out of Azkaban. Both

1/15/2013 11:17:03 PM

Changes

.classpath 7(+2 -5)

lib/avro-1.4.1.jar 0(+0 -0)

lib/hadoop-core-1.0.2-p1.jar 0(+0 -0)

lib/pig-0.9.1-withouthadoop.jar 0(+0 -0)

lib/voldemort-0.96.jar 0(+0 -0)

src/java/azkaban/fsviewers/HdfsAvroFileViewer.java 120(+0 -120)

src/java/azkaban/fsviewers/HdfsFileViewer.java 35(+0 -35)

src/java/azkaban/fsviewers/HdfsSequenceFileViewer.java 67(+0 -67)

src/java/azkaban/fsviewers/JsonSequenceFileViewer.java 85(+0 -85)

src/java/azkaban/fsviewers/TextFileViewer.java 82(+0 -82)

src/java/azkaban/jobExecutor/PigProcessJob.java 179(+0 -179)

src/java/azkaban/jobExecutor/SecurePigWrapper.java 101(+0 -101)

src/java/azkaban/jobtype/JobtypeManager.java 222(+0 -222)

src/java/azkaban/security/DefaultHadoopSecurityManager.java 69(+0 -69)

src/java/azkaban/security/HadoopSecurityManager.java 65(+0 -65)

src/java/azkaban/security/HadoopSecurityManager_H_1_0_2.java 271(+0 -271)

src/java/azkaban/security/SecurityManagerException.java 13(+0 -13)

src/java/azkaban/utils/SecurityUtils.java 175(+0 -175)

src/java/azkaban/webapp/servlet/HdfsBrowserServlet.java 287(+0 -287)

src/java/azkaban/webapp/servlet/velocity/hdfsbrowserpage.vm 137(+0 -137)

Details

.classpath 7(+2 -5)

diff --git a/.classpath b/.classpath
index 12ee55d..ed3781d 100644
--- a/.classpath
+++ b/.classpath
@@ -23,14 +23,11 @@
 	<classpathentry kind="lib" path="lib/commons-io-2.4.jar"/>
 	<classpathentry kind="lib" path="lib/httpcore-4.2.1.jar"/>
 	<classpathentry kind="lib" path="lib/commons-logging-1.1.1.jar"/>
-	<classpathentry kind="lib" path="lib/hadoop-core-1.0.2-p1.jar"/>
 	<classpathentry kind="lib" path="lib/guava-13.0.1.jar"/>
-	<classpathentry kind="lib" path="lib/pig-0.9.1-withouthadoop.jar"/>
-	<classpathentry kind="lib" path="plugins/ldap/li-azkaban-ldap.jar"/>
 	<classpathentry kind="lib" path="lib/commons-email-1.2.jar"/>
 	<classpathentry kind="lib" path="lib/mail-1.4.5.jar"/>
-	<classpathentry kind="lib" path="lib/avro-1.4.1.jar"/>
 	<classpathentry kind="lib" path="lib/commons-configuration-1.8.jar"/>
-	<classpathentry kind="lib" path="lib/voldemort-0.96.jar"/>
+	<classpathentry kind="lib" path="lib/commons-dbutils-1.5.jar"/>
+	<classpathentry kind="lib" path="lib/commons-dbcp-1.4.jar"/>
 	<classpathentry kind="output" path="dist/classes"/>
 </classpath>
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index beab501..6712ba8 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -32,8 +32,7 @@ import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.flow.FlowProps;
-import azkaban.jobtype.JobtypeManager;
-import azkaban.security.HadoopSecurityManager;
+import azkaban.jobtype.JobTypeManager;
 import azkaban.utils.Props;
 
 public class FlowRunner extends EventHandler implements Runnable {
@@ -59,8 +58,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private Map<String, Props> jobOutputProps = new HashMap<String, Props>();
 	
 	private Props globalProps;
-	private final JobtypeManager jobtypeManager;
-	private final HadoopSecurityManager hadoopSecurityManager;
+	private final JobTypeManager jobtypeManager;
 	
 	private JobRunnerEventListener listener = new JobRunnerEventListener();
 	private BlockingQueue<JobRunner> jobsToRun = new LinkedBlockingQueue<JobRunner>();
@@ -74,14 +72,13 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private boolean flowFinished = false;
 	private boolean flowCancelled = false;
 	
-	public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, JobtypeManager jobtypeManager, HadoopSecurityManager hadoopSecurityManager) throws ExecutorManagerException {
+	public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
 		this.execId = flow.getExecutionId();
 		this.flow = flow;
 		this.executorLoader = executorLoader;
 		this.executorService = Executors.newFixedThreadPool(numThreads);
 		this.execDir = new File(flow.getExecutionPath());
 		this.jobtypeManager = jobtypeManager;
-		this.hadoopSecurityManager = hadoopSecurityManager;
 	}
 
 	public FlowRunner setGlobalProps(Props globalProps) {
@@ -316,7 +313,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 			logger.error("Error loading job file " + source + " for job " + node.getJobId());
 		}
 		
-		JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), executorLoader, jobtypeManager, hadoopSecurityManager);
+		// should have one prop with system secrets, the other user level props
+		JobRunner jobRunner = new JobRunner(node, new Props(), prop, path.getParentFile(), executorLoader, jobtypeManager);
 		jobRunner.addListener(listener);
 
 		return jobRunner;
@@ -510,8 +508,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 					queueNextJobs(node);
 				}
 				
-				runner.cleanup();
-
 				if (isFlowFinished()) {
 					logger.info("Flow appears finished. Cleaning up.");
 					flowFinished = true;
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 74e5ce4..3cb9978 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -40,10 +40,8 @@ import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
-import azkaban.jobtype.JobtypeManager;
+import azkaban.jobtype.JobTypeManager;
 
-import azkaban.security.DefaultHadoopSecurityManager;
-import azkaban.security.HadoopSecurityManager;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.Pair;
@@ -76,8 +74,7 @@ public class FlowRunnerManager implements EventListener {
 	private ExecutorLoader executorLoader;
 	private ProjectLoader projectLoader;
 	
-	private JobtypeManager jobtypeManager;
-	private HadoopSecurityManager hadoopSecurityManager;
+	private JobTypeManager jobtypeManager;
 	
 	private Props globalProps;
 	
@@ -107,35 +104,8 @@ public class FlowRunnerManager implements EventListener {
 		cleanerThread = new CleanerThread();
 		cleanerThread.start();
 		
-		jobtypeManager = new JobtypeManager(props.getString(AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR, null), parentClassLoader);
+		jobtypeManager = new JobTypeManager(props.getString(AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR, null), parentClassLoader);
 		
-		hadoopSecurityManager = loadHadoopSecurityManager(props);
-		
-	}
-
-	private HadoopSecurityManager loadHadoopSecurityManager(Props props) {
-		
-		Class<?> hadoopSecurityManagerClass = props.getClass(HADOOP_SECURITY_MANAGER_CLASS_PARAM, null);
-		logger.info("Loading hadoop security manager class " + hadoopSecurityManagerClass.getName());
-		HadoopSecurityManager hadoopSecurityManager = null;
-
-		if (hadoopSecurityManagerClass != null && hadoopSecurityManagerClass.getConstructors().length > 0) {
-
-			try {
-				Constructor<?> hsmConstructor = hadoopSecurityManagerClass.getConstructor(Props.class);
-				hadoopSecurityManager = (HadoopSecurityManager) hsmConstructor.newInstance(props);
-			} 
-			catch (Exception e) {
-				logger.error("Could not instantiate Hadoop Security Manager "+ hadoopSecurityManagerClass.getName());
-				throw new RuntimeException(e);
-			}
-		} 
-		else {
-			hadoopSecurityManager = new DefaultHadoopSecurityManager();
-		}
-
-		return hadoopSecurityManager;
-
 	}
 
 	public Props getGlobalProps() {
@@ -274,7 +244,7 @@ public class FlowRunnerManager implements EventListener {
 		setupFlow(flow);
 		
 		// Setup flow runner
-		FlowRunner runner = new FlowRunner(flow, executorLoader, jobtypeManager, hadoopSecurityManager);
+		FlowRunner runner = new FlowRunner(flow, executorLoader, jobtypeManager);
 		runner.setGlobalProps(globalProps);
 		runner.addListener(this);
 		
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 298fa6c..66a8a76 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -18,7 +18,6 @@ package azkaban.execapp;
 import java.io.File;
 import java.io.IOException;
 
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Appender;
 import org.apache.log4j.FileAppender;
 import org.apache.log4j.Layout;
@@ -35,17 +34,16 @@ import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.jobExecutor.AbstractProcessJob;
 import azkaban.jobExecutor.Job;
-import azkaban.jobtype.JobtypeManager;
+import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypeManagerException;
 
-import azkaban.security.HadoopSecurityManager;
-import azkaban.security.SecurityManagerException;
 import azkaban.utils.Props;
-import azkaban.utils.SecurityUtils;
 
 public class JobRunner extends EventHandler implements Runnable {
 	private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
 
 	private ExecutorLoader loader;
+	private Props sysProps;
 	private Props props;
 	private Props outputProps;
 	private ExecutableNode node;
@@ -62,17 +60,16 @@ public class JobRunner extends EventHandler implements Runnable {
 	private static final Object logCreatorLock = new Object();
 	private Object syncObject = new Object();
 	
-	private final JobtypeManager jobtypeManager;
-	private final HadoopSecurityManager hadoopSecurityManager;
+	private final JobTypeManager jobtypeManager;
 
-	public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobtypeManager jobtypeManager, HadoopSecurityManager hadoopSecurityManager) {
+	public JobRunner(ExecutableNode node, Props sysProps, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
+		this.sysProps = sysProps;
 		this.props = props;
 		this.node = node;
 		this.workingDir = workingDir;
 		this.executionId = node.getExecutionId();
 		this.loader = loader;
 		this.jobtypeManager = jobtypeManager;
-		this.hadoopSecurityManager = hadoopSecurityManager;
 	}
 	
 	public ExecutableNode getNode() {
@@ -186,7 +183,7 @@ public class JobRunner extends EventHandler implements Runnable {
 		this.fireEventListeners(event);
 	}
 	
-	private boolean prepareJob() {
+	private boolean prepareJob() throws RuntimeException{
 		// Check pre conditions
 		if (props == null) {
 			node.setStatus(Status.FAILED);
@@ -201,43 +198,15 @@ public class JobRunner extends EventHandler implements Runnable {
 
 			logInfo("Starting job " + node.getJobId() + " at " + node.getStartTime());
 			node.setStatus(Status.RUNNING);
-	
+			props.put(AbstractProcessJob.JOB_FULLPATH, props.getSource());
 			props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
 			//job = JobWrappingFactory.getJobWrappingFactory().buildJobExecutor(node.getJobId(), props, logger);
-			job = jobtypeManager.buildJobExecutor(node.getJobId(), props, logger);
-			if(props.containsKey(HadoopSecurityManager.TO_PROXY)) {
-				try{
-					getHadoopTokens(props);
-				}
-				catch (Exception e) {
-					logger.error("Failed to get Hadoop tokens. " + e);
-				}
-			}
+			job = jobtypeManager.buildJobExecutor(node.getJobId(), sysProps, props, logger);
 		}
 		
 		return true;
 	}
 
-	protected void getHadoopTokens(Props props) {
-
-		File tokenFile = null;
-		try {
-			tokenFile = File.createTempFile("mr-azkaban", ".token");
-		} catch (IOException e) {
-			// TODO Auto-generated catch block
-			e.printStackTrace();
-		}
-		
-		try {
-			hadoopSecurityManager.prefetchToken(tokenFile, props.getString(HadoopSecurityManager.TO_PROXY));
-		} catch (SecurityManagerException e) {
-			e.printStackTrace();
-		}
-		
-		props.put("env."+UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION, tokenFile.getAbsolutePath());
-		
-	}
-	
 	private void runJob() {
 		try {
 			job.run();
@@ -298,20 +267,4 @@ public class JobRunner extends EventHandler implements Runnable {
 		return logFile;
 	}
 
-	public void cleanup() {
-		// clean up the tokens.
-		try{
-			if(props.containsKey("env."+UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION)) {
-				String tokenFile = props.getString("env."+UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
-				if(tokenFile != null) {
-					File f = new File(tokenFile);
-					f.delete();
-				}
-			}
-		}
-		catch(Exception e) {
-			logger.error("Failed to clean up hadoop token file.");
-		}
-		
-	}
 }
diff --git a/src/java/azkaban/executor/ExecutorMailer.java b/src/java/azkaban/executor/ExecutorMailer.java
index 560df54..75b2723 100644
--- a/src/java/azkaban/executor/ExecutorMailer.java
+++ b/src/java/azkaban/executor/ExecutorMailer.java
@@ -120,7 +120,7 @@ public class ExecutorMailer {
 				message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
 			}
 			for (String reasons: extraReasons) {
-				message.println("<li>" + extraReasons + "</li>");
+				message.println("<li>" + reasons + "</li>");
 			}
 			
 			message.println("</ul>");
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index e0a9604..d278d59 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -17,7 +17,6 @@
 package azkaban.executor;
 
 import java.io.IOException;
-import java.io.Writer;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
diff --git a/src/java/azkaban/jobExecutor/AbstractProcessJob.java b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
index 7fa242a..f6a49c1 100644
--- a/src/java/azkaban/jobExecutor/AbstractProcessJob.java
+++ b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
@@ -48,24 +48,35 @@ public abstract class AbstractProcessJob extends AbstractJob {
 
 	protected final String _jobPath;
 
-	protected volatile Props _props;
+	protected volatile Props jobProps;
+	protected volatile Props sysProps;
 
 	protected String _cwd;
 
 	private volatile Props generatedPropeties;
 
-	protected AbstractProcessJob(String jobid, final Props props, final Logger log) {
+	protected AbstractProcessJob(String jobid, final Props sysProps, final Props jobProps, final Logger log) {
 		super(jobid, log);
 
-		_props = props;
-		_jobPath = props.getString(JOB_FULLPATH, props.getSource());
+		this.jobProps = jobProps;
+		this.sysProps = sysProps;
+		_jobPath = jobProps.getString(JOB_FULLPATH, jobProps.getSource());
 
 		_cwd = getWorkingDirectory();
 		this.log = log;
 	}
 
+	@Deprecated
 	public Props getProps() {
-		return _props;
+		return jobProps;
+	}
+	
+	public Props getJobProps()	{
+		return jobProps;
+	}
+	
+	public Props getSysProps()	{
+		return sysProps;
 	}
 
 	public String getJobPath() {
@@ -73,7 +84,7 @@ public abstract class AbstractProcessJob extends AbstractJob {
 	}
 
 	protected void resolveProps() {
-		_props = PropsUtils.resolveProps(_props);
+		jobProps = PropsUtils.resolveProps(jobProps);
 	}
 
 	@Override
@@ -91,11 +102,11 @@ public abstract class AbstractProcessJob extends AbstractJob {
 		File[] files = new File[2];
 		files[0] = createFlattenedPropsFile(_cwd);
 
-		_props.put(ENV_PREFIX + JOB_PROP_ENV, files[0].getAbsolutePath());
-		_props.put(ENV_PREFIX + JOB_NAME_ENV, getId());
+		jobProps.put(ENV_PREFIX + JOB_PROP_ENV, files[0].getAbsolutePath());
+		jobProps.put(ENV_PREFIX + JOB_NAME_ENV, getId());
 
 		files[1] = createOutputPropsFile(getId(), _cwd);
-		_props.put(ENV_PREFIX + JOB_OUTPUT_PROP_FILE,
+		jobProps.put(ENV_PREFIX + JOB_OUTPUT_PROP_FILE,
 				files[1].getAbsolutePath());
 
 		return files;
@@ -158,7 +169,7 @@ public abstract class AbstractProcessJob extends AbstractJob {
 		File tempFile = null;
 		try {
 			tempFile = File.createTempFile(getId() + "_", "_tmp", directory);
-			_props.storeFlattened(tempFile);
+			jobProps.storeFlattened(tempFile);
 		} catch (IOException e) {
 			throw new RuntimeException("Failed to create temp property file ", e);
 		}
diff --git a/src/java/azkaban/jobExecutor/JavaProcessJob.java b/src/java/azkaban/jobExecutor/JavaProcessJob.java
index 1b3d115..0009e32 100644
--- a/src/java/azkaban/jobExecutor/JavaProcessJob.java
+++ b/src/java/azkaban/jobExecutor/JavaProcessJob.java
@@ -40,8 +40,8 @@ public class JavaProcessJob extends ProcessJob {
 
 	public static String JAVA_COMMAND = "java";
 
-	public JavaProcessJob(String jobid, Props prop, Logger logger) {
-		super(jobid, prop, logger);
+	public JavaProcessJob(String jobid, Props sysProps, Props jobProps, Logger logger) {
+		super(jobid, sysProps, jobProps, logger);
 	}
 
 	@Override
diff --git a/src/java/azkaban/jobExecutor/LongArgJob.java b/src/java/azkaban/jobExecutor/LongArgJob.java
index 425ef57..4d28f08 100644
--- a/src/java/azkaban/jobExecutor/LongArgJob.java
+++ b/src/java/azkaban/jobExecutor/LongArgJob.java
@@ -39,13 +39,13 @@ public abstract class LongArgJob extends AbstractProcessJob {
     private final AzkabanProcessBuilder builder;
     private volatile AzkabanProcess process;
 
-    public LongArgJob(String jobid, String[] command, Props prop, Logger log) {
-        this(jobid, command, prop, log, new HashSet<String>(0));
+    public LongArgJob(String jobid, String[] command, Props sysProps, Props jobProps, Logger log) {
+        this(jobid, command, sysProps, jobProps, log, new HashSet<String>(0));
     }
     
-    public LongArgJob(String jobid, String[] command, Props prop, Logger log, Set<String> suppressedKeys) {
+    public LongArgJob(String jobid, String[] command, Props sysProps, Props jobProp, Logger log, Set<String> suppressedKeys) {
         //super(command, desc);
-         super(jobid, prop, log);
+         super(jobid, sysProps, jobProp, log);
         //String cwd = descriptor.getProps().getString(WORKING_DIR, new File(descriptor.getFullPath()).getParent());
        
         this.builder = new AzkabanProcessBuilder(command).
diff --git a/src/java/azkaban/jobExecutor/ProcessJob.java b/src/java/azkaban/jobExecutor/ProcessJob.java
index 8d42a4d..50233c0 100644
--- a/src/java/azkaban/jobExecutor/ProcessJob.java
+++ b/src/java/azkaban/jobExecutor/ProcessJob.java
@@ -39,8 +39,8 @@ public class ProcessJob extends AbstractProcessJob {
 	private static final long KILL_TIME_MS = 5000;
 	private volatile AzkabanProcess process;
 
-	public ProcessJob(final String jobId, final Props props, final Logger log) {
-		super(jobId, props, log);
+	public ProcessJob(final String jobId, final Props sysProps, final Props jobProps, final Logger log) {
+		super(jobId, sysProps, jobProps, log);
 	}
 
 	@Override
@@ -61,12 +61,12 @@ public class ProcessJob extends AbstractProcessJob {
 		Map<String, String> envVars = getEnvironmentVariables();
 
 		for (String command : commands) {
+			info("Command: " + command);
 			AzkabanProcessBuilder builder = new AzkabanProcessBuilder(partitionCommandLine(command))
 					.setEnv(envVars)
 					.setWorkingDir(getCwd())
 					.setLogger(getLog());
-
-			info("Command: " + builder.getCommandString());
+			
 			if (builder.getEnv().size() > 0) {
 				info("Environment variables: " + builder.getEnv());
 			}
@@ -97,9 +97,9 @@ public class ProcessJob extends AbstractProcessJob {
 
 	protected List<String> getCommandList() {
 		List<String> commands = new ArrayList<String>();
-		commands.add(_props.getString(COMMAND));
-		for (int i = 1; _props.containsKey(COMMAND + "." + i); i++) {
-			commands.add(_props.getString(COMMAND + "." + i));
+		commands.add(jobProps.getString(COMMAND));
+		for (int i = 1; jobProps.containsKey(COMMAND + "." + i); i++) {
+			commands.add(jobProps.getString(COMMAND + "." + i));
 		}
 
 		return commands;
@@ -127,7 +127,7 @@ public class ProcessJob extends AbstractProcessJob {
 
 	@Override
 	public Props getProps() {
-		return _props;
+		return jobProps;
 	}
 
 	public String getPath() {
diff --git a/src/java/azkaban/jobExecutor/PythonJob.java b/src/java/azkaban/jobExecutor/PythonJob.java
index 6a66965..8989ff5 100644
--- a/src/java/azkaban/jobExecutor/PythonJob.java
+++ b/src/java/azkaban/jobExecutor/PythonJob.java
@@ -26,10 +26,11 @@ public class PythonJob extends LongArgJob {
 	private static final String PYTHON_BINARY_KEY = "python";
 	private static final String SCRIPT_KEY = "script";
 
-	public PythonJob(String jobid, Props props, Logger log) {
+	public PythonJob(String jobid, Props sysProps, Props jobProps, Logger log) {
 		super(jobid, 
-				new String[] { props.getString(PYTHON_BINARY_KEY, "python"),props.getString(SCRIPT_KEY) }, 
-				props, 
+				new String[] { jobProps.getString(PYTHON_BINARY_KEY, "python"),jobProps.getString(SCRIPT_KEY) }, 
+				sysProps, 
+				jobProps, 
 				log, 
 				ImmutableSet.of(PYTHON_BINARY_KEY, SCRIPT_KEY, JOB_TYPE));
 	}
diff --git a/src/java/azkaban/jobExecutor/RubyJob.java b/src/java/azkaban/jobExecutor/RubyJob.java
index 497d203..c913e97 100644
--- a/src/java/azkaban/jobExecutor/RubyJob.java
+++ b/src/java/azkaban/jobExecutor/RubyJob.java
@@ -26,10 +26,11 @@ public class RubyJob extends LongArgJob {
 	private static final String RUBY_BINARY_KEY = "ruby";
 	private static final String SCRIPT_KEY = "script";
 
-	public RubyJob(String jobid, Props props, Logger log) {
+	public RubyJob(String jobid, Props sysProps, Props jobProps, Logger log) {
 		super(jobid, 
-				new String[] { props.getString(RUBY_BINARY_KEY, "ruby"), props.getString(SCRIPT_KEY) }, 
-				props, 
+				new String[] { jobProps.getString(RUBY_BINARY_KEY, "ruby"), jobProps.getString(SCRIPT_KEY) }, 
+				sysProps,
+				jobProps,
 				log, 
 				ImmutableSet.of(RUBY_BINARY_KEY, SCRIPT_KEY, JOB_TYPE));
 	}
diff --git a/src/java/azkaban/jobExecutor/ScriptJob.java b/src/java/azkaban/jobExecutor/ScriptJob.java
index 5ed0e04..52623e7 100644
--- a/src/java/azkaban/jobExecutor/ScriptJob.java
+++ b/src/java/azkaban/jobExecutor/ScriptJob.java
@@ -32,10 +32,11 @@ public class ScriptJob extends LongArgJob {
 	private static final String DEFAULT_EXECUTABLE_KEY = "executable";
 	private static final String SCRIPT_KEY = "script";
 
-	public ScriptJob(String jobid, Props props, Logger log) {
+	public ScriptJob(String jobid, Props sysProps, Props jobProps, Logger log) {
 		super(jobid, 
-				new String[] { props.getString(DEFAULT_EXECUTABLE_KEY),props.getString(SCRIPT_KEY) }, 
-				props, 
+				new String[] { jobProps.getString(DEFAULT_EXECUTABLE_KEY), jobProps.getString(SCRIPT_KEY) }, 
+				sysProps,
+				jobProps,
 				log, 
 				ImmutableSet.of(DEFAULT_EXECUTABLE_KEY, SCRIPT_KEY, JOB_TYPE));
 	}
diff --git a/src/java/azkaban/jobtype/JobTypeManager.java b/src/java/azkaban/jobtype/JobTypeManager.java
new file mode 100644
index 0000000..845f3e9
--- /dev/null
+++ b/src/java/azkaban/jobtype/JobTypeManager.java
@@ -0,0 +1,290 @@
+package azkaban.jobtype;
+
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+import azkaban.jobExecutor.JavaProcessJob;
+import azkaban.jobExecutor.Job;
+import azkaban.jobExecutor.NoopJob;
+import azkaban.jobExecutor.ProcessJob;
+import azkaban.jobExecutor.PythonJob;
+import azkaban.jobExecutor.RubyJob;
+import azkaban.jobExecutor.ScriptJob;
+import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
+import azkaban.utils.Utils;
+import azkaban.jobExecutor.utils.InitErrorJob;
+import azkaban.jobExecutor.utils.JobExecutionException;
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+public class JobTypeManager 
+{
+	
+	private final String jobtypePluginDir;	// the dir for jobtype plugins
+	private final ClassLoader parentLoader; 
+	
+	private static final String jobtypeConfFile = "plugin.properties"; // need jars.to.include property, will be loaded with user property
+	private static final String jobtypeSysConfFile = "private.properties"; // not exposed to users
+	private static final String commonConfFile = "types.properties";	// common properties for multiple plugins
+	private static final String commonSysConfFile = "typesprivate.properties"; // common private properties for multiple plugins
+	private static final Logger logger = Logger.getLogger(JobTypeManager.class);
+	
+	private Map<String, Class<? extends Job>> jobToClass;
+	private Map<String, Props> jobtypeJobProps;
+	private Map<String, Props> jobtypeSysProps;
+
+	public JobTypeManager(String jobtypePluginDir, ClassLoader parentClassLoader)
+	{
+		this.jobtypePluginDir = jobtypePluginDir;
+		this.parentLoader = parentClassLoader;
+		
+		jobToClass = new HashMap<String, Class<? extends Job>>();
+		jobtypeJobProps = new HashMap<String, Props>();
+		jobtypeSysProps = new HashMap<String, Props>();
+		
+		loadDefaultTypes();
+		
+		if(jobtypePluginDir != null) {
+			logger.info("job type plugin directory set. Loading extra job types.");
+			loadPluginJobTypes();
+		}
+		
+	}
+
+	private void loadDefaultTypes() throws JobTypeManagerException{
+//		jobToClass.put("java", JavaJob.class);
+		jobToClass.put("command", ProcessJob.class);
+		jobToClass.put("javaprocess", JavaProcessJob.class);
+//		jobToClass.put("pig", PigProcessJob.class);
+		jobToClass.put("propertyPusher", NoopJob.class);
+		jobToClass.put("python", PythonJob.class);
+		jobToClass.put("ruby", RubyJob.class);
+		jobToClass.put("script", ScriptJob.class);	
+//		jobToClass.put("myjobtype", MyTestJobType.class);
+		
+	}
+
+	// load Job Typs from dir
+	private void loadPluginJobTypes() throws JobTypeManagerException
+	{
+		if(jobtypePluginDir == null || parentLoader == null) throw new JobTypeManagerException("JobTypeDir not set! JobTypeManager not properly initiated!");
+		
+		File jobPluginsDir = new File(jobtypePluginDir);
+		if(!jobPluginsDir.isDirectory()) throw new JobTypeManagerException("Job type plugin dir " + jobtypePluginDir + " is not a directory!");
+		if(!jobPluginsDir.canRead()) throw new JobTypeManagerException("Job type plugin dir " + jobtypePluginDir + " is not readable!");
+		
+		// look for global conf
+		Props globalConf = null;
+		Props globalSysConf = null;
+		File confFile = findFilefromDir(jobPluginsDir, commonConfFile);
+		File sysConfFile = findFilefromDir(jobPluginsDir, commonSysConfFile);
+		try {
+			if(confFile != null) {
+				globalConf = new Props(null, confFile);
+			}
+			if(sysConfFile != null) {
+				globalSysConf = new Props(null, sysConfFile);
+			}
+		}
+		catch (Exception e) {
+			throw new JobTypeManagerException("Failed to get global jobtype properties" + e.getCause());
+		}
+		
+		for(File dir : jobPluginsDir.listFiles()) {
+			if(dir.isDirectory() && dir.canRead()) {
+				// get its conf file
+				try {
+					loadJobType(dir, globalConf, globalSysConf);
+				}
+				catch (Exception e) {
+					throw new JobTypeManagerException(e);
+				}
+			}
+		}
+
+	}
+	
+	public static File findFilefromDir(File dir, String fn){
+		if(dir.isDirectory()) {
+			for(File f : dir.listFiles()) {
+				if(f.getName().equals(fn)) {
+					return f;
+				}
+			}
+		}
+		return null;
+	}
+	
+	public void loadJobType(File dir, Props globalConf, Props globalSysConf) throws JobTypeManagerException{
+		
+		// look for common conf
+		Props conf = null;
+		Props sysConf = null;
+		File confFile = findFilefromDir(dir, commonConfFile);
+		File sysConfFile = findFilefromDir(dir, commonSysConfFile);
+		
+		try {
+			if(confFile != null) {
+				conf = new Props(globalConf, confFile);
+			}
+			else {
+				conf = globalConf;
+			}
+			if(sysConfFile != null) {
+				sysConf = new Props(globalSysConf, sysConfFile);
+			}
+			else {
+				sysConf = globalSysConf;
+			}
+		}
+		catch (Exception e) {
+			throw new JobTypeManagerException("Failed to get common jobtype properties" + e.getCause());
+		}
+		
+		// look for jobtypeConf.properties and load it 		
+		for(File f: dir.listFiles()) {
+			if(f.isFile() && f.getName().equals(jobtypeSysConfFile)) {
+				loadJob(dir, f, conf, sysConf);
+				return;
+			}
+		}
+
+		// no hit, keep looking
+		for(File f : dir.listFiles()) {
+			if(f.isDirectory() && f.canRead())
+				loadJobType(f, conf, sysConf);
+		}
+		
+	}
+	
+	@SuppressWarnings("unchecked")
+	public void loadJob(File dir, File jobConfFile, Props commonConf, Props commonSysConf) throws JobTypeManagerException{
+		Props conf = null;
+		Props sysConf = null;
+		File confFile = findFilefromDir(dir, jobtypeConfFile);
+		File sysConfFile = findFilefromDir(dir, jobtypeSysConfFile);
+		
+		try {
+			if(confFile != null) {
+				conf = new Props(commonConf, confFile);
+				conf = PropsUtils.resolveProps(conf);
+			}
+			if(sysConfFile != null) {
+				sysConf = new Props(commonSysConf, sysConfFile);
+				sysConf = PropsUtils.resolveProps(sysConf);
+			}
+		}
+		catch (Exception e) {
+			throw new JobTypeManagerException("Failed to get jobtype properties", e);
+		}
+		
+		// use directory name as job type name
+		String jobtypeName = dir.getName();
+		
+		String jobtypeClass = sysConf.get("jobtype.class");
+		
+		logger.info("Loading jobtype " + jobtypeName );
+
+		// sysconf says what jars/confs to load
+		List<String> jobtypeClasspath = sysConf.getStringList("jobtype.classpath", null, ",");
+		List<URL> resources = new ArrayList<URL>();		
+		for(String s : jobtypeClasspath) {
+			try {
+				File path = new File(s);
+				if(path.isDirectory()) {
+					for(File f : path.listFiles()) {
+						resources.add(f.toURI().toURL());
+						logger.info("adding to classpath " + f);
+					}
+				}
+				resources.add(path.toURI().toURL());
+				logger.info("adding to classpath " + path);
+			} catch (MalformedURLException e) {
+				// TODO Auto-generated catch block
+				throw new JobTypeManagerException(e);
+			}
+		}
+		
+		// each job type can have a different class loader
+		ClassLoader jobTypeLoader = new URLClassLoader(resources.toArray(new URL[resources.size()]), parentLoader);
+		
+		Class<? extends Job> clazz = null;
+		try {
+			clazz = (Class<? extends Job>)jobTypeLoader.loadClass(jobtypeClass);
+			jobToClass.put(jobtypeName, clazz);
+		}
+		catch (ClassNotFoundException e) {
+			throw new JobTypeManagerException(e);
+		}
+		logger.info("Loaded jobtype " + jobtypeName + " " + jobtypeClass);
+		
+		if(conf != null) jobtypeJobProps.put(jobtypeName, conf);
+		jobtypeSysProps.put(jobtypeName, sysConf);
+		
+	}
+	
+	public Job buildJobExecutor(String jobId, Props sysProps, Props jobProps, Logger logger) throws JobTypeManagerException
+	{
+
+		Job job;
+		try {
+			String jobType = jobProps.getString("type");
+			if (jobType == null || jobType.length() == 0) {
+				/*throw an exception when job name is null or empty*/
+				throw new JobExecutionException (
+						String.format("The 'type' parameter for job[%s] is null or empty", jobProps, logger));
+			}
+			
+			logger.info("Building " + jobType + " job executor. ");		
+			Class<? extends Object> executorClass = jobToClass.get(jobType);
+
+			if (executorClass == null) {
+				throw new JobExecutionException(
+						String.format("Could not construct job[%s] of type[%s].", jobProps, jobType));
+			}
+			
+			Props sysConf = jobtypeSysProps.containsKey(jobType) ? new Props(sysProps, jobtypeSysProps.get(jobType)) : sysProps;
+			Props jobConf = jobtypeJobProps.containsKey(jobType) ? new Props(jobProps, jobtypeJobProps.get(jobType)) : jobProps;
+			sysConf = PropsUtils.resolveProps(sysConf);
+			jobConf = PropsUtils.resolveProps(jobConf);
+			
+//			logger.info("sysConf is " + sysConf);
+//			logger.info("jobConf is " + jobConf);
+			
+			job = (Job)Utils.callConstructor(executorClass, jobId, sysConf, jobConf, logger);
+
+		}
+		catch (Exception e) {
+			job = new InitErrorJob(jobId, e);
+			//throw new JobTypeManagerException(e);
+		}
+
+		return job;
+	}
+
+	public void registerJobType(String typeName, Class<? extends Job> jobTypeClass) {
+		jobToClass.put(typeName, jobTypeClass);
+	}
+}
+
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 7ba04f8..ef0f076 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -223,10 +223,15 @@ public class ScheduleManager {
 	 * @param flow
 	 */
 	public synchronized void insertSchedule(Schedule s) {
+		boolean exist = scheduleIDMap.containsKey(s.getScheduleId());
 		if(s.updateTime()) {
 			internalSchedule(s);
 			try {
-				loader.insertSchedule(s);
+				if(!exist) {
+					loader.insertSchedule(s);
+				}
+				else
+					loader.updateSchedule(s);
 			} catch (ScheduleManagerException e) {
 				// TODO Auto-generated catch block
 				e.printStackTrace();
diff --git a/src/java/azkaban/utils/FileIOUtils.java b/src/java/azkaban/utils/FileIOUtils.java
index 7bda63b..d09b3a0 100644
--- a/src/java/azkaban/utils/FileIOUtils.java
+++ b/src/java/azkaban/utils/FileIOUtils.java
@@ -12,6 +12,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.StringTokenizer;
 
 import org.apache.commons.io.IOUtils;
 
@@ -19,6 +20,23 @@ import org.apache.commons.io.IOUtils;
  * Runs a few unix commands. Created this so that I can move to JNI in the future.
  */
 public class FileIOUtils {
+	public static String getSourcePathFromClass(Class<?> containedClass) {
+		File file = new File(containedClass.getProtectionDomain().getCodeSource().getLocation().getPath());
+
+		if (!file.isDirectory() && file.getName().endsWith(".class")) {
+			String name = containedClass.getName();
+			StringTokenizer tokenizer = new StringTokenizer(name, ".");
+			while(tokenizer.hasMoreTokens()) {
+				tokenizer.nextElement();
+				file = file.getParentFile();
+			}
+			return file.getPath();  
+		}
+		else {
+			return containedClass.getProtectionDomain().getCodeSource().getLocation().getPath();
+		}
+	}
+	
 	/**
 	 * Run a unix command that will symlink files, and recurse into directories.
 	 */
diff --git a/src/java/azkaban/utils/GZIPUtils.java b/src/java/azkaban/utils/GZIPUtils.java
index 147cb55..b402067 100644
--- a/src/java/azkaban/utils/GZIPUtils.java
+++ b/src/java/azkaban/utils/GZIPUtils.java
@@ -6,7 +6,7 @@ import java.io.IOException;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
-import org.apache.hadoop.io.IOUtils;
+import org.apache.commons.io.IOUtils;
 
 public class GZIPUtils {
 	
@@ -36,7 +36,7 @@ public class GZIPUtils {
 		GZIPInputStream gzipInputStream = new GZIPInputStream(byteInputStream);
 		
 		ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
-		IOUtils.copyBytes(gzipInputStream, byteOutputStream, 1024);
+		IOUtils.copy(gzipInputStream, byteOutputStream);
 
 		return byteOutputStream.toByteArray();
 	}
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index fbe9878..01261a8 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -20,17 +20,22 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Properties;
 import java.util.TimeZone;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.velocity.app.VelocityEngine;
 import org.apache.velocity.runtime.log.Log4JLogChute;
 import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
+import org.apache.velocity.runtime.resource.loader.JarResourceLoader;
 import org.joda.time.DateTimeZone;
 import org.mortbay.jetty.Server;
 import org.mortbay.jetty.security.SslSocketConnector;
@@ -46,20 +51,22 @@ import azkaban.project.ProjectManager;
 
 import azkaban.scheduler.JdbcScheduleLoader;
 import azkaban.scheduler.ScheduleManager;
-import azkaban.security.DefaultHadoopSecurityManager;
-import azkaban.security.HadoopSecurityManager;
 import azkaban.user.UserManager;
 import azkaban.user.XmlUserManager;
+import azkaban.utils.FileIOUtils;
 import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
 import azkaban.utils.Utils;
 import azkaban.webapp.servlet.AzkabanServletContextListener;
 
+import azkaban.webapp.servlet.AbstractAzkabanServlet;
 import azkaban.webapp.servlet.ExecutorServlet;
-import azkaban.webapp.servlet.HdfsBrowserServlet;
+import azkaban.webapp.servlet.LoginAbstractAzkabanServlet;
 import azkaban.webapp.servlet.ScheduleServlet;
 import azkaban.webapp.servlet.HistoryServlet;
 import azkaban.webapp.servlet.IndexServlet;
 import azkaban.webapp.servlet.ProjectManagerServlet;
+import azkaban.webapp.servlet.ViewerPlugin;
 import azkaban.webapp.session.SessionCache;
 
 import joptsimple.OptionParser;
@@ -104,7 +111,6 @@ public class AzkabanWebServer implements AzkabanServer {
 	private static final int DEFAULT_THREAD_NUMBER = 20;
 	private static final String VELOCITY_DEV_MODE_PARAM = "velocity.dev.mode";
 	private static final String USER_MANAGER_CLASS_PARAM = "user.manager.class";
-	private static final String HADOOP_SECURITY_MANAGER_CLASS_PARAM = "hadoop.security.manager.class";
 	private static final String DEFAULT_STATIC_DIR = "";
 
 	private final VelocityEngine velocityEngine;
@@ -115,11 +121,11 @@ public class AzkabanWebServer implements AzkabanServer {
 	private ScheduleManager scheduleManager;
 
 	private final ClassLoader baseClassLoader;
-	private HadoopSecurityManager hadoopSecurityManager;
 	
 	private Props props;
 	private SessionCache sessionCache;
 	private File tempDir;
+	private List<ViewerPlugin> viewerPlugins;
 
 	/**
 	 * Constructor usually called by tomcat AzkabanServletContext to create the
@@ -141,7 +147,6 @@ public class AzkabanWebServer implements AzkabanServer {
 		executorManager = loadExecutorManager(props);
 		scheduleManager = loadScheduleManager(executorManager, props);
 		baseClassLoader = getBaseClassloader();
-		hadoopSecurityManager = loadHadoopSecurityManager(props);
 		
 		tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
 
@@ -155,6 +160,10 @@ public class AzkabanWebServer implements AzkabanServer {
 		}
 	}
 	
+	private void setViewerPlugins(List<ViewerPlugin> viewerPlugins) {
+		this.viewerPlugins = viewerPlugins;
+	}
+	
 	private UserManager loadUserManager(Props props) {
 		Class<?> userManagerClass = props.getClass(USER_MANAGER_CLASS_PARAM, null);
 		logger.info("Loading user manager class " + userManagerClass.getName());
@@ -253,10 +262,12 @@ public class AzkabanWebServer implements AzkabanServer {
 	 */
 	private VelocityEngine configureVelocityEngine(final boolean devMode) {
 		VelocityEngine engine = new VelocityEngine();
-		engine.setProperty("resource.loader", "classpath");
+		engine.setProperty("resource.loader", "classpath, jar");
 		engine.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName());
 		engine.setProperty("classpath.resource.loader.cache", !devMode);
 		engine.setProperty("classpath.resource.loader.modificationCheckInterval", 5L);
+		engine.setProperty("jar.resource.loader.class", JarResourceLoader.class.getName());
+		engine.setProperty("jar.resource.loader.cache", !devMode);
 		engine.setProperty("resource.manager.logwhenfound", false);
 		engine.setProperty("input.encoding", "UTF-8");
 		engine.setProperty("output.encoding", "UTF-8");
@@ -297,35 +308,6 @@ public class AzkabanWebServer implements AzkabanServer {
 		return retVal;
 	}
 
-	public HadoopSecurityManager getHadoopSecurityManager() {
-		return hadoopSecurityManager;
-	}
-	
-	private HadoopSecurityManager loadHadoopSecurityManager(Props props) {
-		
-		Class<?> hadoopSecurityManagerClass = props.getClass(HADOOP_SECURITY_MANAGER_CLASS_PARAM, null);
-		logger.info("Loading hadoop security manager class " + hadoopSecurityManagerClass.getName());
-		HadoopSecurityManager hadoopSecurityManager = null;
-
-		if (hadoopSecurityManagerClass != null && hadoopSecurityManagerClass.getConstructors().length > 0) {
-
-			try {
-				Constructor<?> hsmConstructor = hadoopSecurityManagerClass.getConstructor(Props.class);
-				hadoopSecurityManager = (HadoopSecurityManager) hsmConstructor.newInstance(props);
-			} 
-			catch (Exception e) {
-				logger.error("Could not instantiate Hadoop Security Manager "+ hadoopSecurityManagerClass.getName());
-				throw new RuntimeException(e);
-			}
-		} 
-		else {
-			hadoopSecurityManager = new DefaultHadoopSecurityManager();
-		}
-
-		return hadoopSecurityManager;
-
-	}
-	
 	public ClassLoader getClassLoader() {
 		return baseClassLoader;
 	}
@@ -426,10 +408,16 @@ public class AzkabanWebServer implements AzkabanServer {
 		root.addServlet(new ServletHolder(new ExecutorServlet()),"/executor");
 		root.addServlet(new ServletHolder(new HistoryServlet()), "/history");
 		root.addServlet(new ServletHolder(new ScheduleServlet()),"/schedule");
-		root.addServlet(new ServletHolder(new HdfsBrowserServlet()), "/hdfs/*");
+		
+		String viewerPluginDir = azkabanSettings.getString("viewer.plugin.dir", "plugins/viewer");
+		List<String> viewerPlugins = azkabanSettings.getStringList("viewer.plugins", (List<String>) null);
+		if (viewerPlugins != null) {
+			app.setViewerPlugins(loadViewerPlugins(root, viewerPluginDir, viewerPlugins, app.getVelocityEngine()));
+		}
+		//root.addServlet(new ServletHolder(new HdfsBrowserServlet()), "/hdfs/*");
 		
 		root.setAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY, app);
-
+		
 		try {
 			server.start();
 		} 
@@ -455,6 +443,115 @@ public class AzkabanWebServer implements AzkabanServer {
 		logger.info("Server running on port " + sslPortNumber + ".");
 	}
 
+	private static List<ViewerPlugin> loadViewerPlugins(Context root, String pluginPath, List<String> plugins, VelocityEngine ve) {
+		ArrayList<ViewerPlugin> installedViewerPlugins = new ArrayList<ViewerPlugin>();
+		
+		File viewerPluginPath = new File(pluginPath);
+		ClassLoader parentLoader = AzkabanWebServer.class.getClassLoader();
+		ArrayList<String> jarPaths = new ArrayList<String>();
+		for (String plug: plugins) {
+			File pluginDir = new File(viewerPluginPath, plug);
+			if (!pluginDir.exists()) {
+				logger.error("Error viewer plugin path " + pluginDir.getPath() + " doesn't exist.");
+				continue;
+			}
+			
+			if (!pluginDir.isDirectory()) {
+				logger.error("The plugin path " + pluginDir + " is not a directory.");
+				continue;
+			}
+			
+			// Load the conf directory
+			File propertiesDir = new File(pluginDir, "conf");
+			Props pluginProps = null;
+			if (propertiesDir.exists() && propertiesDir.isDirectory()) {
+				pluginProps = PropsUtils.loadPropsInDir(propertiesDir, "properties");
+			}
+			else {
+				logger.error("Plugin conf path " + propertiesDir + " not found.");
+				continue;
+			}
+			
+			String pluginName = pluginProps.getString("viewer.name");
+			String pluginWebPath = pluginProps.getString("viewer.path");
+			String pluginClass = pluginProps.getString("viewer.servlet.class");
+			if (pluginClass == null) {
+				logger.error("Viewer class is not set.");
+			}
+			else {
+				logger.info("Plugin class " + pluginClass);
+			}
+			
+			URLClassLoader urlClassLoader = null;
+			File libDir = new File(pluginDir, "lib");
+			if (libDir.exists() && libDir.isDirectory()) {
+				File[] files = libDir.listFiles();
+				
+				URL[] url = new URL[files.length];
+				for (int i=0; i < files.length; ++i) {
+					try {
+						url[i] = files[i].toURI().toURL();
+					} catch (MalformedURLException e) {
+						logger.error(e);
+					}
+				}
+				
+				urlClassLoader = new URLClassLoader(url, parentLoader);
+			}
+			else {
+				logger.error("Library path " + libDir + " not found.");
+				continue;
+			}
+			
+			Class<?> viewerClass = null;
+			try {
+				viewerClass = urlClassLoader.loadClass(pluginClass);
+			}
+			catch (ClassNotFoundException e) {
+				logger.error("Class " + pluginClass + " not found.");
+				continue;
+			}
+
+			String source = FileIOUtils.getSourcePathFromClass(viewerClass);
+			logger.info("Source jar " + source);
+			jarPaths.add("jar:file:" + source);
+			
+			Constructor<?> constructor = null;
+			try {
+				constructor = viewerClass.getConstructor(Props.class);
+			} catch (NoSuchMethodException e) {
+				logger.error("Constructor not found in " + pluginClass);
+				continue;
+			}
+			
+			Object obj = null;
+			try {
+				obj = constructor.newInstance(pluginProps);
+			} catch (Exception e) {
+				logger.error(e);
+			} 
+			
+			if (!(obj instanceof AbstractAzkabanServlet)) {
+				logger.error("The object is not an AbstractAzkabanServlet");
+				continue;
+			}
+			
+			AbstractAzkabanServlet avServlet = (AbstractAzkabanServlet)obj;
+			root.addServlet(new ServletHolder(avServlet), "/" + pluginWebPath + "/*");
+			installedViewerPlugins.add(new ViewerPlugin(pluginName, pluginWebPath));
+		}
+		
+		String jarResourcePath = StringUtils.join(jarPaths, ", ");
+		logger.info("Setting jar resource path " + jarResourcePath);
+		ve.addProperty("jar.resource.loader.path", jarResourcePath);
+		
+		return installedViewerPlugins;
+	}
+	
+	public List<ViewerPlugin> getViewerPlugins() {
+		return viewerPlugins;
+	}
+	
 	/**
 	 * Loads the Azkaban property file from the AZKABAN_HOME conf directory
 	 * 
diff --git a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
index e1fd52c..9817b56 100644
--- a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
@@ -17,6 +17,7 @@
 package azkaban.webapp.servlet;
 
 import java.io.IOException;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.HashMap;
@@ -30,6 +31,8 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.velocity.Template;
+import org.apache.velocity.app.VelocityEngine;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.joda.time.DateTime;
 import org.joda.time.format.DateTimeFormat;
@@ -64,6 +67,8 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 	private String label;
 	private String color;
 
+	private List<ViewerPlugin> viewerPlugins;
+	
 	/**
 	 * To retrieve the application for the servlet
 	 * 
@@ -86,6 +91,11 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 		name = props.getString("azkaban.name", "");
 		label = props.getString("azkaban.label", "");
 		color = props.getString("azkaban.color", "#FF0000");
+		
+		if (application instanceof AzkabanWebServer) {
+			AzkabanWebServer server = (AzkabanWebServer)application;
+			viewerPlugins = server.getViewerPlugins();
+		}
 	}
 
 	/**
@@ -329,6 +339,14 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 		page.add("success_message", successMsg == null || successMsg.isEmpty() ? "null" : successMsg);
 		setSuccessMessageInCookie(resp, null);
 
+		//@TODO, allow more than one type of viewer. For time sake, I only install the first one
+		if (viewerPlugins != null && !viewerPlugins.isEmpty()) {
+			page.add("viewers", viewerPlugins);
+			ViewerPlugin plugin = viewerPlugins.get(0);
+			page.add("viewerName", plugin.getPluginName());
+			page.add("viewerPath", plugin.getPluginPath());
+		}
+		
 		return page;
 	}
 
@@ -348,9 +366,18 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 		page.add("timezone", ZONE_FORMATTER.print(System.currentTimeMillis()));
 		page.add("currentTime", (new DateTime()).getMillis());
 		page.add("context", req.getContextPath());
+		
+		//@TODO, allow more than one type of viewer. For time sake, I only install the first one
+		if (viewerPlugins != null && !viewerPlugins.isEmpty()) {
+			page.add("viewers", viewerPlugins);
+			ViewerPlugin plugin = viewerPlugins.get(0);
+			page.add("viewerName", plugin.getPluginName());
+			page.add("viewerPath", plugin.getPluginPath());
+		}
+		
 		return page;
 	}
-
+	
 	/**
 	 * Writes json out to the stream.
 	 * 
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 8fb522f..9932e12 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -661,11 +661,11 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		
 		for(String roleName: user.getRoles()) {
 			Role role = userManager.getRole(roleName);
-			if (role.getPermission().isPermissionSet(type)) {
+			if (role.getPermission().isPermissionSet(type) || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
 				return true;
 			}
 		}
 		
-		return true;
+		return false;
 	}
 }
diff --git a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index 5455044..c66aff3 100644
--- a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -22,16 +22,20 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
+import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
 import javax.servlet.http.Cookie;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.commons.fileupload.servlet.ServletFileUpload;
 import org.apache.log4j.Logger;
 
 import azkaban.user.User;
 import azkaban.user.UserManager;
 import azkaban.user.UserManagerException;
+import azkaban.utils.Props;
+import azkaban.webapp.AzkabanServer;
 import azkaban.webapp.session.Session;
 
 /**
@@ -44,7 +48,17 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
 
 	private static final Logger logger = Logger.getLogger(LoginAbstractAzkabanServlet.class.getName());
 	private static final String SESSION_ID_NAME = "azkaban.browser.session.id";
-
+	private static final int DEFAULT_UPLOAD_DISK_SPOOL_SIZE = 20 * 1024 * 1024;
+	
+	private MultipartParser multipartParser;
+	
+	@Override
+	public void init(ServletConfig config) throws ServletException {
+		super.init(config);
+		
+		multipartParser = new MultipartParser(DEFAULT_UPLOAD_DISK_SPOOL_SIZE);
+	}
+	
 	@Override
 	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
 		// Set session id
@@ -72,7 +86,7 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
 		}
 	}
 	
-	private Session getSessionFromRequest(HttpServletRequest req) {
+	private Session getSessionFromRequest(HttpServletRequest req) throws ServletException {
 		String remoteIp = req.getRemoteAddr();
 		Cookie cookie = getCookieByName(req, SESSION_ID_NAME);
 		String sessionId = null;
@@ -81,17 +95,25 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
 			sessionId = cookie.getValue();
 			logger.info("Session id " + sessionId);
 		}
+		
+		if (sessionId == null && hasParam(req, "session.id")) {
+			sessionId = getParam(req, "session.id");
+		}
+		return getSessionFromSessionId(sessionId, remoteIp);
+	}
+	
+	private Session getSessionFromSessionId(String sessionId, String remoteIp) {
 		if (sessionId == null) {
 			return null;
-		} else {
-			Session session = getApplication().getSessionCache().getSession(sessionId);
-			// Check if the IP's are equal. If not, we invalidate the sesson.
-			if (session == null || !remoteIp.equals(session.getIp())) {
-				return null;
-			}
-			
-			return session;
 		}
+		
+		Session session = getApplication().getSessionCache().getSession(sessionId);
+		// Check if the IP's are equal. If not, we invalidate the sesson.
+		if (session == null || !remoteIp.equals(session.getIp())) {
+			return null;
+		}
+		
+		return session;
 	}
 
 	private void handleLogin(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
@@ -109,32 +131,62 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
 
 	@Override
 	protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
-		if (hasParam(req, "action")) {
-			String action = getParam(req, "action");
-			if (action.equals("login")) {
-				HashMap<String,Object> obj = new HashMap<String,Object>();
-				handleAjaxLoginAction(req, resp, obj);
-				this.writeJSON(resp, obj);
-			} 
-			else {
-				Session session = getSessionFromRequest(req);
-				if (session == null) {
-					if (isAjaxCall(req)) {
-						String response = createJsonResponse("error", "Invalid Session. Need to re-login", "login", null);
-						writeResponse(resp, response);
-					} 
-					else {
-						handleLogin(req, resp, "Enter username and password");
+		Session session = getSessionFromRequest(req);
+		
+		// Handle Multipart differently from other post messages
+		if(ServletFileUpload.isMultipartContent(req)) {
+			Map<String, Object> params = multipartParser.parseMultipart(req);
+			if (session == null) {
+				// See if the session id is properly set.
+				if (params.containsKey("session.id")) {
+					String sessionId = (String)params.get("session.id");
+					String ip = req.getRemoteAddr();
+					
+					session = getSessionFromSessionId(sessionId, ip);
+					if (session != null) {
+						handleMultiformPost(req, resp, params, session);
+						return;
 					}
-				} 
-				else {
-					handlePost(req, resp, session);
+				}
+
+				// if there's no valid session, see if it's a one time session.
+				if (!params.containsKey("username") || !params.containsKey("password")) {
+					writeResponse(resp, "Login error. Need username and password");
+					return;
+				}
+				
+				String username = (String)params.get("username");
+				String password = (String)params.get("password");
+				String ip = req.getRemoteAddr();
+				
+				try {
+					session = createSession(username, password, ip);
+				} catch (UserManagerException e) {
+					writeResponse(resp, "Login error: " + e.getMessage());
+					return;
 				}
 			}
-		} 
-		else {
-			Session session = getSessionFromRequest(req);
-			if (session == null) {
+
+			handleMultiformPost(req, resp, params, session);
+		}
+		else if (hasParam(req, "action") && getParam(req, "action").equals("login")) {
+			HashMap<String,Object> obj = new HashMap<String,Object>();
+			handleAjaxLoginAction(req, resp, obj);
+			this.writeJSON(resp, obj);
+		}
+		else if (session == null) {
+			if (hasParam(req, "username") && hasParam(req, "password")) {
+				// If it's a post command with curl, we create a temporary session
+				try {
+					session = createSession(req);
+				} catch (UserManagerException e) {
+					writeResponse(resp, "Login error: " + e.getMessage()); return;
+				}
+				
+				handlePost(req, resp, session);
+			}
+			else {
+				// There is no valid session or temporary login, so we either pass back a message or redirect.
 				if (isAjaxCall(req)) {
 					String response = createJsonResponse("error", "Invalid Session. Need to re-login", "login", null);
 					writeResponse(resp, response);
@@ -142,37 +194,47 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
 				else {
 					handleLogin(req, resp, "Enter username and password");
 				}
-			} 
-			else {
-				handlePost(req, resp, session);
 			}
 		}
+		else {
+			handlePost(req, resp, session);
+		}
 	}
 
+	private Session createSession(HttpServletRequest req) throws UserManagerException, ServletException {
+		String username = getParam(req, "username");
+		String password = getParam(req, "password");
+		String ip = req.getRemoteAddr();
+		
+		return createSession(username, password, ip);
+	}
+	
+	private Session createSession(String username, String password, String ip) throws UserManagerException, ServletException {
+		UserManager manager = getApplication().getUserManager();
+		User user = manager.getUser(username, password);
+
+		String randomUID = UUID.randomUUID().toString();
+		Session session = new Session(randomUID, user, ip);
+		
+		return session;
+	}
+	
 	protected void handleAjaxLoginAction(HttpServletRequest req, HttpServletResponse resp, Map<String, Object> ret) throws ServletException {
 		if (hasParam(req, "username") && hasParam(req, "password")) {
-			String username = getParam(req, "username");
-			String password = getParam(req, "password");
-
-			UserManager manager = getApplication().getUserManager();
-
-			User user = null;
+			Session session = null;
 			try {
-				user = manager.getUser(username, password);
-			} 
-			catch (UserManagerException e) {
-				ret.put("error", "Incorrect Login. " + e.getCause());
+				session = createSession(req);
+			} catch (UserManagerException e) {
+				ret.put("error", "Incorrect Login. " + e.getMessage());
 				return;
 			}
-
-			String ip = req.getRemoteAddr();
-			String randomUID = UUID.randomUUID().toString();
-			Session session = new Session(randomUID, user, ip);
-			Cookie cookie = new Cookie(SESSION_ID_NAME, randomUID);
+			
+			Cookie cookie = new Cookie(SESSION_ID_NAME, session.getSessionId());
 			cookie.setPath("/");
 			resp.addCookie(cookie);
 			getApplication().getSessionCache().addSession(session);
 			ret.put("status", "success");
+			ret.put("session.id", session.getSessionId());
 		} 
 		else {
 			ret.put("error", "Incorrect Login.");
@@ -218,4 +280,17 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
 	 * @throws IOException
 	 */
 	protected abstract void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException;
+	
+	/**
+	 * The multipart (file upload) post request is handed off to the
+	 * implementor after the user is logged in.
+	 * 
+	 * @param req
+	 * @param resp
+	 * @param session
+	 * @throws ServletException
+	 * @throws IOException
+	 */
+	protected void handleMultiformPost(HttpServletRequest req, HttpServletResponse resp, Map<String,Object> multipart, Session session) throws ServletException, IOException {
+	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/Page.java b/src/java/azkaban/webapp/servlet/Page.java
index 7d44f8f..d8d5194 100644
--- a/src/java/azkaban/webapp/servlet/Page.java
+++ b/src/java/azkaban/webapp/servlet/Page.java
@@ -62,8 +62,7 @@ public class Page {
 	public void render() {
 		try {
 			response.setContentType(mimeType);
-			engine.mergeTemplate(template, "UTF-8", context,
-					response.getWriter());
+			engine.mergeTemplate(template, "UTF-8", context, response.getWriter());
 		} catch (Exception e) {
 			throw new PageRenderException(e);
 		}
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index dcefce6..0bbe2ba 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -73,15 +73,12 @@ import azkaban.webapp.servlet.MultipartParser;
 public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 	private static final long serialVersionUID = 1;
 	private static final Logger logger = Logger.getLogger(ProjectManagerServlet.class);
-	private static final int DEFAULT_UPLOAD_DISK_SPOOL_SIZE = 20 * 1024 * 1024;
 	private static final NodeLevelComparator NODE_LEVEL_COMPARATOR = new NodeLevelComparator();
 	
 	private ProjectManager projectManager;
 	private ExecutorManager executorManager;
 	private ScheduleManager scheduleManager;
 	private UserManager userManager;
-	
-	private MultipartParser multipartParser;
 
 	private static Comparator<Flow> FLOW_ID_COMPARATOR = new Comparator<Flow>() {
 		@Override
@@ -99,8 +96,6 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 		executorManager = server.getExecutorManager();
 		scheduleManager = server.getScheduleManager();
 		userManager = server.getUserManager();
-
-		multipartParser = new MultipartParser(DEFAULT_UPLOAD_DISK_SPOOL_SIZE);
 	}
 
 	@Override
@@ -145,18 +140,18 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 	}
 
 	@Override
-	protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
-		if (ServletFileUpload.isMultipartContent(req)) {
-			logger.info("Post is multipart");
-			Map<String, Object> params = multipartParser.parseMultipart(req);
-			if (params.containsKey("action")) {
-				String action = (String)params.get("action");
-				if (action.equals("upload")) {
-					handleUpload(req, resp, params, session);
-				}
+	protected void handleMultiformPost(HttpServletRequest req, HttpServletResponse resp, Map<String, Object> params, Session session) throws ServletException, IOException {
+		if (params.containsKey("action")) {
+			String action = (String)params.get("action");
+			if (action.equals("upload")) {
+				handleUpload(req, resp, params, session);
 			}
 		}
-		else if (hasParam(req, "action")) {
+	}
+	
+	@Override
+	protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+		if (hasParam(req, "action")) {
 			String action = getParam(req, "action");
 			if (action.equals("create")) {
 				handleCreate(req, resp, session);
@@ -1161,12 +1156,12 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 		
 		for(String roleName: user.getRoles()) {
 			Role role = userManager.getRole(roleName);
-			if (role.getPermission().isPermissionSet(type)) {
+			if (role.getPermission().isPermissionSet(type) || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
 				return true;
 			}
 		}
 		
-		return true;
+		return false;
 	}
 	
 	private Permission getPermissionObject(Project project, User user, Permission.Type type) {
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 85731a8..480ea9a 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -207,7 +207,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		//project.info("User '" + user.getUserId() + "' has scheduled " + flow.getId() + "[" + schedFlow.toNiceString() + "].");
 		Schedule schedule = scheduleManager.scheduleFlow(projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), timezone, thePeriod, submitTime.getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId());
 		logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName +  " (" + projectId +")" + "].");
-		projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.toString() + " has been added.");
+		projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.getScheduleName() + " has been added.");
 		
 		ret.put("status", "success");
 		ret.put("message", projectName + "." + flowName + " scheduled.");
@@ -239,11 +239,11 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		
 		for(String roleName: user.getRoles()) {
 			Role role = userManager.getRole(roleName);
-			if (role.getPermission().isPermissionSet(type)) {
+			if (role.getPermission().isPermissionSet(type) || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
 				return true;
 			}
 		}
 		
-		return true;
+		return false;
 	}
 }
diff --git a/src/java/azkaban/webapp/servlet/velocity/nav.vm b/src/java/azkaban/webapp/servlet/velocity/nav.vm
index 5d98006..6988e36 100644
--- a/src/java/azkaban/webapp/servlet/velocity/nav.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/nav.vm
@@ -19,6 +19,8 @@
 				function navMenuClick(url) {
 					window.location.href=url;
 				}
+				
+
 			</script>
 
 			<ul id="nav" class="nav">
@@ -26,14 +28,18 @@
 				<li id="scheduled-jobs-tab" #if($current_page == 'schedule')class="selected"#end onClick="navMenuClick('$!context/schedule')"><a href="$!context/schedule">Scheduled</a></li>
 				<li id="executing-jobs-tab" #if($current_page == 'executing')class="selected"#end onClick="navMenuClick('$!context/executor')"><a href="$!context/executor">Executing</a></li>
 				<li id="history-jobs-tab" #if($current_page == 'history')class="selected"#end onClick="navMenuClick('$!context/history')"><a href="$!context/history">History</a></li>
-				<li id="hdfs-browser-tab" #if($current_page == 'hdfsbrowser')class="selected"#end onClick="navMenuClick('$!context/hdfs')"><a href="$!context/hdfs">HDFS</a></li>
+				
+				#if ($viewers) 
+					<li id="viewer-tab" #if($current_page == 'viewer') class="selected"#end onClick="navMenuClick('$!context/$viewerPath')">
+						<a href="$!context/$viewerPath">$viewerName</a>
+					</li>
+				#end
 			</ul>
 			
-			
 			<div id="user-id">
 				<a>${user_id}<div id="user-down"></div></a>    
 				<div id="user-menu">
 					<ul><li><a id="logout" href="$!context?logout">logout</a></li></ul>
 				</div>
 			</div>
-		</div>
+		</div>
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/ViewerPlugin.java b/src/java/azkaban/webapp/servlet/ViewerPlugin.java
new file mode 100644
index 0000000..725473f
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/ViewerPlugin.java
@@ -0,0 +1,19 @@
+package azkaban.webapp.servlet;
+
+public class ViewerPlugin {
+	private final String pluginName;
+	private final String pluginPath;
+	
+	public ViewerPlugin(String pluginName, String pluginPath) {
+		this.pluginName = pluginName;
+		this.pluginPath = pluginPath;
+	}
+
+	public String getPluginName() {
+		return pluginName;
+	}
+
+	public String getPluginPath() {
+		return pluginPath;
+	}
+}
diff --git a/src/java/azkaban/webapp/session/Session.java b/src/java/azkaban/webapp/session/Session.java
index d12b1f4..38f4aa0 100644
--- a/src/java/azkaban/webapp/session/Session.java
+++ b/src/java/azkaban/webapp/session/Session.java
@@ -15,6 +15,9 @@
  */
 package azkaban.webapp.session;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import azkaban.user.User;
 
 /**
@@ -24,6 +27,7 @@ public class Session {
 	private final User user;
 	private final String sessionId;
 	private final String ip;
+	private Map<String, Object> sessionData = new HashMap<String, Object>();
 	
 	/**
 	 * Constructor for the session
@@ -58,4 +62,12 @@ public class Session {
 	public String getIp() {
 		return ip;
 	}
+	
+	public void setSessionData(String key, Object value) {
+		sessionData.put(key, value);
+	}
+	
+	public Object getSessionData(String key) {
+		return sessionData.get(key);
+	}
 }
diff --git a/unit/executions/logtest/largeLog1.log.short b/unit/executions/logtest/largeLog1.log.short
new file mode 100644
index 0000000..83d535f
--- /dev/null
+++ b/unit/executions/logtest/largeLog1.log.short
@@ -0,0 +1,68 @@
+I'm Henery the Eighth, I am,
+Henery the Eighth I am, I am!
+I got married to the widow next door,
+She's been married seven times before.
+And every one was an Henery
+It wouldn't be a Willie or a Sam
+I'm her eighth old man I'm Henery
+Henery the Eighth, I am!
+Second verse, same as the first!
+I'm Henery the Eighth, I am,
+Henery the Eighth I am, I am!
+I got married to the widow next door,
+She's been married seven times before.
+And every one was an Henery
+It wouldn't be a Willie or a Sam
+I'm her eighth old man I'm Henery
+Henery the Eighth, I am!
+Second verse, same as the first!
+I'm Henery the Eighth, I am,
+Henery the Eighth I am, I am!
+I got married to the widow next door,
+She's been married seven times before.
+And every one was an Henery
+It wouldn't be a Willie or a Sam
+I'm her eighth old man I'm Henery
+Henery the Eighth, I am!
+Second verse, same as the first!
+I'm Henery the Eighth, I am,
+Henery the Eighth I am, I am!
+I got married to the widow next door,
+She's been married seven times before.
+And every one was an 
+...
+Log file is too big. Skipping 101991 in the middle.
+...
+ore.
+And every one was an Henery
+It wouldn't be a Willie or a Sam
+I'm her eighth old man I'm Henery
+Henery the Eighth, I am!
+Second verse, same as the first!
+I'm Henery the Eighth, I am,
+Henery the Eighth I am, I am!
+I got married to the widow next door,
+She's been married seven times before.
+And every one was an Henery
+It wouldn't be a Willie or a Sam
+I'm her eighth old man I'm Henery
+Henery the Eighth, I am!
+Second verse, same as the first!
+I'm Henery the Eighth, I am,
+Henery the Eighth I am, I am!
+I got married to the widow next door,
+She's been married seven times before.
+And every one was an Henery
+It wouldn't be a Willie or a Sam
+I'm her eighth old man I'm Henery
+Henery the Eighth, I am!
+Second verse, same as the first!
+I'm Henery the Eighth, I am,
+Henery the Eighth I am, I am!
+I got married to the widow next door,
+She's been married seven times before.
+And every one was an Henery
+It wouldn't be a Willie or a Sam
+I'm her eighth old man I'm Henery
+Henery the Eighth, I am!
+Second verse, same as the first!
\ No newline at end of file
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 57c6777..e4496bb 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -21,18 +21,15 @@ import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.ExecutorLoader;
 
 import azkaban.flow.Flow;
-import azkaban.jobtype.JobtypeManager;
-import azkaban.security.HadoopSecurityManager;
-import azkaban.security.HadoopSecurityManager_H_1_0_2;
+import azkaban.jobtype.JobTypeManager;
+import azkaban.test.executor.JavaJob;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
 
 public class FlowRunnerTest {
 	private File workingDir;
 	private Logger logger = Logger.getLogger(FlowRunnerTest.class);
-	private JobtypeManager jobtypeManager;
-	private HadoopSecurityManager hadoopSecurityManager;
-	
+	private JobTypeManager jobtypeManager;
 	public FlowRunnerTest() {
 		
 	}
@@ -45,8 +42,8 @@ public class FlowRunnerTest {
 			FileUtils.deleteDirectory(workingDir);
 		}
 		workingDir.mkdirs();
-		jobtypeManager = new JobtypeManager(null, this.getClass().getClassLoader());
-		hadoopSecurityManager = new HadoopSecurityManager_H_1_0_2(new Props());
+		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+		jobtypeManager.registerJobType("java", JavaJob.class);
 	}
 	
 	@After
@@ -355,7 +352,7 @@ public class FlowRunnerTest {
 		MockProjectLoader projectLoader = new MockProjectLoader(new File(flow.getExecutionPath()));
 		
 		loader.uploadExecutableFlow(flow);
-		FlowRunner runner = new FlowRunner(flow, loader, jobtypeManager, hadoopSecurityManager);
+		FlowRunner runner = new FlowRunner(flow, loader, jobtypeManager);
 		runner.addListener(eventCollector);
 		
 		return runner;
@@ -367,7 +364,7 @@ public class FlowRunnerTest {
 		MockProjectLoader projectLoader = new MockProjectLoader(new File(exFlow.getExecutionPath()));
 		
 		loader.uploadExecutableFlow(exFlow);
-		FlowRunner runner = new FlowRunner(exFlow, loader, jobtypeManager, hadoopSecurityManager);
+		FlowRunner runner = new FlowRunner(exFlow, loader, jobtypeManager);
 		runner.addListener(eventCollector);
 		
 		return runner;
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 9bbcfd3..4009f87 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -2,6 +2,7 @@ package azkaban.test.execapp;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.StringTokenizer;
 
 import junit.framework.Assert;
 
@@ -17,17 +18,15 @@ import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.ExecutorLoader;
-import azkaban.jobExecutor.JavaJob;
 import azkaban.jobExecutor.ProcessJob;
-import azkaban.jobtype.JobtypeManager;
-import azkaban.security.HadoopSecurityManager;
-import azkaban.security.HadoopSecurityManager_H_1_0_2;
+import azkaban.jobtype.JobTypeManager;
+import azkaban.test.executor.JavaJob;
+import azkaban.test.executor.SleepJavaJob;
 import azkaban.utils.Props;
 
 public class JobRunnerTest {
 	private File workingDir;
-	private JobtypeManager jobtypeManager;
-	private HadoopSecurityManager hadoopSecurityManager;
+	private JobTypeManager jobtypeManager;
 	
 	public JobRunnerTest() {
 
@@ -41,8 +40,8 @@ public class JobRunnerTest {
 			FileUtils.deleteDirectory(workingDir);
 		}
 		workingDir.mkdirs();
-		jobtypeManager = new JobtypeManager(null, this.getClass().getClassLoader());
-		hadoopSecurityManager = new HadoopSecurityManager_H_1_0_2(new Props());
+		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+		jobtypeManager.registerJobType("java", JavaJob.class);
 	}
 
 	@After
@@ -244,7 +243,8 @@ public class JobRunnerTest {
 	private Props createProps( int sleepSec, boolean fail) {
 		Props props = new Props();
 		props.put("type", "java");
-		props.put(JavaJob.JOB_CLASS, "azkaban.test.executor.SleepJavaJob");
+
+		props.put(JavaJob.JOB_CLASS, SleepJavaJob.class.getName());
 		props.put("seconds", sleepSec);
 		props.put(ProcessJob.WORKING_DIR, workingDir.getPath());
 		props.put("fail", String.valueOf(fail));
@@ -260,8 +260,26 @@ public class JobRunnerTest {
 		node.setExecutableFlow(flow);
 		
 		Props props = createProps(time, fail);
-		JobRunner runner = new JobRunner(node, props, workingDir, loader, jobtypeManager, hadoopSecurityManager);
+		
+		JobRunner runner = new JobRunner(node, props, props, workingDir, loader, jobtypeManager);
 		runner.addListener(listener);
 		return runner;
 	}
+	
+	private static String getSourcePathFromClass(Class containedClass) {
+		File file = new File(containedClass.getProtectionDomain().getCodeSource().getLocation().getPath());
+
+		if (!file.isDirectory() && file.getName().endsWith(".class")) {
+			String name = containedClass.getName();
+			StringTokenizer tokenizer = new StringTokenizer(name, ".");
+			while(tokenizer.hasMoreTokens()) {
+				tokenizer.nextElement();
+				file = file.getParentFile();
+			}
+			return file.getPath();  
+		}
+		else {
+			return containedClass.getProtectionDomain().getCodeSource().getLocation().getPath();
+		}
+	}
 }
\ No newline at end of file
diff --git a/unit/java/azkaban/test/jobExecutor/AllJobExecutorTests.java b/unit/java/azkaban/test/jobExecutor/AllJobExecutorTests.java
index 4579331..83e55b2 100644
--- a/unit/java/azkaban/test/jobExecutor/AllJobExecutorTests.java
+++ b/unit/java/azkaban/test/jobExecutor/AllJobExecutorTests.java
@@ -5,7 +5,7 @@ import org.junit.runners.Suite;
 import org.junit.runners.Suite.SuiteClasses;
 
 @RunWith(Suite.class)
-@SuiteClasses({ JavaJobTest.class, ProcessJobTest.class, PythonJobTest.class })
+@SuiteClasses({ JavaProcessJobTest.class, ProcessJobTest.class, PythonJobTest.class })
 public class AllJobExecutorTests {
 
 }
diff --git a/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java b/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
index fabefe3..fde3e53 100644
--- a/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
+++ b/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
@@ -34,7 +34,7 @@ public class ProcessJobTest
 //    
 //    EasyMock.replay(props);
     
-    job = new ProcessJob("TestProcess", props, log);
+    job = new ProcessJob("TestProcess", props, props, log);
     
 
   }
diff --git a/unit/java/azkaban/test/jobExecutor/PythonJobTest.java b/unit/java/azkaban/test/jobExecutor/PythonJobTest.java
index 0881eff..d42cd22 100644
--- a/unit/java/azkaban/test/jobExecutor/PythonJobTest.java
+++ b/unit/java/azkaban/test/jobExecutor/PythonJobTest.java
@@ -90,7 +90,7 @@ public class PythonJobTest
 //    EasyMock.expect(descriptor.getProps()).andReturn(props).times(3);
 //    EasyMock.expect(descriptor.getFullPath()).andReturn(".").times(1);
 //    EasyMock.replay(descriptor);
-    job = new PythonJob("TestProcess", props, log);
+    job = new PythonJob("TestProcess", props, props, log);
 //    EasyMock.verify(descriptor);
     try
     {