azkaban-uncached

Details

diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index c912ad9..31bae29 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -35,9 +35,11 @@ import azkaban.flow.FlowProps;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManagerException;
+import azkaban.user.Permission;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
+import azkaban.utils.Triple;
 
 public class FlowRunner extends EventHandler implements Runnable {
 	private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
@@ -80,6 +82,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private boolean flowFinished = false;
 	private boolean flowCancelled = false;
 	
+	private List<String> proxyUsers = null;
+	
 	public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
 		this.execId = flow.getExecutionId();
 		this.flow = flow;
@@ -88,6 +92,27 @@ public class FlowRunner extends EventHandler implements Runnable {
 		this.executorService = Executors.newFixedThreadPool(numThreads);
 		this.execDir = new File(flow.getExecutionPath());
 		this.jobtypeManager = jobtypeManager;
+		
+		this.proxyUsers = getProxyUsers();
+	}
+
+	private List<String> getProxyUsers() {
+		List<String> allUsers = new ArrayList<String>();
+		allUsers.add(flow.getSubmitUser());
+		List<Triple<String, Boolean, Permission>> permissions;
+		try {
+			permissions = projectLoader.getProjectPermissions(flow.getProjectId());
+			for(Triple<String, Boolean, Permission> triple : permissions) {
+				if(triple.getSecond() == false && triple.getThird().isPermissionSet(Permission.Type.EXECUTE)) {
+					allUsers.add(triple.getFirst());
+				}
+			}
+		} catch (ProjectManagerException e) {
+			// This gets funny when no user specified and submitted by the scheduler
+			logger.error("Failed to get project permission from project. Using default permission.", e);
+		}
+		
+		return allUsers;
 	}
 
 	public FlowRunner setGlobalProps(Props globalProps) {
@@ -345,7 +370,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		prop.setParent(parentProps);
 		
 		// should have one prop with system secrets, the other user level props
-		JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), executorLoader, jobtypeManager, logger);
+		JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), proxyUsers, executorLoader, jobtypeManager, logger);
 		jobRunner.addListener(listener);
 
 		return jobRunner;
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index e34132e..641787d 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -17,6 +17,7 @@ package azkaban.execapp;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.log4j.Appender;
 import org.apache.log4j.FileAppender;
@@ -35,6 +36,7 @@ import azkaban.flow.CommonJobProperties;
 import azkaban.jobExecutor.AbstractProcessJob;
 import azkaban.jobExecutor.Job;
 import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypeManagerException;
 
 import azkaban.utils.Props;
 
