azkaban-uncached
Changes
src/java/azkaban/execapp/FlowRunner.java 27(+26 -1)
src/java/azkaban/execapp/JobRunner.java 32(+29 -3)
src/java/azkaban/jobtype/JobTypeManager.java 48(+35 -13)
src/java/azkaban/sla/SLAManager.java 100(+0 -100)
src/java/azkaban/utils/Props.java 15(+15 -0)
Details
src/java/azkaban/execapp/FlowRunner.java 27(+26 -1)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index c912ad9..31bae29 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -35,9 +35,11 @@ import azkaban.flow.FlowProps;
import azkaban.jobtype.JobTypeManager;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManagerException;
+import azkaban.user.Permission;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
+import azkaban.utils.Triple;
public class FlowRunner extends EventHandler implements Runnable {
private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
@@ -80,6 +82,8 @@ public class FlowRunner extends EventHandler implements Runnable {
private boolean flowFinished = false;
private boolean flowCancelled = false;
+ private List<String> proxyUsers = null;
+
public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
this.execId = flow.getExecutionId();
this.flow = flow;
@@ -88,6 +92,27 @@ public class FlowRunner extends EventHandler implements Runnable {
this.executorService = Executors.newFixedThreadPool(numThreads);
this.execDir = new File(flow.getExecutionPath());
this.jobtypeManager = jobtypeManager;
+
+ this.proxyUsers = getProxyUsers();
+ }
+
+ private List<String> getProxyUsers() {
+ List<String> allUsers = new ArrayList<String>();
+ allUsers.add(flow.getSubmitUser());
+ List<Triple<String, Boolean, Permission>> permissions;
+ try {
+ permissions = projectLoader.getProjectPermissions(flow.getProjectId());
+ for(Triple<String, Boolean, Permission> triple : permissions) {
+ if(triple.getSecond() == false && triple.getThird().isPermissionSet(Permission.Type.EXECUTE)) {
+ allUsers.add(triple.getFirst());
+ }
+ }
+ } catch (ProjectManagerException e) {
+ // This gets funny when no user specified and submitted by the scheduler
+ logger.error("Failed to get project permission from project. Using default permission.", e);
+ }
+
+ return allUsers;
}
public FlowRunner setGlobalProps(Props globalProps) {
@@ -345,7 +370,7 @@ public class FlowRunner extends EventHandler implements Runnable {
prop.setParent(parentProps);
// should have one prop with system secrets, the other user level props
- JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), executorLoader, jobtypeManager, logger);
+ JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), proxyUsers, executorLoader, jobtypeManager, logger);
jobRunner.addListener(listener);
return jobRunner;
src/java/azkaban/execapp/JobRunner.java 32(+29 -3)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index e34132e..641787d 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -17,6 +17,7 @@ package azkaban.execapp;
import java.io.File;
import java.io.IOException;
+import java.util.List;
import org.apache.log4j.Appender;
import org.apache.log4j.FileAppender;
@@ -35,6 +36,7 @@ import azkaban.flow.CommonJobProperties;
import azkaban.jobExecutor.AbstractProcessJob;
import azkaban.jobExecutor.Job;
import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypeManagerException;
import azkaban.utils.Props;
@@ -61,8 +63,9 @@ public class JobRunner extends EventHandler implements Runnable {
private Object syncObject = new Object();
private final JobTypeManager jobtypeManager;
+ private List<String> proxyUsers = null;
- public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager, Logger flowLogger) {
+ public JobRunner(ExecutableNode node, Props props, File workingDir, List<String> proxyUsers, ExecutorLoader loader, JobTypeManager jobtypeManager, Logger flowLogger) {
this.props = props;
this.node = node;
this.workingDir = workingDir;
@@ -70,6 +73,7 @@ public class JobRunner extends EventHandler implements Runnable {
this.loader = loader;
this.jobtypeManager = jobtypeManager;
this.flowLogger = flowLogger;
+ this.proxyUsers = proxyUsers;
}
public ExecutableNode getNode() {
@@ -152,6 +156,10 @@ public class JobRunner extends EventHandler implements Runnable {
fireEvent(Event.create(this, Type.JOB_STATUS_CHANGED), false);
runJob();
}
+ else {
+ node.setStatus(Status.FAILED);
+ logError("Job run failed!");
+ }
node.setEndTime(System.currentTimeMillis());
@@ -211,9 +219,27 @@ public class JobRunner extends EventHandler implements Runnable {
if (!props.containsKey(AbstractProcessJob.WORKING_DIR)) {
props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
}
-
+
+ String jobProxyUser = props.getString("user.to.proxy", null);
+ if(jobProxyUser == null) {
+ jobProxyUser = proxyUsers.get(0);
+ }
+ else {
+ if(! proxyUsers.contains(jobProxyUser)) {
+ logger.error("User " + jobProxyUser + " has no permission to execute this job " + node.getJobId() + "!");
+ return false;
+ }
+ }
+ props.put("user.to.proxy", jobProxyUser);
+
//job = JobWrappingFactory.getJobWrappingFactory().buildJobExecutor(node.getJobId(), props, logger);
- job = jobtypeManager.buildJobExecutor(node.getJobId(), props, logger);
+ try {
+ job = jobtypeManager.buildJobExecutor(node.getJobId(), props, logger);
+ }
+ catch (JobTypeManagerException e) {
+ logger.error("Failed to build job type, skipping this job");
+ return false;
+ }
}
return true;
src/java/azkaban/jobtype/JobTypeManager.java 48(+35 -13)
diff --git a/src/java/azkaban/jobtype/JobTypeManager.java b/src/java/azkaban/jobtype/JobTypeManager.java
index d43b0da..65c747e 100644
--- a/src/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/java/azkaban/jobtype/JobTypeManager.java
@@ -73,6 +73,7 @@ public class JobTypeManager
}
catch (Exception e) {
logger.info("Plugin jobtypes failed to load. " + e.getCause());
+ throw new JobTypeManagerException(e);
}
}
@@ -120,6 +121,7 @@ public class JobTypeManager
loadJob(dir, globalConf, globalSysConf);
}
catch (Exception e) {
+ logger.error("Failed to load jobtype " + dir.getName() + e.getMessage());
throw new JobTypeManagerException(e);
}
}
@@ -195,7 +197,7 @@ public class JobTypeManager
try {
if(confFile != null) {
conf = new Props(commonConf, confFile);
- conf = PropsUtils.resolveProps(conf);
+// conf = PropsUtils.resolveProps(conf);
}
if(sysConfFile != null) {
sysConf = new Props(commonSysConf, sysConfFile);
@@ -203,7 +205,7 @@ public class JobTypeManager
}
}
catch (Exception e) {
- throw new JobTypeManagerException("Failed to get jobtype properties", e);
+ throw new JobTypeManagerException("Failed to get jobtype properties" + e.getMessage());
}
sysConf.put("plugin.dir", dir.getAbsolutePath());
@@ -221,7 +223,7 @@ public class JobTypeManager
try {
if(f.getName().endsWith(".jar")) {
resources.add(f.toURI().toURL());
- logger.info("adding to classpath " + f);
+ logger.info("adding to classpath " + f.toURI().toURL());
}
} catch (MalformedURLException e) {
// TODO Auto-generated catch block
@@ -240,6 +242,23 @@ public class JobTypeManager
catch (ClassNotFoundException e) {
throw new JobTypeManagerException(e);
}
+
+ logger.info("Doing simple testing...");
+ try {
+ Props fakeSysProps = new Props(sysConf);
+ fakeSysProps.put("type", jobtypeName);
+ Props fakeJobProps = new Props(conf);
+ Job job = (Job)Utils.callConstructor(clazz, "dummy", fakeSysProps, fakeJobProps, logger);
+ }
+ catch (Exception e) {
+ logger.info("Jobtype " + jobtypeName + " failed test!", e);
+ throw new JobExecutionException(e);
+ }
+ catch (Throwable t) {
+ logger.info("Jobtype " + jobtypeName + " failed test!", t);
+ throw new JobExecutionException(t);
+ }
+
logger.info("Loaded jobtype " + jobtypeName + " " + jobtypeClass);
if(conf != null) jobtypeJobProps.put(jobtypeName, conf);
@@ -249,7 +268,7 @@ public class JobTypeManager
public Job buildJobExecutor(String jobId, Props jobProps, Logger logger) throws JobTypeManagerException
{
- Job job;
+ Job job = null;
try {
String jobType = jobProps.getString("type");
if (jobType == null || jobType.length() == 0) {
@@ -274,14 +293,12 @@ public class JobTypeManager
Props p = jobtypeJobProps.get(jobType);
for(String k : p.getKeySet())
{
- if(!jobProps.containsKey(k)) {
- jobProps.put(k, p.get(k));
+ if(!jobConf.containsKey(k)) {
+ jobConf.put(k, p.get(k));
}
}
}
- else {
- jobConf = jobProps;
- }
+ jobConf = PropsUtils.resolveProps(jobConf);
if (sysConf != null) {
sysConf = PropsUtils.resolveProps(sysConf);
@@ -289,19 +306,24 @@ public class JobTypeManager
else {
sysConf = new Props();
}
-
- jobConf = PropsUtils.resolveProps(jobConf);
+
// logger.info("sysConf is " + sysConf);
// logger.info("jobConf is " + jobConf);
-
+//
job = (Job)Utils.callConstructor(executorClass, jobId, sysConf, jobConf, logger);
logger.info("job built.");
}
catch (Exception e) {
- job = new InitErrorJob(jobId, e);
+ //job = new InitErrorJob(jobId, e);
+ logger.error("Failed to build job executor for job " + jobId + e.getMessage());
+ throw new JobTypeManagerException("Failed to build job executor for job " + jobId, e);
//throw new JobTypeManagerException(e);
}
+ catch (Throwable t) {
+ logger.error("Failed to build job executor for job " + jobId + t.getMessage(), t);
+ throw new JobTypeManagerException("Failed to build job executor for job " + jobId, t);
+ }
return job;
}
diff --git a/src/java/azkaban/project/JdbcProjectLoader.java b/src/java/azkaban/project/JdbcProjectLoader.java
index 01976b7..6f0c3f5 100644
--- a/src/java/azkaban/project/JdbcProjectLoader.java
+++ b/src/java/azkaban/project/JdbcProjectLoader.java
@@ -525,6 +525,21 @@ public class JdbcProjectLoader implements ProjectLoader {
}
@Override
+ public List<Triple<String, Boolean, Permission>> getProjectPermissions(int projectId) throws ProjectManagerException {
+ ProjectPermissionsResultHandler permHander = new ProjectPermissionsResultHandler();
+ QueryRunner runner = new QueryRunner(dataSource);
+ List<Triple<String, Boolean,Permission>> permissions = null;
+ try {
+ permissions = runner.query(ProjectPermissionsResultHandler.SELECT_PROJECT_PERMISSION, permHander, projectId);
+ } catch (SQLException e) {
+ throw new ProjectManagerException("Query for permissions for " + projectId + " failed.", e);
+ }
+
+ return permissions;
+ }
+
+
+ @Override
public void removeProject(Project project, String user) throws ProjectManagerException {
QueryRunner runner = new QueryRunner(dataSource);
diff --git a/src/java/azkaban/project/ProjectLoader.java b/src/java/azkaban/project/ProjectLoader.java
index 4711dc0..5d01a71 100644
--- a/src/java/azkaban/project/ProjectLoader.java
+++ b/src/java/azkaban/project/ProjectLoader.java
@@ -10,6 +10,7 @@ import azkaban.project.ProjectLogEvent.EventType;
import azkaban.user.Permission;
import azkaban.user.User;
import azkaban.utils.Props;
+import azkaban.utils.Triple;
public interface ProjectLoader {
@@ -218,5 +219,7 @@ public interface ProjectLoader {
public void updateProjectProperty(Project project, Props props) throws ProjectManagerException;
Props fetchProjectProperty(int projectId, int projectVer, String propsName) throws ProjectManagerException;
+
+ List<Triple<String, Boolean, Permission>> getProjectPermissions(int projectId) throws ProjectManagerException;
}
\ No newline at end of file
src/java/azkaban/sla/SLAManager.java 100(+0 -100)
diff --git a/src/java/azkaban/sla/SLAManager.java b/src/java/azkaban/sla/SLAManager.java
index 04fbe08..4e04528 100644
--- a/src/java/azkaban/sla/SLAManager.java
+++ b/src/java/azkaban/sla/SLAManager.java
@@ -3,34 +3,21 @@ package azkaban.sla;
import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.Comparator;
-import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
-import org.apache.velocity.runtime.parser.node.GetExecutor;
import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.ReadablePeriod;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutorMailer;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerException;
-import azkaban.flow.Flow;
-import azkaban.project.Project;
-import azkaban.project.ProjectManager;
import azkaban.sla.SLA.SlaAction;
import azkaban.sla.SLA.SlaRule;
import azkaban.sla.SLA.SlaSetting;
-import azkaban.user.User;
-import azkaban.utils.Pair;
import azkaban.utils.Props;
/*
@@ -61,7 +48,6 @@ public class SLAManager {
private SLALoader loader;
private final SLARunner runner;
- private final SLAPreRunner prerunner;
private final ExecutorManager executorManager;
private SlaMailer mailer;
@@ -82,7 +68,6 @@ public class SLAManager {
this.loader = loader;
this.mailer = new SlaMailer(props);
this.runner = new SLARunner();
- this.prerunner = new SLAPreRunner();
List<SLA> SLAList = null;
try {
@@ -105,7 +90,6 @@ public class SLAManager {
*/
public void shutdown() {
this.runner.shutdown();
- this.prerunner.shutdown();
}
/**
@@ -475,90 +459,6 @@ public class SLAManager {
}
mailer.sendSlaEmail(s, message);
}
-
-
- public class SLAPreRunner extends Thread {
- private final List<SLA> preSlas;
- private AtomicBoolean stillAlive = new AtomicBoolean(true);
-
- // Five minute minimum intervals
- private static final int TIMEOUT_MS = 300000;
-
- public SLAPreRunner() {
- preSlas = new ArrayList<SLA>();
- }
-
- public void shutdown() {
- logger.error("Shutting down pre-sla checker thread");
- stillAlive.set(false);
- this.interrupt();
- }
-
- /**
- * Return a list of flow with SLAs
- *
- * @return
- */
- protected synchronized List<SLA> getPreSlas() {
- return new ArrayList<SLA>(preSlas);
- }
-
- /**
- * Adds SLA into runner and then interrupts so it will update
- * its wait time.
- *
- * @param flow
- */
- public synchronized void addCheckerPreSla(SLA s) {
- logger.info("Adding " + s + " to pre-sla checker.");
- preSlas.add(s);
- this.interrupt();
- }
-
- /**
- * Remove runner SLA. Does not interrupt.
- *
- * @param flow
- * @throws SLAManagerException
- */
- public synchronized void removeCheckerPreSla(SLA s) {
- logger.info("Removing " + s + " from the pre-sla checker.");
- preSlas.remove(s);
- }
-
- public void run() {
- while (stillAlive.get()) {
- synchronized (this) {
- try {
- // TODO clear up the exception handling
-
- if (preSlas.size() == 0) {
- try {
- this.wait(TIMEOUT_MS);
- } catch (InterruptedException e) {
- // interruption should occur when items are added or removed from the queue.
- }
- } else {
- for(SLA s : preSlas) {
- ExecutableFlow exflow = executorManager.getExecutableFlow(s.getExecId());
- String id = s.getJobName();
- if(!s.equals("")) {
- ExecutableNode exnode = exflow.getExecutableNode(id);
- if(exnode.getStartTime() != -1) {
-
- }
- }
- }
- }
- } catch (Exception e) {
- logger.error("Unexpected exception has been thrown in scheduler", e);
- } catch (Throwable e) {
- logger.error("Unexpected throwable has been thrown in scheduler", e);
- }
- }
- }
- }
- }
public int getNumActiveSLA() {
return runner.getRunnerSLAs().size();
src/java/azkaban/utils/Props.java 15(+15 -0)
diff --git a/src/java/azkaban/utils/Props.java b/src/java/azkaban/utils/Props.java
index 4b16b9c..84b48b3 100644
--- a/src/java/azkaban/utils/Props.java
+++ b/src/java/azkaban/utils/Props.java
@@ -35,6 +35,8 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import javax.swing.text.StyledEditorKit.BoldAction;
+
import org.apache.log4j.Logger;
/**
@@ -418,6 +420,19 @@ public class Props {
}
}
+ public Class<?> getClass(String key, boolean initialize, ClassLoader cl) {
+ try {
+ if (containsKey(key)) {
+ return Class.forName(get(key), initialize, cl);
+ } else {
+ throw new UndefinedPropertyException(
+ "Missing required property '" + key + "'");
+ }
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
/**
* Gets the class from the Props. If it doesn't exist, it will return the
* defaultClass
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 41d44f3..056cbac 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -2,6 +2,8 @@ package azkaban.test.execapp;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import junit.framework.Assert;
@@ -261,8 +263,9 @@ public class JobRunnerTest {
node.setExecutableFlow(flow);
Props props = createProps(time, fail);
-
- JobRunner runner = new JobRunner(node, props, workingDir, loader, jobtypeManager, logger);
+ List<String> proxyUsers = new ArrayList<String>();
+ proxyUsers.add(flow.getSubmitUser());
+ JobRunner runner = new JobRunner(node, props, workingDir, proxyUsers, loader, jobtypeManager, logger);
runner.addListener(listener);
return runner;
diff --git a/unit/java/azkaban/test/execapp/MockProjectLoader.java b/unit/java/azkaban/test/execapp/MockProjectLoader.java
index 0739bd3..165f0c9 100644
--- a/unit/java/azkaban/test/execapp/MockProjectLoader.java
+++ b/unit/java/azkaban/test/execapp/MockProjectLoader.java
@@ -17,6 +17,7 @@ import azkaban.project.ProjectManagerException;
import azkaban.user.Permission;
import azkaban.user.User;
import azkaban.utils.Props;
+import azkaban.utils.Triple;
public class MockProjectLoader implements ProjectLoader {
public File dir;
@@ -209,4 +210,11 @@ public class MockProjectLoader implements ProjectLoader {
// TODO Auto-generated method stub
return null;
}
+
+ @Override
+ public List<Triple<String, Boolean, Permission>> getProjectPermissions(
+ int projectId) throws ProjectManagerException {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
\ No newline at end of file