azkaban-uncached

Changes

README.md 4(+2 -2)

Details

README.md 4(+2 -2)

diff --git a/README.md b/README.md
index 897a0b5..672010c 100644
--- a/README.md
+++ b/README.md
@@ -12,8 +12,8 @@ Here are a few features:
 * Project workspaces
 * Logging and auditing of workflow and jobs
 
-To go to the package download links: [Download-Packages](azkaban2/wiki/Download-Packages)
+To go to the package download links: [Download-Packages](https://github.com/azkaban/azkaban2/wiki/Download-Packages)
 
-To go to the azkaban-plugins github repo: [Azkaban-plugins](azkaban-plugins)
+To go to the azkaban-plugins github repo: [Azkaban-plugins](https://github.com/azkaban/azkaban-plugins)
 
 There is a google groups: [Azkaban Group](https://groups.google.com/forum/?fromgroups#!forum/azkaban-dev)
diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index 4fd55c2..1475a9a 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -58,13 +58,16 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 	@Override
 	public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
 		HashMap<String,Object> respMap= new HashMap<String,Object>();
+		logger.info("ExecutorServer called by " + req.getRemoteAddr());
 		try {
 			if (!hasParam(req, ACTION_PARAM)) {
+				logger.error("Parameter action not set");
 				respMap.put("error", "Parameter action not set");
 			}
 			else {
 				String action = getParam(req, ACTION_PARAM);
 				if (action.equals(UPDATE_ACTION)) {
+					logger.info("Updated called");
 					handleAjaxUpdateRequest(req, respMap);
 				}
 				else if (action.equals(PING_ACTION)) {
@@ -74,7 +77,8 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 					int execid = Integer.parseInt(getParam(req, EXECID_PARAM));
 					String user = getParam(req, USER_PARAM, null);
 					
-					if (action.equals(LOG_ACTION)) {
+					logger.info("User " + user + " has called action " + action + " on " + execid);
+					if (action.equals(LOG_ACTION)) { 
 						handleFetchLogEvent(execid, req, resp, respMap);
 					}
 					else if (action.equals(EXECUTE_ACTION)) {
@@ -100,11 +104,13 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 						handleModifyExecutionRequest(respMap, execid, user, req);
 					}
 					else {
+						logger.error("action: '" + action + "' not supported.");
 						respMap.put("error", "action: '" + action + "' not supported.");
 					}
 				}
 			}
 		} catch (Exception e) {
+			logger.error(e);
 			respMap.put(RESPONSE_ERROR, e.getMessage());
 		}
 		writeJSON(resp, respMap);
@@ -139,6 +145,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 				
 			}
 		} catch (ExecutorManagerException e) {
+			logger.error(e);
 			respMap.put("error", e.getMessage());
 		}
 	}
@@ -157,6 +164,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 				result = flowRunnerManager.readFlowLogs(execId, startByte, length);
 				respMap.putAll(result.toObject());
 			} catch (Exception e) {
+				logger.error(e);
 				respMap.put(RESPONSE_ERROR, e.getMessage());
 			}
 		}
@@ -167,6 +175,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 				LogData result = flowRunnerManager.readJobLogs(execId, jobId, attempt, startByte, length);
 				respMap.putAll(result.toObject());
 			} catch (Exception e) {
+				logger.error(e);
 				respMap.put("error", e.getMessage());
 			}
 		}
@@ -203,6 +212,8 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 		try {
 			flowRunnerManager.submitFlow(execId);
 		} catch (ExecutorManagerException e) {
+			e.printStackTrace();
+			logger.error(e);
 			respMap.put(RESPONSE_ERROR, e.getMessage());
 		}
 	}
@@ -228,7 +239,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 			flowRunnerManager.pauseFlow(execid, user);
 			respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
 		} catch (ExecutorManagerException e) {
-			e.printStackTrace();
+			logger.error(e);
 			respMap.put(RESPONSE_ERROR, e.getMessage());
 		}
 	}
@@ -258,7 +269,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 			flowRunnerManager.cancelFlow(execid, user);
 			respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
 		} catch (ExecutorManagerException e) {
-			e.printStackTrace();
+			logger.error(e);
 			respMap.put(RESPONSE_ERROR, e.getMessage());
 		}
 	}
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index c61d724..2f58a1a 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -5,6 +5,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -88,8 +89,14 @@ public class FlowRunner extends EventHandler implements Runnable {
 	
 	// Watches external flows for execution.
 	private FlowWatcher watcher = null;
+
+	private HashSet<String> proxyUsers = null;
+	private boolean validateUserProxy;
+	
+	private String jobLogFileSize = "5MB";
+	private int jobLogNumFiles = 4;
 	
-	public FlowRunner(ExecutableFlow flow, FlowWatcher watcher, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
+	public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
 		this.execId = flow.getExecutionId();
 		this.flow = flow;
 		this.executorLoader = executorLoader;
@@ -97,17 +104,35 @@ public class FlowRunner extends EventHandler implements Runnable {
 		this.executorService = Executors.newFixedThreadPool(numThreads);
 		this.execDir = new File(flow.getExecutionPath());
 		this.jobtypeManager = jobtypeManager;
-		
+
 		this.pipelineLevel = flow.getPipelineLevel();
 		this.pipelineExecId = flow.getPipelineExecutionId();
-		this.watcher = watcher;
+
+		this.proxyUsers = flow.getProxyUsers();
 	}
 
+	public FlowRunner setFlowWatcher(FlowWatcher watcher) {
+		this.watcher = watcher;
+		return this;
+	}
+	
 	public FlowRunner setGlobalProps(Props globalProps) {
 		this.globalProps = globalProps;
 		return this;
 	}
-
+	
+	public FlowRunner setJobLogSettings(String jobLogFileSize, int jobLogNumFiles) {
+		this.jobLogFileSize = jobLogFileSize;
+		this.jobLogNumFiles = jobLogNumFiles;
+		
+		return this;
+	}
+	
+	public FlowRunner setValidateProxyUser(boolean validateUserProxy) {
+		this.validateUserProxy = validateUserProxy;
+		return this;
+	}
+	
 	public File getExecutionDir() {
 		return execDir;
 	}
@@ -360,12 +385,15 @@ public class FlowRunner extends EventHandler implements Runnable {
 		prop.setSource(path.getPath());
 		prop.setParent(parentProps);
 		
-		// should have one prop with system secrets, the other user level props
-		JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), executorLoader, jobtypeManager, logger);
+		JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), executorLoader, jobtypeManager);
 		if (watcher != null) {
 			jobRunner.setPipeline(watcher, pipelineLevel);
 		}
+		if (validateUserProxy) {
+			jobRunner.setValidatedProxyUsers(proxyUsers);
+		}
 		
+		jobRunner.setLogSettings(logger, jobLogFileSize, jobLogNumFiles);
 		jobRunner.addListener(listener);
 
 		return jobRunner;
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 11c6bc1..9e9d8c7 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -18,6 +18,7 @@ package azkaban.execapp;
 
 import java.io.File;
 import java.io.FileFilter;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.lang.Thread.State;
 import java.util.ArrayList;