@@ -61,8 +63,9 @@ public class JobRunner extends EventHandler implements Runnable {
 	private Object syncObject = new Object();
 	
 	private final JobTypeManager jobtypeManager;
+	private List<String> proxyUsers = null;
 
-	public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager, Logger flowLogger) {
+	public JobRunner(ExecutableNode node, Props props, File workingDir, List<String> proxyUsers, ExecutorLoader loader, JobTypeManager jobtypeManager, Logger flowLogger) {
 		this.props = props;
 		this.node = node;
 		this.workingDir = workingDir;
@@ -70,6 +73,7 @@ public class JobRunner extends EventHandler implements Runnable {
 		this.loader = loader;
 		this.jobtypeManager = jobtypeManager;
 		this.flowLogger = flowLogger;
+		this.proxyUsers = proxyUsers;
 	}
 	
 	public ExecutableNode getNode() {
@@ -152,6 +156,10 @@ public class JobRunner extends EventHandler implements Runnable {
 				fireEvent(Event.create(this, Type.JOB_STATUS_CHANGED), false);
 				runJob();
 			}
+			else {
+				node.setStatus(Status.FAILED);
+				logError("Job run failed!");
+			}
 			
 			node.setEndTime(System.currentTimeMillis());
 
@@ -211,9 +219,27 @@ public class JobRunner extends EventHandler implements Runnable {
 			if (!props.containsKey(AbstractProcessJob.WORKING_DIR)) {
 				props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
 			}
-
+			
+			String jobProxyUser = props.getString("user.to.proxy", null);
+			if(jobProxyUser == null) {
+				jobProxyUser = proxyUsers.get(0);
+			}
+			else {
+				if(! proxyUsers.contains(jobProxyUser)) {
+					logger.error("User " + jobProxyUser + " has no permission to execute this job " + node.getJobId() + "!");
+					return false;
+				}
+			}
+			props.put("user.to.proxy", jobProxyUser);
+			
 			//job = JobWrappingFactory.getJobWrappingFactory().buildJobExecutor(node.getJobId(), props, logger);
-			job = jobtypeManager.buildJobExecutor(node.getJobId(), props, logger);
+			try {
+				job = jobtypeManager.buildJobExecutor(node.getJobId(), props, logger);
+			}
+			catch (JobTypeManagerException e) {
+				logger.error("Failed to build job type, skipping this job");
+				return false;
+			}
 		}
 		
 		return true;
diff --git a/src/java/azkaban/jobtype/JobTypeManager.java b/src/java/azkaban/jobtype/JobTypeManager.java
index d43b0da..65c747e 100644
--- a/src/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/java/azkaban/jobtype/JobTypeManager.java
@@ -73,6 +73,7 @@ public class JobTypeManager
 			}
 			catch (Exception e) {
 				logger.info("Plugin jobtypes failed to load. " + e.getCause());
+				throw new JobTypeManagerException(e);
 			}
 		}
 		
@@ -120,6 +121,7 @@ public class JobTypeManager
 					loadJob(dir, globalConf, globalSysConf);
 				}
 				catch (Exception e) {
+					logger.error("Failed to load jobtype " + dir.getName() + e.getMessage());
 					throw new JobTypeManagerException(e);
 				}
 			}
@@ -195,7 +197,7 @@ public class JobTypeManager
 		try {
 			if(confFile != null) {
 				conf = new Props(commonConf, confFile);
-				conf = PropsUtils.resolveProps(conf);
+//				conf = PropsUtils.resolveProps(conf);
 			}
 			if(sysConfFile != null) {
 				sysConf = new Props(commonSysConf, sysConfFile);
@@ -203,7 +205,7 @@ public class JobTypeManager
 			}
 		}
 		catch (Exception e) {
-			throw new JobTypeManagerException("Failed to get jobtype properties", e);
+			throw new JobTypeManagerException("Failed to get jobtype properties" + e.getMessage());
 		}
 		sysConf.put("plugin.dir", dir.getAbsolutePath());
 		
@@ -221,7 +223,7 @@ public class JobTypeManager
 			try {
 				if(f.getName().endsWith(".jar")) {
 					resources.add(f.toURI().toURL());
-					logger.info("adding to classpath " + f);
+					logger.info("adding to classpath " + f.toURI().toURL());
 				}
 			} catch (MalformedURLException e) {
 				// TODO Auto-generated catch block
@@ -240,6 +242,23 @@ public class JobTypeManager
 		catch (ClassNotFoundException e) {
 			throw new JobTypeManagerException(e);
 		}
+		
+		logger.info("Doing simple testing...");
+		try {
+			Props fakeSysProps = new Props(sysConf);
+			fakeSysProps.put("type", jobtypeName);
+			Props fakeJobProps = new Props(conf);
+			Job job = (Job)Utils.callConstructor(clazz, "dummy", fakeSysProps, fakeJobProps, logger);
+		}
+		catch (Exception e) {
+			logger.info("Jobtype " + jobtypeName + " failed test!", e);
+			throw new JobExecutionException(e);
+		}
+		catch (Throwable t) {
+			logger.info("Jobtype " + jobtypeName + " failed test!", t);
+			throw new JobExecutionException(t);
+		}
+		
 		logger.info("Loaded jobtype " + jobtypeName + " " + jobtypeClass);
 		
 		if(conf != null) jobtypeJobProps.put(jobtypeName, conf);
@@ -249,7 +268,7 @@ public class JobTypeManager
 	
 	public Job buildJobExecutor(String jobId, Props jobProps, Logger logger) throws JobTypeManagerException
 	{
-		Job job;
+		Job job = null;
 		try {
 			String jobType = jobProps.getString("type");
 			if (jobType == null || jobType.length() == 0) {
@@ -274,14 +293,12 @@ public class JobTypeManager
 				Props p = jobtypeJobProps.get(jobType);
 				for(String k : p.getKeySet())
 				{
-					if(!jobProps.containsKey(k)) {
-						jobProps.put(k, p.get(k));
+					if(!jobConf.containsKey(k)) {
+						jobConf.put(k, p.get(k));
 					}
 				}
 			}
-			else {
-				jobConf = jobProps;
-			}
+			jobConf = PropsUtils.resolveProps(jobConf);
 
 			if (sysConf != null) {
 				sysConf = PropsUtils.resolveProps(sysConf);
@@ -289,19 +306,24 @@ public class JobTypeManager
 			else {
 				sysConf = new Props();
 			}
-
-			jobConf = PropsUtils.resolveProps(jobConf);
+			
 			
 //			logger.info("sysConf is " + sysConf);
 //			logger.info("jobConf is " + jobConf);
-			
+//			
 			job = (Job)Utils.callConstructor(executorClass, jobId, sysConf, jobConf, logger);
 			logger.info("job built.");
 		}
 		catch (Exception e) {
-			job = new InitErrorJob(jobId, e);
+			//job = new InitErrorJob(jobId, e);
+			logger.error("Failed to build job executor for job " + jobId + e.getMessage());
+			throw new JobTypeManagerException("Failed to build job executor for job " + jobId, e);
 			//throw new JobTypeManagerException(e);
 		}
+		catch (Throwable t) {
+			logger.error("Failed to build job executor for job " + jobId + t.getMessage(), t);
+			throw new JobTypeManagerException("Failed to build job executor for job " + jobId, t);
+		}
 
 		return job;
 	}
diff --git a/src/java/azkaban/project/JdbcProjectLoader.java b/src/java/azkaban/project/JdbcProjectLoader.java
index 01976b7..6f0c3f5 100644
--- a/src/java/azkaban/project/JdbcProjectLoader.java
+++ b/src/java/azkaban/project/JdbcProjectLoader.java
@@ -525,6 +525,21 @@ public class JdbcProjectLoader implements ProjectLoader {
 	}
 	
 	@Override
+	public List<Triple<String, Boolean, Permission>> getProjectPermissions(int projectId) throws ProjectManagerException {
+		ProjectPermissionsResultHandler permHander = new ProjectPermissionsResultHandler();
+		QueryRunner runner = new QueryRunner(dataSource);
+		List<Triple<String, Boolean,Permission>> permissions = null;
+		try {
+			permissions = runner.query(ProjectPermissionsResultHandler.SELECT_PROJECT_PERMISSION, permHander, projectId);
+		} catch (SQLException e) {
+			throw new ProjectManagerException("Query for permissions for " + projectId + " failed.", e);
+		}
+		
+		return permissions;
+	}
+	
+	
+	@Override
 	public void removeProject(Project project, String user) throws ProjectManagerException {
 		QueryRunner runner = new QueryRunner(dataSource);
 		
diff --git a/src/java/azkaban/project/ProjectLoader.java b/src/java/azkaban/project/ProjectLoader.java
index 4711dc0..5d01a71 100644
--- a/src/java/azkaban/project/ProjectLoader.java
+++ b/src/java/azkaban/project/ProjectLoader.java
@@ -10,6 +10,7 @@ import azkaban.project.ProjectLogEvent.EventType;
 import azkaban.user.Permission;
 import azkaban.user.User;
 import azkaban.utils.Props;
+import azkaban.utils.Triple;
 
 public interface ProjectLoader {
 
@@ -218,5 +219,7 @@ public interface ProjectLoader {
 	public void updateProjectProperty(Project project, Props props) throws ProjectManagerException;
 
 	Props fetchProjectProperty(int projectId, int projectVer, String propsName) throws ProjectManagerException;
+
+	List<Triple<String, Boolean, Permission>> getProjectPermissions(int projectId) throws ProjectManagerException;
 	
 }
\ No newline at end of file
diff --git a/src/java/azkaban/sla/SLAManager.java b/src/java/azkaban/sla/SLAManager.java
index 04fbe08..4e04528 100644
--- a/src/java/azkaban/sla/SLAManager.java
+++ b/src/java/azkaban/sla/SLAManager.java
@@ -3,34 +3,21 @@ package azkaban.sla;
 import java.lang.Thread.State;
 import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.log4j.Logger;
-import org.apache.velocity.runtime.parser.node.GetExecutor;
 import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.ReadablePeriod;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
 
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutorMailer;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerException;
-import azkaban.flow.Flow;
-import azkaban.project.Project;
-import azkaban.project.ProjectManager;
 import azkaban.sla.SLA.SlaAction;
 import azkaban.sla.SLA.SlaRule;
 import azkaban.sla.SLA.SlaSetting;
-import azkaban.user.User;
-import azkaban.utils.Pair;
 import azkaban.utils.Props;
 
 /*
@@ -61,7 +48,6 @@ public class SLAManager {
 	private SLALoader loader;
 
 	private final SLARunner runner;
-	private final SLAPreRunner prerunner;
 	private final ExecutorManager executorManager;
 	private SlaMailer mailer;
 
@@ -82,7 +68,6 @@ public class SLAManager {
 		this.loader = loader;
 		this.mailer = new SlaMailer(props);
 		this.runner = new SLARunner();
-		this.prerunner = new SLAPreRunner();
 
 		List<SLA> SLAList = null;
 		try {
@@ -105,7 +90,6 @@ public class SLAManager {
 	 */
 	public void shutdown() {
 		this.runner.shutdown();
-		this.prerunner.shutdown();
 	}
 
 	/**
@@ -475,90 +459,6 @@ public class SLAManager {
 		}
 		mailer.sendSlaEmail(s, message);
 	}
-	
-	
-	public class SLAPreRunner extends Thread {
-		private final List<SLA> preSlas;
-		private AtomicBoolean stillAlive = new AtomicBoolean(true);
-
-		// Five minute minimum intervals
-		private static final int TIMEOUT_MS = 300000;
-
-		public SLAPreRunner() {
-			preSlas = new ArrayList<SLA>();
-		}
-
-		public void shutdown() {
-			logger.error("Shutting down pre-sla checker thread");
-			stillAlive.set(false);
-			this.interrupt();
-		}
-
-		/**
-		 * Return a list of flow with SLAs
-		 * 
-		 * @return
-		 */
-		protected synchronized List<SLA> getPreSlas() {
-			return new ArrayList<SLA>(preSlas);
-		}
-
-		/**
-		 * Adds SLA into runner and then interrupts so it will update
-		 * its wait time.
-		 * 
-		 * @param flow
-		 */
-		public synchronized void addCheckerPreSla(SLA s) {
-			logger.info("Adding " + s + " to pre-sla checker.");
-			preSlas.add(s);
-			this.interrupt();
-		}
-		
-		/**
-		 * Remove runner SLA. Does not interrupt.
-		 * 
-		 * @param flow
-		 * @throws SLAManagerException 
-		 */
-		public synchronized void removeCheckerPreSla(SLA s) {
-			logger.info("Removing " + s + " from the pre-sla checker.");
-			preSlas.remove(s);
-		}
-
-		public void run() {
-			while (stillAlive.get()) {
-				synchronized (this) {
-					try {
-						// TODO clear up the exception handling
-
-						if (preSlas.size() == 0) {
-							try {
-								this.wait(TIMEOUT_MS);
-							} catch (InterruptedException e) {
-								// interruption should occur when items are added or removed from the queue.
-							}
-						} else {
-							for(SLA s : preSlas) {
-								ExecutableFlow exflow = executorManager.getExecutableFlow(s.getExecId());
-								String id = s.getJobName();
-								if(!s.equals("")) {
-									ExecutableNode exnode = exflow.getExecutableNode(id);
-									if(exnode.getStartTime() != -1) {
-										
-									}
-								}
-							}
-						}
-					} catch (Exception e) {
-						logger.error("Unexpected exception has been thrown in scheduler", e);
-					} catch (Throwable e) {
-						logger.error("Unexpected throwable has been thrown in scheduler", e);
-					}
-				}
-			}
-		}
-	}
 
 	public int getNumActiveSLA() {
 		return runner.getRunnerSLAs().size();
diff --git a/src/java/azkaban/utils/Props.java b/src/java/azkaban/utils/Props.java
index 4b16b9c..84b48b3 100644
--- a/src/java/azkaban/utils/Props.java
+++ b/src/java/azkaban/utils/Props.java
@@ -35,6 +35,8 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import javax.swing.text.StyledEditorKit.BoldAction;
+
 import org.apache.log4j.Logger;
 
 /**
@@ -418,6 +420,19 @@ public class Props {
 		}
 	}
 
+	public Class<?> getClass(String key, boolean initialize, ClassLoader cl) {
+		try {
+			if (containsKey(key)) {
+				return Class.forName(get(key), initialize, cl);
+			} else {
+				throw new UndefinedPropertyException(
+						"Missing required property '" + key + "'");
+			}
+		} catch (ClassNotFoundException e) {
+			throw new IllegalArgumentException(e);
+		}
+	}
+	
 	/**
 	 * Gets the class from the Props. If it doesn't exist, it will return the
 	 * defaultClass
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 41d44f3..056cbac 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -2,6 +2,8 @@ package azkaban.test.execapp;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import junit.framework.Assert;
 
@@ -261,8 +263,9 @@ public class JobRunnerTest {
 		node.setExecutableFlow(flow);
 		
 		Props props = createProps(time, fail);
-		
-		JobRunner runner = new JobRunner(node, props, workingDir, loader, jobtypeManager, logger);
+		List<String> proxyUsers = new ArrayList<String>();
+		proxyUsers.add(flow.getSubmitUser());
+		JobRunner runner = new JobRunner(node, props, workingDir, proxyUsers, loader, jobtypeManager, logger);
 
 		runner.addListener(listener);
 		return runner;
diff --git a/unit/java/azkaban/test/execapp/MockProjectLoader.java b/unit/java/azkaban/test/execapp/MockProjectLoader.java
index 0739bd3..165f0c9 100644
--- a/unit/java/azkaban/test/execapp/MockProjectLoader.java
+++ b/unit/java/azkaban/test/execapp/MockProjectLoader.java
@@ -17,6 +17,7 @@ import azkaban.project.ProjectManagerException;
 import azkaban.user.Permission;
 import azkaban.user.User;
 import azkaban.utils.Props;
+import azkaban.utils.Triple;
 
 public class MockProjectLoader implements ProjectLoader {
 	public File dir;
@@ -209,4 +210,11 @@ public class MockProjectLoader implements ProjectLoader {
 		// TODO Auto-generated method stub
 		return null;
 	}
+
+	@Override
+	public List<Triple<String, Boolean, Permission>> getProjectPermissions(
+			int projectId) throws ProjectManagerException {
+		// TODO Auto-generated method stub
+		return null;
+	}
 }
\ No newline at end of file