@@ -79,16 +80,27 @@ public class FlowRunnerManager implements EventListener {
 	
 	private Props globalProps;
 	
+	private final Props azkabanProps;
+	
 	private long lastSubmitterThreadCheckTime = -1;
 	private long lastCleanerThreadCheckTime = -1;
 	private long executionDirRetention = 1*24*60*60*1000;
 	
-	private Object executionDirDeletionSync = new Object();
+	// We want to limit the log sizes to about 20 megs
+	private String jobLogChunkSize = "5MB";
+	private int jobLogNumFiles = 4;
+	
+	// If true, jobs will validate proxy user against a list of valid proxy users.
+	private boolean validateProxyUser = false;
 	
+	private Object executionDirDeletionSync = new Object();
+		
 	public FlowRunnerManager(Props props, ExecutorLoader executorLoader, ProjectLoader projectLoader, ClassLoader parentClassLoader) throws IOException {
 		executionDirectory = new File(props.getString("azkaban.execution.dir", "executions"));
 		projectDirectory = new File(props.getString("azkaban.project.dir", "projects"));
 		
+		azkabanProps = props;
+		
 		//JobWrappingFactory.init(props, getClass().getClassLoader());
 		executionDirRetention = props.getLong("execution.dir.retention", executionDirRetention);
 		logger.info("Execution dir retention set to " + executionDirRetention + " ms");
@@ -100,6 +112,8 @@ public class FlowRunnerManager implements EventListener {
 			projectDirectory.mkdirs();
 		}
 
+		installedProjects = loadExistingProjects();
+		
 		//azkaban.temp.dir
 		numThreads = props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
 		executorService = Executors.newFixedThreadPool(numThreads);
@@ -107,6 +121,11 @@ public class FlowRunnerManager implements EventListener {
 		this.executorLoader = executorLoader;
 		this.projectLoader = projectLoader;
 		
+		this.jobLogChunkSize = azkabanProps.getString("job.log.chunk.size", "5MB");
+		this.jobLogNumFiles = azkabanProps.getInt("job.log.backup.index", 4);
+		
+		this.validateProxyUser = azkabanProps.getBoolean("proxy.user.lock.down", false);
+		
 		submitterThread = new SubmitterThread(flowQueue);
 		submitterThread.start();
 		
@@ -117,6 +136,32 @@ public class FlowRunnerManager implements EventListener {
 		
 	}
 
+	private Map<Pair<Integer, Integer>, ProjectVersion> loadExistingProjects() {
+		Map<Pair<Integer, Integer>, ProjectVersion> allProjects = new HashMap<Pair<Integer,Integer>, ProjectVersion>();
+		for(File project : projectDirectory.listFiles(new FilenameFilter() {
+			
+			String pattern = "[0-9]+\\.[0-9]+";
+			@Override
+			public boolean accept(File dir, String name) {
+				return name.matches(pattern);
+			}
+		})) {
+			if(project.isDirectory()) {
+				try {
+					String fileName = new File(project.getAbsolutePath()).getName();
+					int projectId = Integer.parseInt(fileName.split("\\.")[0]);
+					int versionNum = Integer.parseInt(fileName.split("\\.")[1]);
+					ProjectVersion version = new ProjectVersion(projectId, versionNum);
+					allProjects.put(new Pair<Integer, Integer>(projectId, versionNum), version);
+				}
+				catch (Exception e) {
+					e.printStackTrace();
+				}
+			}
+		}
+		return allProjects;
+	}
+
 	public Props getGlobalProps() {
 		return globalProps;
 	}
@@ -334,8 +379,11 @@ public class FlowRunnerManager implements EventListener {
 				watcher = new RemoteFlowWatcher(pipelineExecId, executorLoader);
 			}
 		}
-		
-		FlowRunner runner = new FlowRunner(flow, watcher, executorLoader, projectLoader, jobtypeManager);
+
+		FlowRunner runner = new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
+		runner.setFlowWatcher(watcher);
+		runner.setJobLogSettings(jobLogChunkSize, jobLogNumFiles);
+		runner.setValidateProxyUser(validateProxyUser);
 		runner.setGlobalProps(globalProps);
 		runner.addListener(this);
 		
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index f6c482b..a14aff5 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -16,17 +16,21 @@ package azkaban.execapp;
  */
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.ArrayList;
+
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
+import java.util.Arrays;
+import java.util.Collections;
+
 import org.apache.log4j.Appender;
-import org.apache.log4j.FileAppender;
 import org.apache.log4j.Layout;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
+import org.apache.log4j.RollingFileAppender;
 
 import azkaban.execapp.event.BlockingStatus;
 import azkaban.execapp.event.Event;
@@ -41,12 +45,13 @@ import azkaban.flow.CommonJobProperties;
 import azkaban.jobExecutor.AbstractProcessJob;
 import azkaban.jobExecutor.Job;
 import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypeManagerException;
 
 import azkaban.utils.Props;
 
 public class JobRunner extends EventHandler implements Runnable {
 	private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
-
+	
 	private ExecutorLoader loader;
 	private Props props;
 	private Props outputProps;
@@ -67,28 +72,44 @@ public class JobRunner extends EventHandler implements Runnable {
 	private Object syncObject = new Object();
 	
 	private final JobTypeManager jobtypeManager;
+
+	// Used by the job to watch and block against another flow
 	private Integer pipelineLevel = null;
 	private FlowWatcher watcher = null;
 	private Set<String> pipelineJobs = new HashSet<String>();
+
+	private Set<String> proxyUsers = null;
+
+	private String jobLogChunkSize;
+	private int jobLogBackupIndex;
 	
-	public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager, Logger flowLogger) {
+	public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
 		this.props = props;
 		this.node = node;
 		this.workingDir = workingDir;
 		this.executionId = node.getExecutionId();
 		this.loader = loader;
 		this.jobtypeManager = jobtypeManager;
+	}
+	
+	public void setValidatedProxyUsers(Set<String> proxyUsers) {
+		this.proxyUsers = proxyUsers;
+	}
+	
+	public void setLogSettings(Logger flowLogger, String logFileChuckSize, int numLogBackup ) {
 		this.flowLogger = flowLogger;
+		this.jobLogChunkSize = logFileChuckSize;
+		this.jobLogBackupIndex = numLogBackup;
 	}
 	
 	public void setPipeline(FlowWatcher watcher, int pipelineLevel) {
 		this.watcher = watcher;
 		this.pipelineLevel = pipelineLevel;
 
-		if (pipelineLevel == 1) {
+		if (this.pipelineLevel == 1) {
 			pipelineJobs.add(node.getJobId());
 		}
-		else if (pipelineLevel == 2) {
+		else if (this.pipelineLevel == 2) {
 			pipelineJobs.add(node.getJobId());
 			pipelineJobs.addAll(node.getOutNodes());
 		}
@@ -115,8 +136,9 @@ public class JobRunner extends EventHandler implements Runnable {
 
 			jobAppender = null;
 			try {
-				FileAppender fileAppender = new FileAppender(loggerLayout, absolutePath, true);
-				
+				RollingFileAppender fileAppender = new RollingFileAppender(loggerLayout, absolutePath, true);
+				fileAppender.setMaxBackupIndex(jobLogBackupIndex);
+				fileAppender.setMaxFileSize(jobLogChunkSize);
 				jobAppender = fileAppender;
 				logger.addAppender(jobAppender);
 			} catch (IOException e) {
@@ -199,6 +221,10 @@ public class JobRunner extends EventHandler implements Runnable {
 				fireEvent(Event.create(this, Type.JOB_STATUS_CHANGED), false);
 				runJob();
 			}
+			else {
+				node.setStatus(Status.FAILED);
+				logError("Job run failed!");
+			}
 			
 			node.setEndTime(System.currentTimeMillis());
 
@@ -209,7 +235,18 @@ public class JobRunner extends EventHandler implements Runnable {
 			
 			if (logFile != null) {
 				try {
-					loader.uploadLogFile(executionId, node.getJobId(), node.getAttempt(), logFile);
+					File[] files = logFile.getParentFile().listFiles(new FilenameFilter() {
+						
+						@Override
+						public boolean accept(File dir, String name) {
+							return name.startsWith(logFile.getName());
+						}
+					} 
+					);
+					Arrays.sort(files, Collections.reverseOrder());
+					
+					
+					loader.uploadLogFile(executionId, node.getJobId(), node.getAttempt(), files);
 				} catch (ExecutorManagerException e) {
 					flowLogger.error("Error writing out logs for job " + node.getJobId(), e);
 				}
@@ -258,9 +295,23 @@ public class JobRunner extends EventHandler implements Runnable {
 			if (!props.containsKey(AbstractProcessJob.WORKING_DIR)) {
 				props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
 			}
-
+			
+			if(props.containsKey("user.to.proxy")) {
+				String jobProxyUser = props.getString("user.to.proxy");
+				if(proxyUsers != null && !proxyUsers.contains(jobProxyUser)) {
+					logger.error("User " + jobProxyUser + " has no permission to execute this job " + node.getJobId() + "!");
+					return false;
+				}
+			}
+			
 			//job = JobWrappingFactory.getJobWrappingFactory().buildJobExecutor(node.getJobId(), props, logger);
-			job = jobtypeManager.buildJobExecutor(node.getJobId(), props, logger);
+			try {
+				job = jobtypeManager.buildJobExecutor(node.getJobId(), props, logger);
+			}
+			catch (JobTypeManagerException e) {
+				logger.error("Failed to build job type, skipping this job");
+				return false;
+			}
 		}
 		
 		return true;
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index d2d44dc..5d25081 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -60,6 +60,8 @@ public class ExecutableFlow {
 	private Integer pipelineLevel = null;
 	private Integer pipelineExecId = null;
 	private Map<String, String> flowParameters = new HashMap<String, String>();
+	
+	private HashSet<String> proxyUsers = new HashSet<String>();
 
 	public enum FailureAction {
 		FINISH_CURRENTLY_RUNNING,
@@ -117,6 +119,14 @@ public class ExecutableFlow {
 		return flowParameters;
 	}
 	
+	public void setProxyUsers(HashSet<String> proxyUsers) {
+		this.proxyUsers = proxyUsers;
+	}
+	
+	public HashSet<String> getProxyUsers() {
+		return this.proxyUsers;
+	}
+	
 	private void setFlow(Flow flow) {
 		for (Node node: flow.getNodes()) {
 			String id = node.getId();
@@ -310,6 +320,10 @@ public class ExecutableFlow {
 			nodes.add(node.toObject());
 		}
 		flowObj.put("nodes", nodes);
+		
+		ArrayList<String> proxyUserList = new ArrayList<String>(proxyUsers);
+		
+		flowObj.put("proxyUsers", proxyUserList);
 
 		return flowObj;
 	}
@@ -447,6 +461,11 @@ public class ExecutableFlow {
 		// Failure emails
 		exFlow.setFailureEmails((List<String>)flowObj.get("failureEmails"));
 		
+		if(flowObj.containsKey("proxyUsers")) {
+			ArrayList<String> proxyUserList = (ArrayList<String>) flowObj.get("proxyUsers");
+			exFlow.setProxyUsers(new HashSet<String>(proxyUserList));
+		}
+		
 		return exFlow;
 	}
 	
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 2fb2dbe..481a8fe 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -684,6 +684,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 			do {
 				//int execId = rs.getInt(1);
 				//String name = rs.getString(2);
+				@SuppressWarnings("unused")
 				int attempt = rs.getInt(3);
 				EncodingType encType = EncodingType.fromInteger(rs.getInt(4));
 				int startByte = rs.getInt(5);
diff --git a/src/java/azkaban/jobtype/JobTypeManager.java b/src/java/azkaban/jobtype/JobTypeManager.java
index d43b0da..65c747e 100644
--- a/src/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/java/azkaban/jobtype/JobTypeManager.java
@@ -73,6 +73,7 @@ public class JobTypeManager
 			}
 			catch (Exception e) {
 				logger.info("Plugin jobtypes failed to load. " + e.getCause());
+				throw new JobTypeManagerException(e);
 			}
 		}
 		
@@ -120,6 +121,7 @@ public class JobTypeManager
 					loadJob(dir, globalConf, globalSysConf);
 				}
 				catch (Exception e) {
+					logger.error("Failed to load jobtype " + dir.getName() + e.getMessage());
 					throw new JobTypeManagerException(e);
 				}
 			}
@@ -195,7 +197,7 @@ public class JobTypeManager
 		try {
 			if(confFile != null) {
 				conf = new Props(commonConf, confFile);
-				conf = PropsUtils.resolveProps(conf);
+//				conf = PropsUtils.resolveProps(conf);
 			}
 			if(sysConfFile != null) {
 				sysConf = new Props(commonSysConf, sysConfFile);
@@ -203,7 +205,7 @@ public class JobTypeManager
 			}
 		}
 		catch (Exception e) {
-			throw new JobTypeManagerException("Failed to get jobtype properties", e);
+			throw new JobTypeManagerException("Failed to get jobtype properties" + e.getMessage());
 		}
 		sysConf.put("plugin.dir", dir.getAbsolutePath());
 		
@@ -221,7 +223,7 @@ public class JobTypeManager
 			try {
 				if(f.getName().endsWith(".jar")) {
 					resources.add(f.toURI().toURL());
-					logger.info("adding to classpath " + f);
+					logger.info("adding to classpath " + f.toURI().toURL());
 				}
 			} catch (MalformedURLException e) {
 				// TODO Auto-generated catch block
@@ -240,6 +242,23 @@ public class JobTypeManager
 		catch (ClassNotFoundException e) {
 			throw new JobTypeManagerException(e);
 		}
+		
+		logger.info("Doing simple testing...");
+		try {
+			Props fakeSysProps = new Props(sysConf);
+			fakeSysProps.put("type", jobtypeName);
+			Props fakeJobProps = new Props(conf);
+			Job job = (Job)Utils.callConstructor(clazz, "dummy", fakeSysProps, fakeJobProps, logger);
+		}
+		catch (Exception e) {
+			logger.info("Jobtype " + jobtypeName + " failed test!", e);
+			throw new JobExecutionException(e);
+		}
+		catch (Throwable t) {
+			logger.info("Jobtype " + jobtypeName + " failed test!", t);
+			throw new JobExecutionException(t);
+		}
+		
 		logger.info("Loaded jobtype " + jobtypeName + " " + jobtypeClass);
 		
 		if(conf != null) jobtypeJobProps.put(jobtypeName, conf);
@@ -249,7 +268,7 @@ public class JobTypeManager
 	
 	public Job buildJobExecutor(String jobId, Props jobProps, Logger logger) throws JobTypeManagerException
 	{
-		Job job;
+		Job job = null;
 		try {
 			String jobType = jobProps.getString("type");
 			if (jobType == null || jobType.length() == 0) {
@@ -274,14 +293,12 @@ public class JobTypeManager
 				Props p = jobtypeJobProps.get(jobType);
 				for(String k : p.getKeySet())
 				{
-					if(!jobProps.containsKey(k)) {
-						jobProps.put(k, p.get(k));
+					if(!jobConf.containsKey(k)) {
+						jobConf.put(k, p.get(k));
 					}
 				}
 			}
-			else {
-				jobConf = jobProps;
-			}
+			jobConf = PropsUtils.resolveProps(jobConf);
 
 			if (sysConf != null) {
 				sysConf = PropsUtils.resolveProps(sysConf);
@@ -289,19 +306,24 @@ public class JobTypeManager
 			else {
 				sysConf = new Props();
 			}
-
-			jobConf = PropsUtils.resolveProps(jobConf);
+			
 			
 //			logger.info("sysConf is " + sysConf);
 //			logger.info("jobConf is " + jobConf);
-			
+//			
 			job = (Job)Utils.callConstructor(executorClass, jobId, sysConf, jobConf, logger);
 			logger.info("job built.");
 		}
 		catch (Exception e) {
-			job = new InitErrorJob(jobId, e);
+			//job = new InitErrorJob(jobId, e);
+			logger.error("Failed to build job executor for job " + jobId + e.getMessage());
+			throw new JobTypeManagerException("Failed to build job executor for job " + jobId, e);
 			//throw new JobTypeManagerException(e);
 		}
+		catch (Throwable t) {
+			logger.error("Failed to build job executor for job " + jobId + t.getMessage(), t);
+			throw new JobTypeManagerException("Failed to build job executor for job " + jobId, t);
+		}
 
 		return job;
 	}
diff --git a/src/java/azkaban/project/JdbcProjectLoader.java b/src/java/azkaban/project/JdbcProjectLoader.java
index 01976b7..c692c27 100644
--- a/src/java/azkaban/project/JdbcProjectLoader.java
+++ b/src/java/azkaban/project/JdbcProjectLoader.java
@@ -236,11 +236,11 @@ public class JdbcProjectLoader implements ProjectLoader {
 			throw new ProjectManagerException("Checking for existing project failed. " + name, e);
 		}
 		
-		final String INSERT_PROJECT = "INSERT INTO projects ( name, active, modified_time, create_time, version, last_modified_by, description) values (?,?,?,?,?,?,?)";
+		final String INSERT_PROJECT = "INSERT INTO projects ( name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob) values (?,?,?,?,?,?,?,?,?)";
 		// Insert project
 		try {
 			long time = System.currentTimeMillis();
-			int i = runner.update(connection, INSERT_PROJECT, name, true, time, time, null, creator.getUserId(), description);
+			int i = runner.update(connection, INSERT_PROJECT, name, true, time, time, null, creator.getUserId(), description, defaultEncodingType.numVal, null);
 			if (i == 0) {
 				throw new ProjectManagerException("No projects have been inserted.");
 			}
@@ -504,6 +504,49 @@ public class JdbcProjectLoader implements ProjectLoader {
 		}
 	}
 	
+	
+	
+	@Override
+	public void updateProjectSettings(Project project) throws ProjectManagerException {
+		Connection connection = getConnection();
+		try {
+			updateProjectSettings(connection, project, defaultEncodingType);
+			connection.commit();
+		}
+		catch (SQLException e) {
+			throw new ProjectManagerException("Error updating project settings", e);
+		}
+		finally {
+			DbUtils.closeQuietly(connection);
+		}
+	}
+	
+	private void updateProjectSettings(Connection connection, Project project, EncodingType encType) throws ProjectManagerException {
+		QueryRunner runner = new QueryRunner();
+		final String UPDATE_PROJECT_SETTINGS = "UPDATE projects SET enc_type=?, settings_blob=? WHERE id=?";
+		
+		String json = JSONUtils.toJSON(project.toObject());
+		byte[] data = null;
+		try {
+			byte[] stringData = json.getBytes("UTF-8");
+			data = stringData;
+	
+			if (encType == EncodingType.GZIP) {
+				data = GZIPUtils.gzipBytes(stringData);
+			}
+			logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+		} catch(IOException e) {
+			throw new ProjectManagerException("Failed to encode. ", e);
+		}
+
+		try {
+			runner.update(connection, UPDATE_PROJECT_SETTINGS, encType.numVal, data, project.getId());
+			connection.commit();
+		} catch (SQLException e) {
+			throw new ProjectManagerException("Error updating project " + project.getName() + " version " + project.getVersion(), e);
+		}
+	}
+	
 	@Override
 	public void removePermission(Project project, String name, boolean isGroup) throws ProjectManagerException {
 		QueryRunner runner = new QueryRunner(dataSource);
@@ -525,6 +568,21 @@ public class JdbcProjectLoader implements ProjectLoader {
 	}
 	
 	@Override
+	public List<Triple<String, Boolean, Permission>> getProjectPermissions(int projectId) throws ProjectManagerException {
+		ProjectPermissionsResultHandler permHander = new ProjectPermissionsResultHandler();
+		QueryRunner runner = new QueryRunner(dataSource);
+		List<Triple<String, Boolean,Permission>> permissions = null;
+		try {
+			permissions = runner.query(ProjectPermissionsResultHandler.SELECT_PROJECT_PERMISSION, permHander, projectId);
+		} catch (SQLException e) {
+			throw new ProjectManagerException("Query for permissions for " + projectId + " failed.", e);
+		}
+		
+		return permissions;
+	}
+	
+	
+	@Override
 	public void removeProject(Project project, String user) throws ProjectManagerException {
 		QueryRunner runner = new QueryRunner(dataSource);
 		
@@ -927,13 +985,13 @@ public class JdbcProjectLoader implements ProjectLoader {
 	
 	private static class ProjectResultHandler implements ResultSetHandler<List<Project>> {
 		private static String SELECT_PROJECT_BY_ID = 
-				"SELECT id, name, active, modified_time, create_time, version, last_modified_by, description FROM projects WHERE id=?";
+				"SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE id=?";
 		
 		private static String SELECT_ALL_ACTIVE_PROJECTS = 
-				"SELECT id, name, active, modified_time, create_time, version, last_modified_by, description FROM projects WHERE active=true";
+				"SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE active=true";
 		
 		private static String SELECT_ACTIVE_PROJECT_BY_NAME = 
-				"SELECT id, name, active, modified_time, create_time, version, last_modified_by, description FROM projects WHERE name=? AND active=true";
+				"SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE name=? AND active=true";
 		
 		@Override
 		public List<Project> handle(ResultSet rs) throws SQLException {
@@ -951,8 +1009,35 @@ public class JdbcProjectLoader implements ProjectLoader {
 				int version = rs.getInt(6);
 				String lastModifiedBy = rs.getString(7);
 				String description = rs.getString(8);
+				int encodingType = rs.getInt(9);
+				byte[] data = rs.getBytes(10);
+				
+				Project project;
+				if (data != null) {
+					EncodingType encType = EncodingType.fromInteger(encodingType);
+					Object blobObj;
+					try {
+						// Convoluted way to inflate strings. Should find common package or helper function.
+						if (encType == EncodingType.GZIP) {
+							// Decompress the sucker.
+							String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+							blobObj = JSONUtils.parseJSONFromString(jsonString);
+						}
+						else {
+							String jsonString = new String(data, "UTF-8");
+							blobObj = JSONUtils.parseJSONFromString(jsonString);
+						}	
+						project = Project.projectFromObject(blobObj);
+					} catch (IOException e) {
+						throw new SQLException("Failed to get project.", e);
+					}
+				}
+				else {
+					project = new Project(id, name);
+				}
+				
+				// update the fields as they may have changed
 				
-				Project project = new Project(id, name);
 				project.setActive(active);
 				project.setLastModifiedTimestamp(modifiedTime);
 				project.setCreateTimestamp(createTime);
diff --git a/src/java/azkaban/project/Project.java b/src/java/azkaban/project/Project.java
index 5a611fc..7344820 100644
--- a/src/java/azkaban/project/Project.java
+++ b/src/java/azkaban/project/Project.java
@@ -18,6 +18,7 @@ package azkaban.project;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -41,6 +42,15 @@ public class Project {
 	private LinkedHashMap<String, Permission> userPermissionMap = new LinkedHashMap<String, Permission>();
 	private LinkedHashMap<String, Permission> groupPermissionMap = new LinkedHashMap<String, Permission>();
 	private Map<String, Flow> flows = null;
+	private HashSet<String> proxyUsers = new HashSet<String>();
+	
+	public HashSet<String> getProxyUsers() {
+		return proxyUsers;
+	}
+	
+	public void setProxyUsers(HashSet<String> proxyUsers) {
+		this.proxyUsers = proxyUsers;
+	}
 	
 	public Project(int id, String name) {
 		this.id = id;
@@ -235,8 +245,13 @@ public class Project {
 			userMap.put("permissions", entry.getValue().toStringArray());
 			users.add(userMap);
 		}
-
+		
 		projectObject.put("users", users);
+		
+
+		ArrayList<String> proxyUserList = new ArrayList<String>(proxyUsers);
+		projectObject.put("proxyUsers", proxyUserList);
+		
 		return projectObject;
 	}
 
@@ -277,6 +292,10 @@ public class Project {
 
 			project.setUserPermission(userid, perm);
 		}
+		
+		List<String> proxyUserList = (List<String>) projectObject.get("proxyUsers");
+		HashSet<String> proxyUsers = new HashSet<String>(proxyUserList);
+		project.setProxyUsers(proxyUsers);
 
 		return project;
 	}
@@ -390,4 +409,27 @@ public class Project {
 	public void setVersion(int version) {
 		this.version = version;
 	}
+
+	public List<String> getProxyUserList() {
+		return new ArrayList<String>(proxyUsers);
+	}
+
+//	public Object getSettingsObject() {
+//		HashMap<String, Object> projectObject = new HashMap<String, Object>();
+//		ArrayList<String> proxyUserList = new ArrayList<String>(proxyUsers);
+//		projectObject.put("proxyUsers", proxyUserList);
+//		
+//		return projectObject;
+//	}
+//	
+//	@SuppressWarnings("unchecked")
+//	public static HashSet<String> proxyUserFromSettingsObj(Object object) {
+//		Map<String, Object> settingsObj = (Map<String, Object>) object;
+//
+//		List<String> proxyUserList = (List<String>) settingsObj.get("proxyUsers");
+//		HashSet<String> proxyUsers = new HashSet<String>(proxyUserList);
+//		
+//		return proxyUsers;
+//	}
+
 }
diff --git a/src/java/azkaban/project/ProjectLoader.java b/src/java/azkaban/project/ProjectLoader.java
index 4711dc0..7e99298 100644
--- a/src/java/azkaban/project/ProjectLoader.java
+++ b/src/java/azkaban/project/ProjectLoader.java
@@ -10,6 +10,7 @@ import azkaban.project.ProjectLogEvent.EventType;
 import azkaban.user.Permission;
 import azkaban.user.User;
 import azkaban.utils.Props;
+import azkaban.utils.Triple;
 
 public interface ProjectLoader {
 
@@ -218,5 +219,9 @@ public interface ProjectLoader {
 	public void updateProjectProperty(Project project, Props props) throws ProjectManagerException;
 
 	Props fetchProjectProperty(int projectId, int projectVer, String propsName) throws ProjectManagerException;
+
+	List<Triple<String, Boolean, Permission>> getProjectPermissions(int projectId) throws ProjectManagerException;
+
+	void updateProjectSettings(Project project) throws ProjectManagerException;
 	
 }
\ No newline at end of file
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index e526474..a5be28b 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -32,6 +32,7 @@ public class ProjectManager {
 	private final Props props;
 	private final File tempDir;
 	private final int projectVersionRetention;
+	private final boolean creatorDefaultPermissions;
 	
 	public ProjectManager(ProjectLoader loader, Props props) {
 		this.projectLoader = loader;
@@ -40,6 +41,8 @@ public class ProjectManager {
 		this.projectVersionRetention = (props.getInt("project.version.retention", 3));
 		logger.info("Project version retention is set to " + projectVersionRetention);
 		
+		this.creatorDefaultPermissions = props.getBoolean("azkaban.creator.default.permissions", true);
+		
 		if (!tempDir.exists()) {
 			tempDir.mkdirs();
 		}
@@ -170,8 +173,19 @@ public class ProjectManager {
 		projectsByName.put(newProject.getName(), newProject);
 		projectsById.put(newProject.getId(), newProject);
 		
+		if(creatorDefaultPermissions) {
 		// Add permission to project
-		projectLoader.updatePermission(newProject, creator.getUserId(), new Permission(Permission.Type.ADMIN), false);
+			projectLoader.updatePermission(newProject, creator.getUserId(), new Permission(Permission.Type.ADMIN), false);
+			
+			// Add proxy user 
+			newProject.getProxyUsers().add(creator.getUserId());
+			try {
+				updateProjectSetting(newProject);
+			} catch (ProjectManagerException e) {
+				e.printStackTrace();
+				throw e;
+			}
+		}
 		
 		projectLoader.postEvent(newProject, EventType.CREATED, creator.getUserId(), null);
 		
@@ -217,6 +231,10 @@ public class ProjectManager {
 		return;
 	}
 
+	public void updateProjectSetting(Project project) throws ProjectManagerException {
+		projectLoader.updateProjectSettings(project);		
+	}
+	
 	public void updateProjectPermission(Project project, String name, Permission perm, boolean group, User modifier) throws ProjectManagerException {
 		logger.info("User " + modifier.getUserId() + " updating permissions for project " + project.getName() + " for " + name + " " + perm.toString());
 		projectLoader.updatePermission(project, name, perm, group);
@@ -329,4 +347,6 @@ public class ProjectManager {
 	public void postProjectEvent(Project project, EventType type, String user,String message) {
 		projectLoader.postEvent(project, type, user, message);
 	}
+
+
 }
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 540fea6..7fad4e2 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -379,6 +379,7 @@ public class ScheduleManager {
 										// Create ExecutableFlow
 										ExecutableFlow exflow = new ExecutableFlow(flow);
 										exflow.setSubmitUser(runningSched.getSubmitUser());
+										exflow.setProxyUsers(project.getProxyUsers());
 										
 										FlowOptions flowOptions = runningSched.getFlowOptions();
 										
diff --git a/src/java/azkaban/sla/SLAManager.java b/src/java/azkaban/sla/SLAManager.java
index 124e8f4..911f9e3 100644
--- a/src/java/azkaban/sla/SLAManager.java
+++ b/src/java/azkaban/sla/SLAManager.java
@@ -3,34 +3,23 @@ package azkaban.sla;
 import java.lang.Thread.State;
 import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.log4j.Logger;
-import org.apache.velocity.runtime.parser.node.GetExecutor;
 import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.ReadablePeriod;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
 
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutorMailer;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerException;
+
 import azkaban.executor.Status;
-import azkaban.flow.Flow;
-import azkaban.project.Project;
-import azkaban.project.ProjectManager;
+
 import azkaban.sla.SLA.SlaAction;
 import azkaban.sla.SLA.SlaRule;
 import azkaban.sla.SLA.SlaSetting;
-import azkaban.user.User;
-import azkaban.utils.Pair;
 import azkaban.utils.Props;
 
 /*
@@ -61,7 +50,6 @@ public class SLAManager {
 	private SLALoader loader;
 
 	private final SLARunner runner;
-	private final SLAPreRunner prerunner;
 	private final ExecutorManager executorManager;
 	private SlaMailer mailer;
 
@@ -82,7 +70,6 @@ public class SLAManager {
 		this.loader = loader;
 		this.mailer = new SlaMailer(props);
 		this.runner = new SLARunner();
-		this.prerunner = new SLAPreRunner();
 
 		List<SLA> SLAList = null;
 		try {
@@ -105,7 +92,6 @@ public class SLAManager {
 	 */
 	public void shutdown() {
 		this.runner.shutdown();
-		this.prerunner.shutdown();
 	}
 
 	/**
@@ -476,90 +462,6 @@ public class SLAManager {
 		}
 		mailer.sendSlaEmail(s, message);
 	}
-	
-	
-	public class SLAPreRunner extends Thread {
-		private final List<SLA> preSlas;
-		private AtomicBoolean stillAlive = new AtomicBoolean(true);
-
-		// Five minute minimum intervals
-		private static final int TIMEOUT_MS = 300000;
-
-		public SLAPreRunner() {
-			preSlas = new ArrayList<SLA>();
-		}
-
-		public void shutdown() {
-			logger.error("Shutting down pre-sla checker thread");
-			stillAlive.set(false);
-			this.interrupt();
-		}
-
-		/**
-		 * Return a list of flow with SLAs
-		 * 
-		 * @return
-		 */
-		protected synchronized List<SLA> getPreSlas() {
-			return new ArrayList<SLA>(preSlas);
-		}
-
-		/**
-		 * Adds SLA into runner and then interrupts so it will update
-		 * its wait time.
-		 * 
-		 * @param flow
-		 */
-		public synchronized void addCheckerPreSla(SLA s) {
-			logger.info("Adding " + s + " to pre-sla checker.");
-			preSlas.add(s);
-			this.interrupt();
-		}
-		
-		/**
-		 * Remove runner SLA. Does not interrupt.
-		 * 
-		 * @param flow
-		 * @throws SLAManagerException 
-		 */
-		public synchronized void removeCheckerPreSla(SLA s) {
-			logger.info("Removing " + s + " from the pre-sla checker.");
-			preSlas.remove(s);
-		}
-
-		public void run() {
-			while (stillAlive.get()) {
-				synchronized (this) {
-					try {
-						// TODO clear up the exception handling
-
-						if (preSlas.size() == 0) {
-							try {
-								this.wait(TIMEOUT_MS);
-							} catch (InterruptedException e) {
-								// interruption should occur when items are added or removed from the queue.
-							}
-						} else {
-							for(SLA s : preSlas) {
-								ExecutableFlow exflow = executorManager.getExecutableFlow(s.getExecId());
-								String id = s.getJobName();
-								if(!s.equals("")) {
-									ExecutableNode exnode = exflow.getExecutableNode(id);
-									if(exnode.getStartTime() != -1) {
-										
-									}
-								}
-							}
-						}
-					} catch (Exception e) {
-						logger.error("Unexpected exception has been thrown in scheduler", e);
-					} catch (Throwable e) {
-						logger.error("Unexpected throwable has been thrown in scheduler", e);
-					}
-				}
-			}
-		}
-	}
 
 	public int getNumActiveSLA() {
 		return runner.getRunnerSLAs().size();
diff --git a/src/java/azkaban/user/UserManager.java b/src/java/azkaban/user/UserManager.java
index 114ccbe..a5329c4 100644
--- a/src/java/azkaban/user/UserManager.java
+++ b/src/java/azkaban/user/UserManager.java
@@ -58,4 +58,6 @@ public interface UserManager {
 	 * @return
 	 */
 	public Role getRole(String roleName);
+
+	public boolean validateProxyUser(String proxyUser, User realUser);
 }
diff --git a/src/java/azkaban/user/XmlUserManager.java b/src/java/azkaban/user/XmlUserManager.java
index 09539b5..7c26daf 100644
--- a/src/java/azkaban/user/XmlUserManager.java
+++ b/src/java/azkaban/user/XmlUserManager.java
@@ -19,6 +19,7 @@ package azkaban.user;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
@@ -53,6 +54,7 @@ public class XmlUserManager implements UserManager {
 	public static final String USERNAME_ATTR = "username";
 	public static final String PASSWORD_ATTR = "password";
 	public static final String ROLES_ATTR = "roles";
+	public static final String PROXY_ATTR = "proxy";
 	public static final String GROUPS_ATTR = "groups";
 
 	private String xmlPath;
@@ -60,6 +62,7 @@ public class XmlUserManager implements UserManager {
 	private HashMap<String, User> users;
 	private HashMap<String, String> userPassword;
 	private HashMap<String, Role> roles;
+	private HashMap<String, HashSet<String>> proxyUserMap;
 
 	/**
 	 * The constructor.
@@ -81,6 +84,7 @@ public class XmlUserManager implements UserManager {
 		HashMap<String, User> users = new HashMap<String, User>();
 		HashMap<String, String> userPassword = new HashMap<String, String>();
 		HashMap<String, Role> roles = new HashMap<String, Role>();
+		HashMap<String, HashSet<String>> proxyUserMap = new HashMap<String, HashSet<String>>();
 
 		// Creating the document builder to parse xml.
 		DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory
@@ -113,7 +117,7 @@ public class XmlUserManager implements UserManager {
 			Node node = azkabanUsersList.item(i);
 			if (node.getNodeType() == Node.ELEMENT_NODE) {
 				if (node.getNodeName().equals(USER_TAG)) {
-					parseUserTag(node, users, userPassword);
+					parseUserTag(node, users, userPassword, proxyUserMap);
 				}
 				else if (node.getNodeName().equals(ROLE_TAG)) {
 					parseRoleTag(node, roles);
@@ -126,10 +130,11 @@ public class XmlUserManager implements UserManager {
 			this.users = users;
 			this.userPassword = userPassword;
 			this.roles = roles;
+			this.proxyUserMap = proxyUserMap;
 		}
 	}
 
-	private void parseUserTag(Node node, HashMap<String, User> users, HashMap<String, String> userPassword) {
+	private void parseUserTag(Node node, HashMap<String, User> users, HashMap<String, String> userPassword, HashMap<String, HashSet<String>> proxyUserMap) {
 		NamedNodeMap userAttrMap = node.getAttributes();
 		Node userNameAttr = userAttrMap.getNamedItem(USERNAME_ATTR);
 		if (userNameAttr == null) {
@@ -157,6 +162,22 @@ public class XmlUserManager implements UserManager {
 				user.addRole(role);
 			}
 		}
+		
+		Node proxy = userAttrMap.getNamedItem(PROXY_ATTR);
+		if (proxy != null) {
+			String value = proxy.getNodeValue();
+			String[] groupSplit = value.split("\\s*,\\s*");
+			for (String group : groupSplit) {
+				if(proxyUserMap.containsKey(username)) {
+					proxyUserMap.get(username).add(group);
+				}
+				else {
+					HashSet<String> proxySet = new HashSet<String>();
+					proxySet.add(group);
+					proxyUserMap.put(username, proxySet);
+				}
+			}
+		}
 
 		Node groups = userAttrMap.getNamedItem(GROUPS_ATTR);
 		if (groups != null) {
@@ -248,4 +269,14 @@ public class XmlUserManager implements UserManager {
 		// Return true. Validation should be added when groups are added to the xml.
 		return true;
 	}
+
+	@Override
+	public boolean validateProxyUser(String proxyUser, User realUser) {
+		if(proxyUserMap.containsKey(realUser.getUserId()) && proxyUserMap.get(realUser.getUserId()).contains(proxyUser)) {
+			return true;
+		}
+		else {
+			return false;
+		}
+	}
 }
diff --git a/src/java/azkaban/utils/EmailMessage.java b/src/java/azkaban/utils/EmailMessage.java
index b510d6c..06ca9bc 100644
--- a/src/java/azkaban/utils/EmailMessage.java
+++ b/src/java/azkaban/utils/EmailMessage.java
@@ -15,7 +15,6 @@ import javax.mail.BodyPart;
 import javax.mail.Message;
 import javax.mail.MessagingException;
 import javax.mail.Session;
-import javax.mail.Transport;
 import javax.mail.internet.InternetAddress;
 import javax.mail.internet.MimeBodyPart;
 import javax.mail.internet.MimeMessage;
diff --git a/src/java/azkaban/utils/Props.java b/src/java/azkaban/utils/Props.java
index 4b16b9c..12da4a7 100644
--- a/src/java/azkaban/utils/Props.java
+++ b/src/java/azkaban/utils/Props.java
@@ -418,6 +418,19 @@ public class Props {
 		}
 	}
 
+	public Class<?> getClass(String key, boolean initialize, ClassLoader cl) {
+		try {
+			if (containsKey(key)) {
+				return Class.forName(get(key), initialize, cl);
+			} else {
+				throw new UndefinedPropertyException(
+						"Missing required property '" + key + "'");
+			}
+		} catch (ClassNotFoundException e) {
+			throw new IllegalArgumentException(e);
+		}
+	}
+	
 	/**
 	 * Gets the class from the Props. If it doesn't exist, it will return the
 	 * defaultClass
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 9c68784..7438c0c 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -264,6 +264,9 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 				else if (ajaxName.equals("retryFailedJobs")) {
 					ajaxRestartFailed(req, resp, ret, session.getUser(), exFlow);
 				}
+//				else if (ajaxName.equals("fetchLatestJobStatus")) {
+//					ajaxFetchLatestJobStatus(req, resp, ret, session.getUser(), exFlow);
+//				}
 				else if (ajaxName.equals("flowInfo")) {
 					//String projectName = getParam(req, "project");
 					//Project project = projectManager.getProject(projectName);
@@ -295,6 +298,50 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		}
 	}
 
+//	private void ajaxFetchLatestJobStatus(HttpServletRequest req,HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) {
+//		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
+//		if (project == null) {
+//			ret.put("error", "Project doesn't exist or incorrect access permission.");
+//			return;
+//		}
+//		
+//		String projectName;
+//		String flowName;
+//		String jobName;
+//		try {
+//			projectName = getParam(req, "projectName");
+//			flowName = getParam(req, "flowName");
+//			jobName = getParam(req, "jobName");
+//		} catch (Exception e) {
+//			ret.put("error", e.getMessage());
+//			return;
+//		}
+//		
+//		try {
+//			ExecutableNode node = exFlow.getExecutableNode(jobId);
+//			if (node == null) {
+//				ret.put("error", "Job " + jobId + " doesn't exist in " + exFlow.getExecutionId());
+//				return;
+//			}
+//			
+//			int attempt = this.getIntParam(req, "attempt", node.getAttempt());
+//			LogData data = executorManager.getExecutionJobLog(exFlow, jobId, offset, length, attempt);
+//			if (data == null) {
+//				ret.put("length", 0);
+//				ret.put("offset", offset);
+//				ret.put("data", "");
+//			}
+//			else {
+//				ret.put("length", data.getLength());
+//				ret.put("offset", data.getOffset());
+//				ret.put("data", data.getData());
+//			}
+//		} catch (ExecutorManagerException e) {
+//			throw new ServletException(e);
+//		}
+//		
+//	}
+
 	private void ajaxRestartFailed(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
 		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.EXECUTE);
 		if (project == null) {
@@ -660,6 +707,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		
 		ExecutableFlow exflow = new ExecutableFlow(flow);
 		exflow.setSubmitUser(user.getUserId());
+		exflow.setProxyUsers(project.getProxyUsers());
 
 		if (hasParam(req, "failureAction")) {
 			String option = getParam(req, "failureAction");
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 0e06e59..423cbcb 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -28,6 +28,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -212,6 +213,11 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 				ajaxAddPermission(project, ret, req, user);
 			}
 		}
+		else if (ajaxName.equals("addProxyUser")) {
+			if (handleAjaxPermission(project, user, Type.ADMIN, ret)) {
+				ajaxAddProxyUser(project, ret, req, user);
+			}
+		}
 		else if (ajaxName.equals("fetchFlowExecutions")) {
 			if (handleAjaxPermission(project, user, Type.READ, ret)) {
 				ajaxFetchFlowExecutions(project, ret, req);
@@ -546,6 +552,55 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 		ret.put("nodes", nodeList);
 	}
 	
+	private void ajaxAddProxyUser(Project project, HashMap<String, Object> ret, HttpServletRequest req, User user) throws ServletException {
+		String name = getParam(req, "name");
+		
+		logger.info("Adding proxy user " + name + " by " + user.getUserId());
+		
+
+		
+		boolean doProxy = Boolean.parseBoolean(getParam(req, "doProxy"));
+		
+		
+		HashSet<String> proxyUsers = project.getProxyUsers();
+		//add
+		if(doProxy) {			
+			if(proxyUsers.contains(name)) {
+				return;
+			}
+			else {
+				if(userManager.validateProxyUser(name, user)) {
+					proxyUsers.add(name);
+				}
+				else {
+					ret.put("error", "User " + user.getUserId() + " has no permission to add " + name + " as proxy user for project " + project.getName());
+					return;
+				}
+			}
+		}
+		else {
+			if(!proxyUsers.contains(name)) {
+				return;
+			}
+			else {
+				if(userManager.validateProxyUser(name, user)) {
+					proxyUsers.remove(name);
+				}
+				else {
+					ret.put("error", "User " + user.getUserId() + " has no permission to remove " + name + " as proxy user for project " + project.getName());
+					return;
+				}
+			}
+		}
+		try {
+			projectManager.updateProjectSetting(project);
+		} catch (ProjectManagerException e) {
+			// TODO Auto-generated catch block
+			ret.put("error", e.getMessage());
+		}
+		
+	}
+	
 	private void ajaxAddPermission(Project project, HashMap<String, Object> ret, HttpServletRequest req, User user) throws ServletException {
 		String name = getParam(req, "name");
 		boolean group = Boolean.parseBoolean(getParam(req, "group"));
@@ -792,6 +847,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 
 				page.add("permissions", project.getUserPermissions());
 				page.add("groupPermissions", project.getGroupPermissions());
+				page.add("proxyUsers", project.getProxyUserList());
 				
 				if(hasPermission(project, user, Type.ADMIN)) {
 					page.add("isAdmin", true);
diff --git a/src/java/azkaban/webapp/servlet/velocity/permissionspage.vm b/src/java/azkaban/webapp/servlet/velocity/permissionspage.vm
index 159bd05..9b13af6 100644
--- a/src/java/azkaban/webapp/servlet/velocity/permissionspage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/permissionspage.vm
@@ -71,6 +71,7 @@
 					#if($isAdmin)
 						<button id="addUser" class="btn1">Add User</button>
 						<button id="addGroup" class="btn1">Add Group</button>
+						<button id="addProxyUser" class="btn2">Add Proxy User</button>
 					#end
 				</div>
 
@@ -93,7 +94,7 @@
 					<th class="tb-read">Read</th>
 					<th class="tb-write">Write</th>
 					<th class="tb-execute">Execute</th>
-					<th class="tb-schedule">Schedule</th>
+					<th class="tb-schedule">Schedule</th>					
 					#if($isAdmin)
 						<th class="tb-action"></th>
 					#end
@@ -172,6 +173,45 @@
 #end
 			</tbody>
 		</table>
+		
+		<br></br>
+		<table id="proxy-user-table" class="all-jobs permission-table">
+			<thead>
+				<tr>
+					<th class="tb-username">Proxy User</th>
+					<th class="tb-perm">Admin</th>
+					<th class="tb-read">Read</th>
+					<th class="tb-write">Write</th>
+					<th class="tb-execute">Execute</th>
+					<th class="tb-schedule">Schedule</th>
+					<th class="tb-proxy">Proxy</th>
+					#if($isAdmin)
+						<th class="tb-action"></th>
+					#end
+				</tr>
+			</thead>
+			<tbody>
+#if($proxyUsers)
+#foreach($proxyUser in $proxyUsers)
+	<tr id="${proxyUser}-row" >
+		<td class="tb-name">#if($proxyUser == $username) ${proxyUser} <span class="sublabel">(you)</span> #else $proxyUser #end</td>
+			<td><input id="proxy-${proxyUser}-admin-checkbox" type="checkbox" name="admin" disabled="disabled" ></input></td>
+			<td><input id="proxy-${proxyUser}-read-checkbox" type="checkbox" name="read" disabled="disabled" ></input></td>
+			<td><input id="proxy-${proxyUser}-write-checkbox" type="checkbox" name="write" disabled="disabled" ></input></td>
+			<td><input id="proxy-${proxyUser}-execute-checkbox" type="checkbox" name="execute" disabled="disabled" ></input></td>
+			<td><input id="proxy-${proxyUser}-schedule-checkbox" type="checkbox" name="schedule" disabled="disabled" ></input></td>
+			<td><input id="proxy-${proxyUser}-proxy-checkbox" type="checkbox" name="proxy" disabled="disabled" checked="true"></input></td>
+
+		#if($isAdmin)
+			<td><button id="proxy-${proxyUser}" class="change-btn btn2">Change</button></td>
+		#end
+	</tr>
+#end
+#else
+	<tr><td class="last">No Proxy User Found.</td></tr>
+#end
+			</tbody>
+		</table>
 #end
 
 		</div>
@@ -184,6 +224,7 @@
 					<dl>
 						<dt>User</dt>
 						<dd><input id="user-box" name="userid" type="text" /></dd>
+					<div id="otherCheckBoxes">
 						<dt class="nextline">Admin</dt>
 						<dd><input id="admin-change" name="admin" type="checkbox" /></dd>
 						<dt>Read</dt>
@@ -194,6 +235,11 @@
 					    <dd><input id="execute-change" name="execute" type="checkbox" /></dd>
 					    <dt>Schedule</dt>
 					    <dd><input id="schedule-change" name="schedule" type="checkbox" /></dd>
+					</div>
+					<div id="proxyCheckBox" hidden=true>
+				    	<dt>Proxy</dt>
+				    	<dd><input id="proxy-change" name="proxy" type="checkbox" /></dd>
+				    </div>
 					</dl>
 				</fieldset>
 			</div>
diff --git a/src/sql/create_execution_logs.sql b/src/sql/create_execution_logs.sql
index 6e76312..a1eab7a 100644
--- a/src/sql/create_execution_logs.sql
+++ b/src/sql/create_execution_logs.sql
@@ -8,6 +8,5 @@ CREATE TABLE execution_logs (
 	log LONGBLOB,
 	PRIMARY KEY (exec_id, name, attempt, start_byte),
 	INDEX log_attempt (exec_id, name, attempt),
-	INDEX log_index (exec_id, name),
-	INDEX byte_log_index(exec_id, name, start_byte, end_byte)
+	INDEX log_index (exec_id, name)
 ) ENGINE=InnoDB;
diff --git a/src/sql/create_project_table.sql b/src/sql/create_project_table.sql
index e52fa76..34115a7 100644
--- a/src/sql/create_project_table.sql
+++ b/src/sql/create_project_table.sql
@@ -7,6 +7,8 @@ CREATE TABLE projects (
 	version INT,
 	last_modified_by VARCHAR(64) NOT NULL,
 	description VARCHAR(255),
+	enc_type TINYINT,
+	settings_blob LONGBLOB,
 	UNIQUE INDEX project_id (id),
 	INDEX project_name (name)
 ) ENGINE=InnoDB;
diff --git a/src/sql/update_2.0_to_2.01.sql b/src/sql/update_2.0_to_2.01.sql
index 9ab565e..309742e 100644
--- a/src/sql/update_2.0_to_2.01.sql
+++ b/src/sql/update_2.0_to_2.01.sql
@@ -14,6 +14,9 @@ ALTER TABLE execution_logs ADD INDEX log_attempt (exec_id, name, attempt)
 ALTER TABLE schedules ADD COLUMN enc_type TINYINT;
 ALTER TABLE schedules ADD COLUMN schedule_options LONGBLOB;
 
+ALTER TABLE project_events MODIFY COLUMN message VARCHAR(512);
 
+ALTER TABLE projects ADD COLUMN enc_type TINYINT;
+ALTER TABLE projects ADD COLUMN settings_blob LONGBLOB;
 
 
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index 4fb7326..e35c340 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -83,6 +83,7 @@ azkaban.FlowTabView= Backbone.View.extend({
   	$("#executebtn").hide();
   	$("#pausebtn").hide();
   	$("#resumebtn").hide();
+  	$("#retrybtn").hide();
   
  	this.model.bind('change:graph', this.handleFlowStatusChange, this);
 	this.model.bind('change:update', this.handleFlowStatusChange, this);
diff --git a/src/web/js/azkaban.permission.view.js b/src/web/js/azkaban.permission.view.js
index 429b8ed..e04bc55 100644
--- a/src/web/js/azkaban.permission.view.js
+++ b/src/web/js/azkaban.permission.view.js
@@ -2,18 +2,20 @@ $.namespace('azkaban');
 
 var permissionTableView;
 var groupPermissionTableView;
+var proxyTableView;
 azkaban.PermissionTableView= Backbone.View.extend({
   events : {
 	"click button": "handleChangePermission"
   },
   initialize : function(settings) {
   	this.group = settings.group;
+  	this.proxy = settings.proxy;
   },
   render: function() {
   },
   handleChangePermission: function(evt) {
   	  var currentTarget = evt.currentTarget;
-  	  changePermissionView.display(currentTarget.id, false, this.group);
+  	  changePermissionView.display(currentTarget.id, false, this.group, this.proxy);
   }
 });
 
@@ -27,13 +29,22 @@ azkaban.ChangePermissionView= Backbone.View.extend({
   initialize : function(settings) {
   	$('#errorMsg').hide();
   },
-  display: function(userid, newPerm, group) {
+  display: function(userid, newPerm, group, proxy) {
   	// 6 is the length of the prefix "group-"
   	this.userid = group ? userid.substring(6, userid.length) : userid;
+  	if(group == true) {
+  		this.userid = userid.substring(6, userid.length)
+  	} else if (proxy == true) {
+  		this.userid = userid.substring(6, userid.length)
+  	} else {
+  		this.userid = userid
+  	}
+  	
   	this.permission = {};
 	$('#user-box').val(this.userid);
 	this.newPerm = newPerm;
 	this.group = group;
+	this.proxy = proxy;
 	
 	var prefix = userid;
 	var adminInput = $("#" + prefix + "-admin-checkbox");
@@ -41,12 +52,16 @@ azkaban.ChangePermissionView= Backbone.View.extend({
 	var writeInput = $("#" + prefix + "-write-checkbox");
 	var executeInput = $("#" + prefix + "-execute-checkbox");
 	var scheduleInput = $("#" + prefix + "-schedule-checkbox");
+	var proxyInput = $("#" + prefix + "-proxy-checkbox");
 	
 	if (newPerm) {
 		if (group) {
 			$('#change-title').text("Add New Group Permissions");
 		}
-		else {
+		else if(proxy){
+			$('#change-title').text("Add New Proxy User Permissions");
+		}
+		else{
 			$('#change-title').text("Add New User Permissions");
 		}
 		$('#user-box').attr("disabled", null);
@@ -57,22 +72,39 @@ azkaban.ChangePermissionView= Backbone.View.extend({
 		this.permission.write = false;
 		this.permission.execute = false;
 		this.permission.schedule = false;
+		this.doProxy = false;
+		
 	}
 	else {
 		if (group) {
 			$('#change-title').text("Change Group Permissions");
 		}
+		else if(proxy){
+			$('#change-title').text("Change Proxy User Permissions");
+			this.doProxy = $(proxyInput).attr("checked");
+		}
 		else {
 			$('#change-title').text("Change User Permissions");
 		}
 		
 		$('#user-box').attr("disabled", "disabled");
 		
+		
+		
 		this.permission.admin = $(adminInput).attr("checked");
 		this.permission.read = $(readInput).attr("checked");
 		this.permission.write = $(writeInput).attr("checked");
 		this.permission.execute = $(executeInput).attr("checked");
 		this.permission.schedule = $(scheduleInput).attr("checked");
+		this.doProxy = $(proxyInput).attr("checked");
+	}
+	
+	if(proxy) {
+		document.getElementById("otherCheckBoxes").hidden=true;
+		document.getElementById("proxyCheckBox").hidden=false;
+	} else {
+		document.getElementById("otherCheckBoxes").hidden=false;
+		document.getElementById("proxyCheckBox").hidden=true;
 	}
 	
 	this.changeCheckbox();
@@ -91,17 +123,27 @@ azkaban.ChangePermissionView= Backbone.View.extend({
             $("#errorMsg").hide();
           }
         });
+  	 
+
+  	 
   },
   render: function() {
   },
   handleCheckboxClick : function(evt) {
   	console.log("click");
   	var targetName = evt.currentTarget.name;
-  	this.permission[targetName] = evt.currentTarget.checked;
+  	if(targetName == "proxy") {
+  		this.doProxy = evt.currentTarget.checked;
+  	}
+  	else {
+  		this.permission[targetName] = evt.currentTarget.checked;
+  	}
   	this.changeCheckbox(evt);
   },
   changeCheckbox : function(evt) {
     var perm = this.permission;
+    var proxy = this.proxy;
+    var doProxy = this.doProxy;
 
   	if (perm.admin) {
   		$("#admin-change").attr("checked", true);
@@ -116,9 +158,13 @@ azkaban.ChangePermissionView= Backbone.View.extend({
   		
   		$("#schedule-change").attr("checked", true);
   		$("#schedule-change").attr("disabled", "disabled");
+  		
+  		$("#proxy-change").attr("checked", false);
+		$("#proxy-change").attr("disabled", "disabled");
   	}
   	else {
   		$("#admin-change").attr("checked", false);
+  		
   		$("#read-change").attr("checked", perm.read);
   		$("#read-change").attr("disabled", null);
   		  		
@@ -130,12 +176,16 @@ azkaban.ChangePermissionView= Backbone.View.extend({
   		
   		$("#schedule-change").attr("checked", perm.schedule);
 		$("#schedule-change").attr("disabled", null);
+		
+		$("#proxy-change").attr("checked", doProxy);
+		$("#proxy-change").attr("disabled", null);
+		
   	}
   	
   	$("#change-btn").removeClass("btn-disabled");
   	$("#change-btn").attr("disabled", null);
   	
-  	if (perm.admin || perm.read || perm.write || perm.execute || perm.schedule) {
+  	if (perm.admin || perm.read || perm.write || perm.execute || perm.schedule || doProxy) {
   		$("#change-btn").text("Commit");
   	}
   	else {
@@ -152,11 +202,14 @@ azkaban.ChangePermissionView= Backbone.View.extend({
   	var requestURL = contextURL + "/manager";
   	var name = $('#user-box').val();
 	var command = this.newPerm ? "addPermission" : "changePermission";
+	if(this.proxy) {
+		command = "addProxyUser";
+	}
 	var group = this.group;
 	
   	$.get(
 	      requestURL,
-	      {"project": projectName, "name": name, "ajax":command, "permissions": this.permission, "group": group},
+	      {"project": projectName, "name": name, "ajax":command, "permissions": this.permission, "doProxy": this.doProxy, "group": group},
 	      function(data) {
 	      	  console.log("Output");
 	      	  if (data.error) {
@@ -174,15 +227,21 @@ azkaban.ChangePermissionView= Backbone.View.extend({
 });
 
 $(function() {
-	permissionTableView = new azkaban.PermissionTableView({el:$('#permissions-table'), group: false});
-	groupPermissionTableView = new azkaban.PermissionTableView({el:$('#group-permissions-table'), group: true});
+	permissionTableView = new azkaban.PermissionTableView({el:$('#permissions-table'), group: false, proxy: false});
+	groupPermissionTableView = new azkaban.PermissionTableView({el:$('#group-permissions-table'), group: true, proxy: false});
+	proxyTableView = new azkaban.PermissionTableView({el:$('#proxy-user-table'), group: false, proxy: true});
 	changePermissionView = new azkaban.ChangePermissionView({el:$('#change-permission')});
 	
 	$('#addUser').bind('click', function() {
-		changePermissionView.display("", true, false);
+		changePermissionView.display("", true, false, false);
 	});
 	
 	$('#addGroup').bind('click', function() {
-		changePermissionView.display("", true, true);
+		changePermissionView.display("", true, true, false);
 	});
+	
+	$('#addProxyUser').bind('click', function() {
+		changePermissionView.display("", true, false, true);
+	});
+	
 });
diff --git a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
index df5f5a6..452be05 100644
--- a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
@@ -212,7 +212,8 @@ public class LocalFlowWatcherTest {
 		//MockProjectLoader projectLoader = new MockProjectLoader(new File(exFlow.getExecutionPath()));
 		
 		loader.uploadExecutableFlow(exFlow);
-		FlowRunner runner = new FlowRunner(exFlow, watcher,  loader, fakeProjectLoader, jobtypeManager);
+		FlowRunner runner = new FlowRunner(exFlow, loader, fakeProjectLoader, jobtypeManager);
+		runner.setFlowWatcher(watcher);
 		runner.addListener(eventCollector);
 		
 		return runner;
diff --git a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
index 78aac81..1fddfc7 100644
--- a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
@@ -212,7 +212,8 @@ public class RemoteFlowWatcherTest {
 		//MockProjectLoader projectLoader = new MockProjectLoader(new File(exFlow.getExecutionPath()));
 		
 		loader.uploadExecutableFlow(exFlow);
-		FlowRunner runner = new FlowRunner(exFlow, watcher,  loader, fakeProjectLoader, jobtypeManager);
+		FlowRunner runner = new FlowRunner(exFlow,  loader, fakeProjectLoader, jobtypeManager);
+		runner.setFlowWatcher(watcher);
 		runner.addListener(eventCollector);
 		
 		return runner;
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index c63fb11..54dc49c 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -361,7 +361,8 @@ public class FlowRunnerTest {
 		//MockProjectLoader projectLoader = new MockProjectLoader(new File(flow.getExecutionPath()));
 		
 		loader.uploadExecutableFlow(flow);
-		FlowRunner runner = new FlowRunner(flow, null, loader, fakeProjectLoader, jobtypeManager);
+		FlowRunner runner = new FlowRunner(flow, loader, fakeProjectLoader, jobtypeManager);
+
 		runner.addListener(eventCollector);
 		
 		return runner;
@@ -373,7 +374,9 @@ public class FlowRunnerTest {
 		//MockProjectLoader projectLoader = new MockProjectLoader(new File(exFlow.getExecutionPath()));
 		
 		loader.uploadExecutableFlow(exFlow);
-		FlowRunner runner = new FlowRunner(exFlow, null,  loader, fakeProjectLoader, jobtypeManager);
+	
+		FlowRunner runner = new FlowRunner(exFlow, loader, fakeProjectLoader, jobtypeManager);
+
 		runner.addListener(eventCollector);
 		
 		return runner;
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index e4d305c..1bafbbb 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -2,6 +2,7 @@ package azkaban.test.execapp;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashSet;
 
 import junit.framework.Assert;
 
@@ -261,8 +262,10 @@ public class JobRunnerTest {
 		node.setExecutableFlow(flow);
 		
 		Props props = createProps(time, fail);
-		
-		JobRunner runner = new JobRunner(node, props, workingDir, loader, jobtypeManager, logger);
+		HashSet<String> proxyUsers = new HashSet<String>();
+		proxyUsers.add(flow.getSubmitUser());
+		JobRunner runner = new JobRunner(node, props, workingDir, loader, jobtypeManager);
+		runner.setLogSettings(logger, "5MB", 4);
 
 		runner.addListener(listener);
 		return runner;
diff --git a/unit/java/azkaban/test/execapp/MockProjectLoader.java b/unit/java/azkaban/test/execapp/MockProjectLoader.java
index 0739bd3..b10ea29 100644
--- a/unit/java/azkaban/test/execapp/MockProjectLoader.java
+++ b/unit/java/azkaban/test/execapp/MockProjectLoader.java
@@ -17,6 +17,7 @@ import azkaban.project.ProjectManagerException;
 import azkaban.user.Permission;
 import azkaban.user.User;
 import azkaban.utils.Props;
+import azkaban.utils.Triple;
 
 public class MockProjectLoader implements ProjectLoader {
 	public File dir;
@@ -209,4 +210,18 @@ public class MockProjectLoader implements ProjectLoader {
 		// TODO Auto-generated method stub
 		return null;
 	}
+
+	@Override
+	public List<Triple<String, Boolean, Permission>> getProjectPermissions(
+			int projectId) throws ProjectManagerException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public void updateProjectSettings(Project project)
+			throws ProjectManagerException {
+		// TODO Auto-generated method stub
+		
+	}
 }
\ No newline at end of file