azkaban-uncached
Changes
README.md 4(+2 -2)
src/java/azkaban/execapp/FlowRunner.java 40(+34 -6)
src/java/azkaban/execapp/JobRunner.java 73(+62 -11)
src/java/azkaban/jobtype/JobTypeManager.java 48(+35 -13)
src/java/azkaban/project/Project.java 44(+43 -1)
src/java/azkaban/project/ProjectManager.java 22(+21 -1)
src/java/azkaban/sla/SLAManager.java 102(+2 -100)
src/java/azkaban/user/XmlUserManager.java 35(+33 -2)
src/java/azkaban/utils/Props.java 13(+13 -0)
src/sql/create_execution_logs.sql 3(+1 -2)
src/sql/create_project_table.sql 2(+2 -0)
src/sql/update_2.0_to_2.01.sql 3(+3 -0)
src/web/js/azkaban.exflow.view.js 1(+1 -0)
src/web/js/azkaban.permission.view.js 79(+69 -10)
Details
README.md 4(+2 -2)
diff --git a/README.md b/README.md
index 897a0b5..672010c 100644
--- a/README.md
+++ b/README.md
@@ -12,8 +12,8 @@ Here are a few features:
* Project workspaces
* Logging and auditing of workflow and jobs
-To go to the package download links: [Download-Packages](azkaban2/wiki/Download-Packages)
+To go to the package download links: [Download-Packages](https://github.com/azkaban/azkaban2/wiki/Download-Packages)
-To go to the azkaban-plugins github repo: [Azkaban-plugins](azkaban-plugins)
+To go to the azkaban-plugins github repo: [Azkaban-plugins](https://github.com/azkaban/azkaban-plugins)
There is a google groups: [Azkaban Group](https://groups.google.com/forum/?fromgroups#!forum/azkaban-dev)
diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index 4fd55c2..1475a9a 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -58,13 +58,16 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
@Override
public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
HashMap<String,Object> respMap= new HashMap<String,Object>();
+ logger.info("ExecutorServer called by " + req.getRemoteAddr());
try {
if (!hasParam(req, ACTION_PARAM)) {
+ logger.error("Parameter action not set");
respMap.put("error", "Parameter action not set");
}
else {
String action = getParam(req, ACTION_PARAM);
if (action.equals(UPDATE_ACTION)) {
+ logger.info("Updated called");
handleAjaxUpdateRequest(req, respMap);
}
else if (action.equals(PING_ACTION)) {
@@ -74,7 +77,8 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
int execid = Integer.parseInt(getParam(req, EXECID_PARAM));
String user = getParam(req, USER_PARAM, null);
- if (action.equals(LOG_ACTION)) {
+ logger.info("User " + user + " has called action " + action + " on " + execid);
+ if (action.equals(LOG_ACTION)) {
handleFetchLogEvent(execid, req, resp, respMap);
}
else if (action.equals(EXECUTE_ACTION)) {
@@ -100,11 +104,13 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
handleModifyExecutionRequest(respMap, execid, user, req);
}
else {
+ logger.error("action: '" + action + "' not supported.");
respMap.put("error", "action: '" + action + "' not supported.");
}
}
}
} catch (Exception e) {
+ logger.error(e);
respMap.put(RESPONSE_ERROR, e.getMessage());
}
writeJSON(resp, respMap);
@@ -139,6 +145,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
}
} catch (ExecutorManagerException e) {
+ logger.error(e);
respMap.put("error", e.getMessage());
}
}
@@ -157,6 +164,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
result = flowRunnerManager.readFlowLogs(execId, startByte, length);
respMap.putAll(result.toObject());
} catch (Exception e) {
+ logger.error(e);
respMap.put(RESPONSE_ERROR, e.getMessage());
}
}
@@ -167,6 +175,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
LogData result = flowRunnerManager.readJobLogs(execId, jobId, attempt, startByte, length);
respMap.putAll(result.toObject());
} catch (Exception e) {
+ logger.error(e);
respMap.put("error", e.getMessage());
}
}
@@ -203,6 +212,8 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
try {
flowRunnerManager.submitFlow(execId);
} catch (ExecutorManagerException e) {
+ e.printStackTrace();
+ logger.error(e);
respMap.put(RESPONSE_ERROR, e.getMessage());
}
}
@@ -228,7 +239,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
flowRunnerManager.pauseFlow(execid, user);
respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
} catch (ExecutorManagerException e) {
- e.printStackTrace();
+ logger.error(e);
respMap.put(RESPONSE_ERROR, e.getMessage());
}
}
@@ -258,7 +269,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
flowRunnerManager.cancelFlow(execid, user);
respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
} catch (ExecutorManagerException e) {
- e.printStackTrace();
+ logger.error(e);
respMap.put(RESPONSE_ERROR, e.getMessage());
}
}
src/java/azkaban/execapp/FlowRunner.java 40(+34 -6)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index c61d724..2f58a1a 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -5,6 +5,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -88,8 +89,14 @@ public class FlowRunner extends EventHandler implements Runnable {
// Watches external flows for execution.
private FlowWatcher watcher = null;
+
+ private HashSet<String> proxyUsers = null;
+ private boolean validateUserProxy;
+
+ private String jobLogFileSize = "5MB";
+ private int jobLogNumFiles = 4;
- public FlowRunner(ExecutableFlow flow, FlowWatcher watcher, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
+ public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
this.execId = flow.getExecutionId();
this.flow = flow;
this.executorLoader = executorLoader;
@@ -97,17 +104,35 @@ public class FlowRunner extends EventHandler implements Runnable {
this.executorService = Executors.newFixedThreadPool(numThreads);
this.execDir = new File(flow.getExecutionPath());
this.jobtypeManager = jobtypeManager;
-
+
this.pipelineLevel = flow.getPipelineLevel();
this.pipelineExecId = flow.getPipelineExecutionId();
- this.watcher = watcher;
+
+ this.proxyUsers = flow.getProxyUsers();
}
+ public FlowRunner setFlowWatcher(FlowWatcher watcher) {
+ this.watcher = watcher;
+ return this;
+ }
+
public FlowRunner setGlobalProps(Props globalProps) {
this.globalProps = globalProps;
return this;
}
-
+
+ public FlowRunner setJobLogSettings(String jobLogFileSize, int jobLogNumFiles) {
+ this.jobLogFileSize = jobLogFileSize;
+ this.jobLogNumFiles = jobLogNumFiles;
+
+ return this;
+ }
+
+ public FlowRunner setValidateProxyUser(boolean validateUserProxy) {
+ this.validateUserProxy = validateUserProxy;
+ return this;
+ }
+
public File getExecutionDir() {
return execDir;
}
@@ -360,12 +385,15 @@ public class FlowRunner extends EventHandler implements Runnable {
prop.setSource(path.getPath());
prop.setParent(parentProps);
- // should have one prop with system secrets, the other user level props
- JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), executorLoader, jobtypeManager, logger);
+ JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), executorLoader, jobtypeManager);
if (watcher != null) {
jobRunner.setPipeline(watcher, pipelineLevel);
}
+ if (validateUserProxy) {
+ jobRunner.setValidatedProxyUsers(proxyUsers);
+ }
+ jobRunner.setLogSettings(logger, jobLogFileSize, jobLogNumFiles);
jobRunner.addListener(listener);
return jobRunner;
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 11c6bc1..9e9d8c7 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -18,6 +18,7 @@ package azkaban.execapp;
import java.io.File;
import java.io.FileFilter;
+import java.io.FilenameFilter;
import java.io.IOException;
import java.lang.Thread.State;
import java.util.ArrayList;
@@ -79,16 +80,27 @@ public class FlowRunnerManager implements EventListener {
private Props globalProps;
+ private final Props azkabanProps;
+
private long lastSubmitterThreadCheckTime = -1;
private long lastCleanerThreadCheckTime = -1;
private long executionDirRetention = 1*24*60*60*1000;
- private Object executionDirDeletionSync = new Object();
+ // We want to limit the log sizes to about 20 megs
+ private String jobLogChunkSize = "5MB";
+ private int jobLogNumFiles = 4;
+
+ // If true, jobs will validate proxy user against a list of valid proxy users.
+ private boolean validateProxyUser = false;
+ private Object executionDirDeletionSync = new Object();
+
public FlowRunnerManager(Props props, ExecutorLoader executorLoader, ProjectLoader projectLoader, ClassLoader parentClassLoader) throws IOException {
executionDirectory = new File(props.getString("azkaban.execution.dir", "executions"));
projectDirectory = new File(props.getString("azkaban.project.dir", "projects"));
+ azkabanProps = props;
+
//JobWrappingFactory.init(props, getClass().getClassLoader());
executionDirRetention = props.getLong("execution.dir.retention", executionDirRetention);
logger.info("Execution dir retention set to " + executionDirRetention + " ms");
@@ -100,6 +112,8 @@ public class FlowRunnerManager implements EventListener {
projectDirectory.mkdirs();
}
+ installedProjects = loadExistingProjects();
+
//azkaban.temp.dir
numThreads = props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
executorService = Executors.newFixedThreadPool(numThreads);
@@ -107,6 +121,11 @@ public class FlowRunnerManager implements EventListener {
this.executorLoader = executorLoader;
this.projectLoader = projectLoader;
+ this.jobLogChunkSize = azkabanProps.getString("job.log.chunk.size", "5MB");
+ this.jobLogNumFiles = azkabanProps.getInt("job.log.backup.index", 4);
+
+ this.validateProxyUser = azkabanProps.getBoolean("proxy.user.lock.down", false);
+
submitterThread = new SubmitterThread(flowQueue);
submitterThread.start();
@@ -117,6 +136,32 @@ public class FlowRunnerManager implements EventListener {
}
+ private Map<Pair<Integer, Integer>, ProjectVersion> loadExistingProjects() {
+ Map<Pair<Integer, Integer>, ProjectVersion> allProjects = new HashMap<Pair<Integer,Integer>, ProjectVersion>();
+ for(File project : projectDirectory.listFiles(new FilenameFilter() {
+
+ String pattern = "[0-9]+\\.[0-9]+";
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.matches(pattern);
+ }
+ })) {
+ if(project.isDirectory()) {
+ try {
+ String fileName = new File(project.getAbsolutePath()).getName();
+ int projectId = Integer.parseInt(fileName.split("\\.")[0]);
+ int versionNum = Integer.parseInt(fileName.split("\\.")[1]);
+ ProjectVersion version = new ProjectVersion(projectId, versionNum);
+ allProjects.put(new Pair<Integer, Integer>(projectId, versionNum), version);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ return allProjects;
+ }
+
public Props getGlobalProps() {
return globalProps;
}
@@ -334,8 +379,11 @@ public class FlowRunnerManager implements EventListener {
watcher = new RemoteFlowWatcher(pipelineExecId, executorLoader);
}
}
-
- FlowRunner runner = new FlowRunner(flow, watcher, executorLoader, projectLoader, jobtypeManager);
+
+ FlowRunner runner = new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
+ runner.setFlowWatcher(watcher);
+ runner.setJobLogSettings(jobLogChunkSize, jobLogNumFiles);
+ runner.setValidateProxyUser(validateProxyUser);
runner.setGlobalProps(globalProps);
runner.addListener(this);
src/java/azkaban/execapp/JobRunner.java 73(+62 -11)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index f6c482b..a14aff5 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -16,17 +16,21 @@ package azkaban.execapp;
*/
import java.io.File;
+import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
+
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
+import java.util.Arrays;
+import java.util.Collections;
+
import org.apache.log4j.Appender;
-import org.apache.log4j.FileAppender;
import org.apache.log4j.Layout;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
+import org.apache.log4j.RollingFileAppender;
import azkaban.execapp.event.BlockingStatus;
import azkaban.execapp.event.Event;
@@ -41,12 +45,13 @@ import azkaban.flow.CommonJobProperties;
import azkaban.jobExecutor.AbstractProcessJob;
import azkaban.jobExecutor.Job;
import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypeManagerException;
import azkaban.utils.Props;
public class JobRunner extends EventHandler implements Runnable {
private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
-
+
private ExecutorLoader loader;
private Props props;
private Props outputProps;
@@ -67,28 +72,44 @@ public class JobRunner extends EventHandler implements Runnable {
private Object syncObject = new Object();
private final JobTypeManager jobtypeManager;
+
+ // Used by the job to watch and block against another flow
private Integer pipelineLevel = null;
private FlowWatcher watcher = null;
private Set<String> pipelineJobs = new HashSet<String>();
+
+ private Set<String> proxyUsers = null;
+
+ private String jobLogChunkSize;
+ private int jobLogBackupIndex;
- public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager, Logger flowLogger) {
+ public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
this.props = props;
this.node = node;
this.workingDir = workingDir;
this.executionId = node.getExecutionId();
this.loader = loader;
this.jobtypeManager = jobtypeManager;
+ }
+
+ public void setValidatedProxyUsers(Set<String> proxyUsers) {
+ this.proxyUsers = proxyUsers;
+ }
+
+ public void setLogSettings(Logger flowLogger, String logFileChuckSize, int numLogBackup ) {
this.flowLogger = flowLogger;
+ this.jobLogChunkSize = logFileChuckSize;
+ this.jobLogBackupIndex = numLogBackup;
}
public void setPipeline(FlowWatcher watcher, int pipelineLevel) {
this.watcher = watcher;
this.pipelineLevel = pipelineLevel;
- if (pipelineLevel == 1) {
+ if (this.pipelineLevel == 1) {
pipelineJobs.add(node.getJobId());
}
- else if (pipelineLevel == 2) {
+ else if (this.pipelineLevel == 2) {
pipelineJobs.add(node.getJobId());
pipelineJobs.addAll(node.getOutNodes());
}
@@ -115,8 +136,9 @@ public class JobRunner extends EventHandler implements Runnable {
jobAppender = null;
try {
- FileAppender fileAppender = new FileAppender(loggerLayout, absolutePath, true);
-
+ RollingFileAppender fileAppender = new RollingFileAppender(loggerLayout, absolutePath, true);
+ fileAppender.setMaxBackupIndex(jobLogBackupIndex);
+ fileAppender.setMaxFileSize(jobLogChunkSize);
jobAppender = fileAppender;
logger.addAppender(jobAppender);
} catch (IOException e) {
@@ -199,6 +221,10 @@ public class JobRunner extends EventHandler implements Runnable {
fireEvent(Event.create(this, Type.JOB_STATUS_CHANGED), false);
runJob();
}
+ else {
+ node.setStatus(Status.FAILED);
+ logError("Job run failed!");
+ }
node.setEndTime(System.currentTimeMillis());
@@ -209,7 +235,18 @@ public class JobRunner extends EventHandler implements Runnable {
if (logFile != null) {
try {
- loader.uploadLogFile(executionId, node.getJobId(), node.getAttempt(), logFile);
+ File[] files = logFile.getParentFile().listFiles(new FilenameFilter() {
+
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.startsWith(logFile.getName());
+ }
+ }
+ );
+ Arrays.sort(files, Collections.reverseOrder());
+
+
+ loader.uploadLogFile(executionId, node.getJobId(), node.getAttempt(), files);
} catch (ExecutorManagerException e) {
flowLogger.error("Error writing out logs for job " + node.getJobId(), e);
}
@@ -258,9 +295,23 @@ public class JobRunner extends EventHandler implements Runnable {
if (!props.containsKey(AbstractProcessJob.WORKING_DIR)) {
props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
}
-
+
+ if(props.containsKey("user.to.proxy")) {
+ String jobProxyUser = props.getString("user.to.proxy");
+ if(proxyUsers != null && !proxyUsers.contains(jobProxyUser)) {
+ logger.error("User " + jobProxyUser + " has no permission to execute this job " + node.getJobId() + "!");
+ return false;
+ }
+ }
+
//job = JobWrappingFactory.getJobWrappingFactory().buildJobExecutor(node.getJobId(), props, logger);
- job = jobtypeManager.buildJobExecutor(node.getJobId(), props, logger);
+ try {
+ job = jobtypeManager.buildJobExecutor(node.getJobId(), props, logger);
+ }
+ catch (JobTypeManagerException e) {
+ logger.error("Failed to build job type, skipping this job");
+ return false;
+ }
}
return true;
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index d2d44dc..5d25081 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -60,6 +60,8 @@ public class ExecutableFlow {
private Integer pipelineLevel = null;
private Integer pipelineExecId = null;
private Map<String, String> flowParameters = new HashMap<String, String>();
+
+ private HashSet<String> proxyUsers = new HashSet<String>();
public enum FailureAction {
FINISH_CURRENTLY_RUNNING,
@@ -117,6 +119,14 @@ public class ExecutableFlow {
return flowParameters;
}
+ public void setProxyUsers(HashSet<String> proxyUsers) {
+ this.proxyUsers = proxyUsers;
+ }
+
+ public HashSet<String> getProxyUsers() {
+ return this.proxyUsers;
+ }
+
private void setFlow(Flow flow) {
for (Node node: flow.getNodes()) {
String id = node.getId();
@@ -310,6 +320,10 @@ public class ExecutableFlow {
nodes.add(node.toObject());
}
flowObj.put("nodes", nodes);
+
+ ArrayList<String> proxyUserList = new ArrayList<String>(proxyUsers);
+
+ flowObj.put("proxyUsers", proxyUserList);
return flowObj;
}
@@ -447,6 +461,11 @@ public class ExecutableFlow {
// Failure emails
exFlow.setFailureEmails((List<String>)flowObj.get("failureEmails"));
+ if(flowObj.containsKey("proxyUsers")) {
+ ArrayList<String> proxyUserList = (ArrayList<String>) flowObj.get("proxyUsers");
+ exFlow.setProxyUsers(new HashSet<String>(proxyUserList));
+ }
+
return exFlow;
}
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 2fb2dbe..481a8fe 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -684,6 +684,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
do {
//int execId = rs.getInt(1);
//String name = rs.getString(2);
+ @SuppressWarnings("unused")
int attempt = rs.getInt(3);
EncodingType encType = EncodingType.fromInteger(rs.getInt(4));
int startByte = rs.getInt(5);
src/java/azkaban/jobtype/JobTypeManager.java 48(+35 -13)
diff --git a/src/java/azkaban/jobtype/JobTypeManager.java b/src/java/azkaban/jobtype/JobTypeManager.java
index d43b0da..65c747e 100644
--- a/src/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/java/azkaban/jobtype/JobTypeManager.java
@@ -73,6 +73,7 @@ public class JobTypeManager
}
catch (Exception e) {
logger.info("Plugin jobtypes failed to load. " + e.getCause());
+ throw new JobTypeManagerException(e);
}
}
@@ -120,6 +121,7 @@ public class JobTypeManager
loadJob(dir, globalConf, globalSysConf);
}
catch (Exception e) {
+ logger.error("Failed to load jobtype " + dir.getName() + e.getMessage());
throw new JobTypeManagerException(e);
}
}
@@ -195,7 +197,7 @@ public class JobTypeManager
try {
if(confFile != null) {
conf = new Props(commonConf, confFile);
- conf = PropsUtils.resolveProps(conf);
+// conf = PropsUtils.resolveProps(conf);
}
if(sysConfFile != null) {
sysConf = new Props(commonSysConf, sysConfFile);
@@ -203,7 +205,7 @@ public class JobTypeManager
}
}
catch (Exception e) {
- throw new JobTypeManagerException("Failed to get jobtype properties", e);
+ throw new JobTypeManagerException("Failed to get jobtype properties" + e.getMessage());
}
sysConf.put("plugin.dir", dir.getAbsolutePath());
@@ -221,7 +223,7 @@ public class JobTypeManager
try {
if(f.getName().endsWith(".jar")) {
resources.add(f.toURI().toURL());
- logger.info("adding to classpath " + f);
+ logger.info("adding to classpath " + f.toURI().toURL());
}
} catch (MalformedURLException e) {
// TODO Auto-generated catch block
@@ -240,6 +242,23 @@ public class JobTypeManager
catch (ClassNotFoundException e) {
throw new JobTypeManagerException(e);
}
+
+ logger.info("Doing simple testing...");
+ try {
+ Props fakeSysProps = new Props(sysConf);
+ fakeSysProps.put("type", jobtypeName);
+ Props fakeJobProps = new Props(conf);
+ Job job = (Job)Utils.callConstructor(clazz, "dummy", fakeSysProps, fakeJobProps, logger);
+ }
+ catch (Exception e) {
+ logger.info("Jobtype " + jobtypeName + " failed test!", e);
+ throw new JobExecutionException(e);
+ }
+ catch (Throwable t) {
+ logger.info("Jobtype " + jobtypeName + " failed test!", t);
+ throw new JobExecutionException(t);
+ }
+
logger.info("Loaded jobtype " + jobtypeName + " " + jobtypeClass);
if(conf != null) jobtypeJobProps.put(jobtypeName, conf);
@@ -249,7 +268,7 @@ public class JobTypeManager
public Job buildJobExecutor(String jobId, Props jobProps, Logger logger) throws JobTypeManagerException
{
- Job job;
+ Job job = null;
try {
String jobType = jobProps.getString("type");
if (jobType == null || jobType.length() == 0) {
@@ -274,14 +293,12 @@ public class JobTypeManager
Props p = jobtypeJobProps.get(jobType);
for(String k : p.getKeySet())
{
- if(!jobProps.containsKey(k)) {
- jobProps.put(k, p.get(k));
+ if(!jobConf.containsKey(k)) {
+ jobConf.put(k, p.get(k));
}
}
}
- else {
- jobConf = jobProps;
- }
+ jobConf = PropsUtils.resolveProps(jobConf);
if (sysConf != null) {
sysConf = PropsUtils.resolveProps(sysConf);
@@ -289,19 +306,24 @@ public class JobTypeManager
else {
sysConf = new Props();
}
-
- jobConf = PropsUtils.resolveProps(jobConf);
+
// logger.info("sysConf is " + sysConf);
// logger.info("jobConf is " + jobConf);
-
+//
job = (Job)Utils.callConstructor(executorClass, jobId, sysConf, jobConf, logger);
logger.info("job built.");
}
catch (Exception e) {
- job = new InitErrorJob(jobId, e);
+ //job = new InitErrorJob(jobId, e);
+ logger.error("Failed to build job executor for job " + jobId + e.getMessage());
+ throw new JobTypeManagerException("Failed to build job executor for job " + jobId, e);
//throw new JobTypeManagerException(e);
}
+ catch (Throwable t) {
+ logger.error("Failed to build job executor for job " + jobId + t.getMessage(), t);
+ throw new JobTypeManagerException("Failed to build job executor for job " + jobId, t);
+ }
return job;
}
diff --git a/src/java/azkaban/project/JdbcProjectLoader.java b/src/java/azkaban/project/JdbcProjectLoader.java
index 01976b7..c692c27 100644
--- a/src/java/azkaban/project/JdbcProjectLoader.java
+++ b/src/java/azkaban/project/JdbcProjectLoader.java
@@ -236,11 +236,11 @@ public class JdbcProjectLoader implements ProjectLoader {
throw new ProjectManagerException("Checking for existing project failed. " + name, e);
}
- final String INSERT_PROJECT = "INSERT INTO projects ( name, active, modified_time, create_time, version, last_modified_by, description) values (?,?,?,?,?,?,?)";
+ final String INSERT_PROJECT = "INSERT INTO projects ( name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob) values (?,?,?,?,?,?,?,?,?)";
// Insert project
try {
long time = System.currentTimeMillis();
- int i = runner.update(connection, INSERT_PROJECT, name, true, time, time, null, creator.getUserId(), description);
+ int i = runner.update(connection, INSERT_PROJECT, name, true, time, time, null, creator.getUserId(), description, defaultEncodingType.numVal, null);
if (i == 0) {
throw new ProjectManagerException("No projects have been inserted.");
}
@@ -504,6 +504,49 @@ public class JdbcProjectLoader implements ProjectLoader {
}
}
+
+
+ @Override
+ public void updateProjectSettings(Project project) throws ProjectManagerException {
+ Connection connection = getConnection();
+ try {
+ updateProjectSettings(connection, project, defaultEncodingType);
+ connection.commit();
+ }
+ catch (SQLException e) {
+ throw new ProjectManagerException("Error updating project settings", e);
+ }
+ finally {
+ DbUtils.closeQuietly(connection);
+ }
+ }
+
+ private void updateProjectSettings(Connection connection, Project project, EncodingType encType) throws ProjectManagerException {
+ QueryRunner runner = new QueryRunner();
+ final String UPDATE_PROJECT_SETTINGS = "UPDATE projects SET enc_type=?, settings_blob=? WHERE id=?";
+
+ String json = JSONUtils.toJSON(project.toObject());
+ byte[] data = null;
+ try {
+ byte[] stringData = json.getBytes("UTF-8");
+ data = stringData;
+
+ if (encType == EncodingType.GZIP) {
+ data = GZIPUtils.gzipBytes(stringData);
+ }
+ logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+ } catch(IOException e) {
+ throw new ProjectManagerException("Failed to encode. ", e);
+ }
+
+ try {
+ runner.update(connection, UPDATE_PROJECT_SETTINGS, encType.numVal, data, project.getId());
+ connection.commit();
+ } catch (SQLException e) {
+ throw new ProjectManagerException("Error updating project " + project.getName() + " version " + project.getVersion(), e);
+ }
+ }
+
@Override
public void removePermission(Project project, String name, boolean isGroup) throws ProjectManagerException {
QueryRunner runner = new QueryRunner(dataSource);
@@ -525,6 +568,21 @@ public class JdbcProjectLoader implements ProjectLoader {
}
@Override
+ public List<Triple<String, Boolean, Permission>> getProjectPermissions(int projectId) throws ProjectManagerException {
+ ProjectPermissionsResultHandler permHander = new ProjectPermissionsResultHandler();
+ QueryRunner runner = new QueryRunner(dataSource);
+ List<Triple<String, Boolean,Permission>> permissions = null;
+ try {
+ permissions = runner.query(ProjectPermissionsResultHandler.SELECT_PROJECT_PERMISSION, permHander, projectId);
+ } catch (SQLException e) {
+ throw new ProjectManagerException("Query for permissions for " + projectId + " failed.", e);
+ }
+
+ return permissions;
+ }
+
+
+ @Override
public void removeProject(Project project, String user) throws ProjectManagerException {
QueryRunner runner = new QueryRunner(dataSource);
@@ -927,13 +985,13 @@ public class JdbcProjectLoader implements ProjectLoader {
private static class ProjectResultHandler implements ResultSetHandler<List<Project>> {
private static String SELECT_PROJECT_BY_ID =
- "SELECT id, name, active, modified_time, create_time, version, last_modified_by, description FROM projects WHERE id=?";
+ "SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE id=?";
private static String SELECT_ALL_ACTIVE_PROJECTS =
- "SELECT id, name, active, modified_time, create_time, version, last_modified_by, description FROM projects WHERE active=true";
+ "SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE active=true";
private static String SELECT_ACTIVE_PROJECT_BY_NAME =
- "SELECT id, name, active, modified_time, create_time, version, last_modified_by, description FROM projects WHERE name=? AND active=true";
+ "SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE name=? AND active=true";
@Override
public List<Project> handle(ResultSet rs) throws SQLException {
@@ -951,8 +1009,35 @@ public class JdbcProjectLoader implements ProjectLoader {
int version = rs.getInt(6);
String lastModifiedBy = rs.getString(7);
String description = rs.getString(8);
+ int encodingType = rs.getInt(9);
+ byte[] data = rs.getBytes(10);
+
+ Project project;
+ if (data != null) {
+ EncodingType encType = EncodingType.fromInteger(encodingType);
+ Object blobObj;
+ try {
+ // Convoluted way to inflate strings. Should find common package or helper function.
+ if (encType == EncodingType.GZIP) {
+ // Decompress the sucker.
+ String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+ blobObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+ else {
+ String jsonString = new String(data, "UTF-8");
+ blobObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+ project = Project.projectFromObject(blobObj);
+ } catch (IOException e) {
+ throw new SQLException("Failed to get project.", e);
+ }
+ }
+ else {
+ project = new Project(id, name);
+ }
+
+ // update the fields as they may have changed
- Project project = new Project(id, name);
project.setActive(active);
project.setLastModifiedTimestamp(modifiedTime);
project.setCreateTimestamp(createTime);
src/java/azkaban/project/Project.java 44(+43 -1)
diff --git a/src/java/azkaban/project/Project.java b/src/java/azkaban/project/Project.java
index 5a611fc..7344820 100644
--- a/src/java/azkaban/project/Project.java
+++ b/src/java/azkaban/project/Project.java
@@ -18,6 +18,7 @@ package azkaban.project;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -41,6 +42,15 @@ public class Project {
private LinkedHashMap<String, Permission> userPermissionMap = new LinkedHashMap<String, Permission>();
private LinkedHashMap<String, Permission> groupPermissionMap = new LinkedHashMap<String, Permission>();
private Map<String, Flow> flows = null;
+ private HashSet<String> proxyUsers = new HashSet<String>();
+
+ public HashSet<String> getProxyUsers() {
+ return proxyUsers;
+ }
+
+ public void setProxyUsers(HashSet<String> proxyUsers) {
+ this.proxyUsers = proxyUsers;
+ }
public Project(int id, String name) {
this.id = id;
@@ -235,8 +245,13 @@ public class Project {
userMap.put("permissions", entry.getValue().toStringArray());
users.add(userMap);
}
-
+
projectObject.put("users", users);
+
+
+ ArrayList<String> proxyUserList = new ArrayList<String>(proxyUsers);
+ projectObject.put("proxyUsers", proxyUserList);
+
return projectObject;
}
@@ -277,6 +292,10 @@ public class Project {
project.setUserPermission(userid, perm);
}
+
+ List<String> proxyUserList = (List<String>) projectObject.get("proxyUsers");
+ HashSet<String> proxyUsers = new HashSet<String>(proxyUserList);
+ project.setProxyUsers(proxyUsers);
return project;
}
@@ -390,4 +409,27 @@ public class Project {
public void setVersion(int version) {
this.version = version;
}
+
+ public List<String> getProxyUserList() {
+ return new ArrayList<String>(proxyUsers);
+ }
+
+// public Object getSettingsObject() {
+// HashMap<String, Object> projectObject = new HashMap<String, Object>();
+// ArrayList<String> proxyUserList = new ArrayList<String>(proxyUsers);
+// projectObject.put("proxyUsers", proxyUserList);
+//
+// return projectObject;
+// }
+//
+// @SuppressWarnings("unchecked")
+// public static HashSet<String> proxyUserFromSettingsObj(Object object) {
+// Map<String, Object> settingsObj = (Map<String, Object>) object;
+//
+// List<String> proxyUserList = (List<String>) settingsObj.get("proxyUsers");
+// HashSet<String> proxyUsers = new HashSet<String>(proxyUserList);
+//
+// return proxyUsers;
+// }
+
}
diff --git a/src/java/azkaban/project/ProjectLoader.java b/src/java/azkaban/project/ProjectLoader.java
index 4711dc0..7e99298 100644
--- a/src/java/azkaban/project/ProjectLoader.java
+++ b/src/java/azkaban/project/ProjectLoader.java
@@ -10,6 +10,7 @@ import azkaban.project.ProjectLogEvent.EventType;
import azkaban.user.Permission;
import azkaban.user.User;
import azkaban.utils.Props;
+import azkaban.utils.Triple;
public interface ProjectLoader {
@@ -218,5 +219,9 @@ public interface ProjectLoader {
public void updateProjectProperty(Project project, Props props) throws ProjectManagerException;
Props fetchProjectProperty(int projectId, int projectVer, String propsName) throws ProjectManagerException;
+
+ List<Triple<String, Boolean, Permission>> getProjectPermissions(int projectId) throws ProjectManagerException;
+
+ void updateProjectSettings(Project project) throws ProjectManagerException;
}
\ No newline at end of file
src/java/azkaban/project/ProjectManager.java 22(+21 -1)
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index e526474..a5be28b 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -32,6 +32,7 @@ public class ProjectManager {
private final Props props;
private final File tempDir;
private final int projectVersionRetention;
+ private final boolean creatorDefaultPermissions;
public ProjectManager(ProjectLoader loader, Props props) {
this.projectLoader = loader;
@@ -40,6 +41,8 @@ public class ProjectManager {
this.projectVersionRetention = (props.getInt("project.version.retention", 3));
logger.info("Project version retention is set to " + projectVersionRetention);
+ this.creatorDefaultPermissions = props.getBoolean("azkaban.creator.default.permissions", true);
+
if (!tempDir.exists()) {
tempDir.mkdirs();
}
@@ -170,8 +173,19 @@ public class ProjectManager {
projectsByName.put(newProject.getName(), newProject);
projectsById.put(newProject.getId(), newProject);
+ if(creatorDefaultPermissions) {
// Add permission to project
- projectLoader.updatePermission(newProject, creator.getUserId(), new Permission(Permission.Type.ADMIN), false);
+ projectLoader.updatePermission(newProject, creator.getUserId(), new Permission(Permission.Type.ADMIN), false);
+
+ // Add proxy user
+ newProject.getProxyUsers().add(creator.getUserId());
+ try {
+ updateProjectSetting(newProject);
+ } catch (ProjectManagerException e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
projectLoader.postEvent(newProject, EventType.CREATED, creator.getUserId(), null);
@@ -217,6 +231,10 @@ public class ProjectManager {
return;
}
+ public void updateProjectSetting(Project project) throws ProjectManagerException {
+ projectLoader.updateProjectSettings(project);
+ }
+
public void updateProjectPermission(Project project, String name, Permission perm, boolean group, User modifier) throws ProjectManagerException {
logger.info("User " + modifier.getUserId() + " updating permissions for project " + project.getName() + " for " + name + " " + perm.toString());
projectLoader.updatePermission(project, name, perm, group);
@@ -329,4 +347,6 @@ public class ProjectManager {
public void postProjectEvent(Project project, EventType type, String user,String message) {
projectLoader.postEvent(project, type, user, message);
}
+
+
}
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 540fea6..7fad4e2 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -379,6 +379,7 @@ public class ScheduleManager {
// Create ExecutableFlow
ExecutableFlow exflow = new ExecutableFlow(flow);
exflow.setSubmitUser(runningSched.getSubmitUser());
+ exflow.setProxyUsers(project.getProxyUsers());
FlowOptions flowOptions = runningSched.getFlowOptions();
src/java/azkaban/sla/SLAManager.java 102(+2 -100)
diff --git a/src/java/azkaban/sla/SLAManager.java b/src/java/azkaban/sla/SLAManager.java
index 124e8f4..911f9e3 100644
--- a/src/java/azkaban/sla/SLAManager.java
+++ b/src/java/azkaban/sla/SLAManager.java
@@ -3,34 +3,23 @@ package azkaban.sla;
import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.Comparator;
-import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
-import org.apache.velocity.runtime.parser.node.GetExecutor;
import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.ReadablePeriod;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutorMailer;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerException;
+
import azkaban.executor.Status;
-import azkaban.flow.Flow;
-import azkaban.project.Project;
-import azkaban.project.ProjectManager;
+
import azkaban.sla.SLA.SlaAction;
import azkaban.sla.SLA.SlaRule;
import azkaban.sla.SLA.SlaSetting;
-import azkaban.user.User;
-import azkaban.utils.Pair;
import azkaban.utils.Props;
/*
@@ -61,7 +50,6 @@ public class SLAManager {
private SLALoader loader;
private final SLARunner runner;
- private final SLAPreRunner prerunner;
private final ExecutorManager executorManager;
private SlaMailer mailer;
@@ -82,7 +70,6 @@ public class SLAManager {
this.loader = loader;
this.mailer = new SlaMailer(props);
this.runner = new SLARunner();
- this.prerunner = new SLAPreRunner();
List<SLA> SLAList = null;
try {
@@ -105,7 +92,6 @@ public class SLAManager {
*/
public void shutdown() {
this.runner.shutdown();
- this.prerunner.shutdown();
}
/**
@@ -476,90 +462,6 @@ public class SLAManager {
}
mailer.sendSlaEmail(s, message);
}
-
-
- public class SLAPreRunner extends Thread {
- private final List<SLA> preSlas;
- private AtomicBoolean stillAlive = new AtomicBoolean(true);
-
- // Five minute minimum intervals
- private static final int TIMEOUT_MS = 300000;
-
- public SLAPreRunner() {
- preSlas = new ArrayList<SLA>();
- }
-
- public void shutdown() {
- logger.error("Shutting down pre-sla checker thread");
- stillAlive.set(false);
- this.interrupt();
- }
-
- /**
- * Return a list of flow with SLAs
- *
- * @return
- */
- protected synchronized List<SLA> getPreSlas() {
- return new ArrayList<SLA>(preSlas);
- }
-
- /**
- * Adds SLA into runner and then interrupts so it will update
- * its wait time.
- *
- * @param flow
- */
- public synchronized void addCheckerPreSla(SLA s) {
- logger.info("Adding " + s + " to pre-sla checker.");
- preSlas.add(s);
- this.interrupt();
- }
-
- /**
- * Remove runner SLA. Does not interrupt.
- *
- * @param flow
- * @throws SLAManagerException
- */
- public synchronized void removeCheckerPreSla(SLA s) {
- logger.info("Removing " + s + " from the pre-sla checker.");
- preSlas.remove(s);
- }
-
- public void run() {
- while (stillAlive.get()) {
- synchronized (this) {
- try {
- // TODO clear up the exception handling
-
- if (preSlas.size() == 0) {
- try {
- this.wait(TIMEOUT_MS);
- } catch (InterruptedException e) {
- // interruption should occur when items are added or removed from the queue.
- }
- } else {
- for(SLA s : preSlas) {
- ExecutableFlow exflow = executorManager.getExecutableFlow(s.getExecId());
- String id = s.getJobName();
- if(!s.equals("")) {
- ExecutableNode exnode = exflow.getExecutableNode(id);
- if(exnode.getStartTime() != -1) {
-
- }
- }
- }
- }
- } catch (Exception e) {
- logger.error("Unexpected exception has been thrown in scheduler", e);
- } catch (Throwable e) {
- logger.error("Unexpected throwable has been thrown in scheduler", e);
- }
- }
- }
- }
- }
public int getNumActiveSLA() {
return runner.getRunnerSLAs().size();
diff --git a/src/java/azkaban/user/UserManager.java b/src/java/azkaban/user/UserManager.java
index 114ccbe..a5329c4 100644
--- a/src/java/azkaban/user/UserManager.java
+++ b/src/java/azkaban/user/UserManager.java
@@ -58,4 +58,6 @@ public interface UserManager {
* @return
*/
public Role getRole(String roleName);
+
+ public boolean validateProxyUser(String proxyUser, User realUser);
}
src/java/azkaban/user/XmlUserManager.java 35(+33 -2)
diff --git a/src/java/azkaban/user/XmlUserManager.java b/src/java/azkaban/user/XmlUserManager.java
index 09539b5..7c26daf 100644
--- a/src/java/azkaban/user/XmlUserManager.java
+++ b/src/java/azkaban/user/XmlUserManager.java
@@ -19,6 +19,7 @@ package azkaban.user;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
+import java.util.HashSet;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
@@ -53,6 +54,7 @@ public class XmlUserManager implements UserManager {
public static final String USERNAME_ATTR = "username";
public static final String PASSWORD_ATTR = "password";
public static final String ROLES_ATTR = "roles";
+ public static final String PROXY_ATTR = "proxy";
public static final String GROUPS_ATTR = "groups";
private String xmlPath;
@@ -60,6 +62,7 @@ public class XmlUserManager implements UserManager {
private HashMap<String, User> users;
private HashMap<String, String> userPassword;
private HashMap<String, Role> roles;
+ private HashMap<String, HashSet<String>> proxyUserMap;
/**
* The constructor.
@@ -81,6 +84,7 @@ public class XmlUserManager implements UserManager {
HashMap<String, User> users = new HashMap<String, User>();
HashMap<String, String> userPassword = new HashMap<String, String>();
HashMap<String, Role> roles = new HashMap<String, Role>();
+ HashMap<String, HashSet<String>> proxyUserMap = new HashMap<String, HashSet<String>>();
// Creating the document builder to parse xml.
DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory
@@ -113,7 +117,7 @@ public class XmlUserManager implements UserManager {
Node node = azkabanUsersList.item(i);
if (node.getNodeType() == Node.ELEMENT_NODE) {
if (node.getNodeName().equals(USER_TAG)) {
- parseUserTag(node, users, userPassword);
+ parseUserTag(node, users, userPassword, proxyUserMap);
}
else if (node.getNodeName().equals(ROLE_TAG)) {
parseRoleTag(node, roles);
@@ -126,10 +130,11 @@ public class XmlUserManager implements UserManager {
this.users = users;
this.userPassword = userPassword;
this.roles = roles;
+ this.proxyUserMap = proxyUserMap;
}
}
- private void parseUserTag(Node node, HashMap<String, User> users, HashMap<String, String> userPassword) {
+ private void parseUserTag(Node node, HashMap<String, User> users, HashMap<String, String> userPassword, HashMap<String, HashSet<String>> proxyUserMap) {
NamedNodeMap userAttrMap = node.getAttributes();
Node userNameAttr = userAttrMap.getNamedItem(USERNAME_ATTR);
if (userNameAttr == null) {
@@ -157,6 +162,22 @@ public class XmlUserManager implements UserManager {
user.addRole(role);
}
}
+
+ Node proxy = userAttrMap.getNamedItem(PROXY_ATTR);
+ if (proxy != null) {
+ String value = proxy.getNodeValue();
+ String[] groupSplit = value.split("\\s*,\\s*");
+ for (String group : groupSplit) {
+ if(proxyUserMap.containsKey(username)) {
+ proxyUserMap.get(username).add(group);
+ }
+ else {
+ HashSet<String> proxySet = new HashSet<String>();
+ proxySet.add(group);
+ proxyUserMap.put(username, proxySet);
+ }
+ }
+ }
Node groups = userAttrMap.getNamedItem(GROUPS_ATTR);
if (groups != null) {
@@ -248,4 +269,14 @@ public class XmlUserManager implements UserManager {
// Return true. Validation should be added when groups are added to the xml.
return true;
}
+
+ @Override
+ public boolean validateProxyUser(String proxyUser, User realUser) {
+ if(proxyUserMap.containsKey(realUser.getUserId()) && proxyUserMap.get(realUser.getUserId()).contains(proxyUser)) {
+ return true;
+ }
+ else {
+ return false;
+ }
+ }
}
diff --git a/src/java/azkaban/utils/EmailMessage.java b/src/java/azkaban/utils/EmailMessage.java
index b510d6c..06ca9bc 100644
--- a/src/java/azkaban/utils/EmailMessage.java
+++ b/src/java/azkaban/utils/EmailMessage.java
@@ -15,7 +15,6 @@ import javax.mail.BodyPart;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.Session;
-import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeBodyPart;
import javax.mail.internet.MimeMessage;
src/java/azkaban/utils/Props.java 13(+13 -0)
diff --git a/src/java/azkaban/utils/Props.java b/src/java/azkaban/utils/Props.java
index 4b16b9c..12da4a7 100644
--- a/src/java/azkaban/utils/Props.java
+++ b/src/java/azkaban/utils/Props.java
@@ -418,6 +418,19 @@ public class Props {
}
}
+ public Class<?> getClass(String key, boolean initialize, ClassLoader cl) {
+ try {
+ if (containsKey(key)) {
+ return Class.forName(get(key), initialize, cl);
+ } else {
+ throw new UndefinedPropertyException(
+ "Missing required property '" + key + "'");
+ }
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
/**
* Gets the class from the Props. If it doesn't exist, it will return the
* defaultClass
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 9c68784..7438c0c 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -264,6 +264,9 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
else if (ajaxName.equals("retryFailedJobs")) {
ajaxRestartFailed(req, resp, ret, session.getUser(), exFlow);
}
+// else if (ajaxName.equals("fetchLatestJobStatus")) {
+// ajaxFetchLatestJobStatus(req, resp, ret, session.getUser(), exFlow);
+// }
else if (ajaxName.equals("flowInfo")) {
//String projectName = getParam(req, "project");
//Project project = projectManager.getProject(projectName);
@@ -295,6 +298,50 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
}
+// private void ajaxFetchLatestJobStatus(HttpServletRequest req,HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) {
+// Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
+// if (project == null) {
+// ret.put("error", "Project doesn't exist or incorrect access permission.");
+// return;
+// }
+//
+// String projectName;
+// String flowName;
+// String jobName;
+// try {
+// projectName = getParam(req, "projectName");
+// flowName = getParam(req, "flowName");
+// jobName = getParam(req, "jobName");
+// } catch (Exception e) {
+// ret.put("error", e.getMessage());
+// return;
+// }
+//
+// try {
+// ExecutableNode node = exFlow.getExecutableNode(jobId);
+// if (node == null) {
+// ret.put("error", "Job " + jobId + " doesn't exist in " + exFlow.getExecutionId());
+// return;
+// }
+//
+// int attempt = this.getIntParam(req, "attempt", node.getAttempt());
+// LogData data = executorManager.getExecutionJobLog(exFlow, jobId, offset, length, attempt);
+// if (data == null) {
+// ret.put("length", 0);
+// ret.put("offset", offset);
+// ret.put("data", "");
+// }
+// else {
+// ret.put("length", data.getLength());
+// ret.put("offset", data.getOffset());
+// ret.put("data", data.getData());
+// }
+// } catch (ExecutorManagerException e) {
+// throw new ServletException(e);
+// }
+//
+// }
+
private void ajaxRestartFailed(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.EXECUTE);
if (project == null) {
@@ -660,6 +707,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ExecutableFlow exflow = new ExecutableFlow(flow);
exflow.setSubmitUser(user.getUserId());
+ exflow.setProxyUsers(project.getProxyUsers());
if (hasParam(req, "failureAction")) {
String option = getParam(req, "failureAction");
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 0e06e59..423cbcb 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -28,6 +28,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -212,6 +213,11 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
ajaxAddPermission(project, ret, req, user);
}
}
+ else if (ajaxName.equals("addProxyUser")) {
+ if (handleAjaxPermission(project, user, Type.ADMIN, ret)) {
+ ajaxAddProxyUser(project, ret, req, user);
+ }
+ }
else if (ajaxName.equals("fetchFlowExecutions")) {
if (handleAjaxPermission(project, user, Type.READ, ret)) {
ajaxFetchFlowExecutions(project, ret, req);
@@ -546,6 +552,55 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
ret.put("nodes", nodeList);
}
+ private void ajaxAddProxyUser(Project project, HashMap<String, Object> ret, HttpServletRequest req, User user) throws ServletException {
+ String name = getParam(req, "name");
+
+ logger.info("Adding proxy user " + name + " by " + user.getUserId());
+
+
+
+ boolean doProxy = Boolean.parseBoolean(getParam(req, "doProxy"));
+
+
+ HashSet<String> proxyUsers = project.getProxyUsers();
+ //add
+ if(doProxy) {
+ if(proxyUsers.contains(name)) {
+ return;
+ }
+ else {
+ if(userManager.validateProxyUser(name, user)) {
+ proxyUsers.add(name);
+ }
+ else {
+ ret.put("error", "User " + user.getUserId() + " has no permission to add " + name + " as proxy user for project " + project.getName());
+ return;
+ }
+ }
+ }
+ else {
+ if(!proxyUsers.contains(name)) {
+ return;
+ }
+ else {
+ if(userManager.validateProxyUser(name, user)) {
+ proxyUsers.remove(name);
+ }
+ else {
+ ret.put("error", "User " + user.getUserId() + " has no permission to remove " + name + " as proxy user for project " + project.getName());
+ return;
+ }
+ }
+ }
+ try {
+ projectManager.updateProjectSetting(project);
+ } catch (ProjectManagerException e) {
+ // TODO Auto-generated catch block
+ ret.put("error", e.getMessage());
+ }
+
+ }
+
private void ajaxAddPermission(Project project, HashMap<String, Object> ret, HttpServletRequest req, User user) throws ServletException {
String name = getParam(req, "name");
boolean group = Boolean.parseBoolean(getParam(req, "group"));
@@ -792,6 +847,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
page.add("permissions", project.getUserPermissions());
page.add("groupPermissions", project.getGroupPermissions());
+ page.add("proxyUsers", project.getProxyUserList());
if(hasPermission(project, user, Type.ADMIN)) {
page.add("isAdmin", true);
diff --git a/src/java/azkaban/webapp/servlet/velocity/permissionspage.vm b/src/java/azkaban/webapp/servlet/velocity/permissionspage.vm
index 159bd05..9b13af6 100644
--- a/src/java/azkaban/webapp/servlet/velocity/permissionspage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/permissionspage.vm
@@ -71,6 +71,7 @@
#if($isAdmin)
<button id="addUser" class="btn1">Add User</button>
<button id="addGroup" class="btn1">Add Group</button>
+ <button id="addProxyUser" class="btn2">Add Proxy User</button>
#end
</div>
@@ -93,7 +94,7 @@
<th class="tb-read">Read</th>
<th class="tb-write">Write</th>
<th class="tb-execute">Execute</th>
- <th class="tb-schedule">Schedule</th>
+ <th class="tb-schedule">Schedule</th>
#if($isAdmin)
<th class="tb-action"></th>
#end
@@ -172,6 +173,45 @@
#end
</tbody>
</table>
+
+ <br></br>
+ <table id="proxy-user-table" class="all-jobs permission-table">
+ <thead>
+ <tr>
+ <th class="tb-username">Proxy User</th>
+ <th class="tb-perm">Admin</th>
+ <th class="tb-read">Read</th>
+ <th class="tb-write">Write</th>
+ <th class="tb-execute">Execute</th>
+ <th class="tb-schedule">Schedule</th>
+ <th class="tb-proxy">Proxy</th>
+ #if($isAdmin)
+ <th class="tb-action"></th>
+ #end
+ </tr>
+ </thead>
+ <tbody>
+#if($proxyUsers)
+#foreach($proxyUser in $proxyUsers)
+ <tr id="${proxyUser}-row" >
+ <td class="tb-name">#if($proxyUser == $username) ${proxyUser} <span class="sublabel">(you)</span> #else $proxyUser #end</td>
+ <td><input id="proxy-${proxyUser}-admin-checkbox" type="checkbox" name="admin" disabled="disabled" ></input></td>
+ <td><input id="proxy-${proxyUser}-read-checkbox" type="checkbox" name="read" disabled="disabled" ></input></td>
+ <td><input id="proxy-${proxyUser}-write-checkbox" type="checkbox" name="write" disabled="disabled" ></input></td>
+ <td><input id="proxy-${proxyUser}-execute-checkbox" type="checkbox" name="execute" disabled="disabled" ></input></td>
+ <td><input id="proxy-${proxyUser}-schedule-checkbox" type="checkbox" name="schedule" disabled="disabled" ></input></td>
+ <td><input id="proxy-${proxyUser}-proxy-checkbox" type="checkbox" name="proxy" disabled="disabled" checked="true"></input></td>
+
+ #if($isAdmin)
+ <td><button id="proxy-${proxyUser}" class="change-btn btn2">Change</button></td>
+ #end
+ </tr>
+#end
+#else
+ <tr><td class="last">No Proxy User Found.</td></tr>
+#end
+ </tbody>
+ </table>
#end
</div>
@@ -184,6 +224,7 @@
<dl>
<dt>User</dt>
<dd><input id="user-box" name="userid" type="text" /></dd>
+ <div id="otherCheckBoxes">
<dt class="nextline">Admin</dt>
<dd><input id="admin-change" name="admin" type="checkbox" /></dd>
<dt>Read</dt>
@@ -194,6 +235,11 @@
<dd><input id="execute-change" name="execute" type="checkbox" /></dd>
<dt>Schedule</dt>
<dd><input id="schedule-change" name="schedule" type="checkbox" /></dd>
+ </div>
+ <div id="proxyCheckBox" hidden=true>
+ <dt>Proxy</dt>
+ <dd><input id="proxy-change" name="proxy" type="checkbox" /></dd>
+ </div>
</dl>
</fieldset>
</div>
src/sql/create_execution_logs.sql 3(+1 -2)
diff --git a/src/sql/create_execution_logs.sql b/src/sql/create_execution_logs.sql
index 6e76312..a1eab7a 100644
--- a/src/sql/create_execution_logs.sql
+++ b/src/sql/create_execution_logs.sql
@@ -8,6 +8,5 @@ CREATE TABLE execution_logs (
log LONGBLOB,
PRIMARY KEY (exec_id, name, attempt, start_byte),
INDEX log_attempt (exec_id, name, attempt),
- INDEX log_index (exec_id, name),
- INDEX byte_log_index(exec_id, name, start_byte, end_byte)
+ INDEX log_index (exec_id, name)
) ENGINE=InnoDB;
src/sql/create_project_table.sql 2(+2 -0)
diff --git a/src/sql/create_project_table.sql b/src/sql/create_project_table.sql
index e52fa76..34115a7 100644
--- a/src/sql/create_project_table.sql
+++ b/src/sql/create_project_table.sql
@@ -7,6 +7,8 @@ CREATE TABLE projects (
version INT,
last_modified_by VARCHAR(64) NOT NULL,
description VARCHAR(255),
+ enc_type TINYINT,
+ settings_blob LONGBLOB,
UNIQUE INDEX project_id (id),
INDEX project_name (name)
) ENGINE=InnoDB;
src/sql/update_2.0_to_2.01.sql 3(+3 -0)
diff --git a/src/sql/update_2.0_to_2.01.sql b/src/sql/update_2.0_to_2.01.sql
index 9ab565e..309742e 100644
--- a/src/sql/update_2.0_to_2.01.sql
+++ b/src/sql/update_2.0_to_2.01.sql
@@ -14,6 +14,9 @@ ALTER TABLE execution_logs ADD INDEX log_attempt (exec_id, name, attempt)
ALTER TABLE schedules ADD COLUMN enc_type TINYINT;
ALTER TABLE schedules ADD COLUMN schedule_options LONGBLOB;
+ALTER TABLE project_events MODIFY COLUMN message VARCHAR(512);
+ALTER TABLE projects ADD COLUMN enc_type TINYINT;
+ALTER TABLE projects ADD COLUMN settings_blob LONGBLOB;
src/web/js/azkaban.exflow.view.js 1(+1 -0)
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index 4fb7326..e35c340 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -83,6 +83,7 @@ azkaban.FlowTabView= Backbone.View.extend({
$("#executebtn").hide();
$("#pausebtn").hide();
$("#resumebtn").hide();
+ $("#retrybtn").hide();
this.model.bind('change:graph', this.handleFlowStatusChange, this);
this.model.bind('change:update', this.handleFlowStatusChange, this);
src/web/js/azkaban.permission.view.js 79(+69 -10)
diff --git a/src/web/js/azkaban.permission.view.js b/src/web/js/azkaban.permission.view.js
index 429b8ed..e04bc55 100644
--- a/src/web/js/azkaban.permission.view.js
+++ b/src/web/js/azkaban.permission.view.js
@@ -2,18 +2,20 @@ $.namespace('azkaban');
var permissionTableView;
var groupPermissionTableView;
+var proxyTableView;
azkaban.PermissionTableView= Backbone.View.extend({
events : {
"click button": "handleChangePermission"
},
initialize : function(settings) {
this.group = settings.group;
+ this.proxy = settings.proxy;
},
render: function() {
},
handleChangePermission: function(evt) {
var currentTarget = evt.currentTarget;
- changePermissionView.display(currentTarget.id, false, this.group);
+ changePermissionView.display(currentTarget.id, false, this.group, this.proxy);
}
});
@@ -27,13 +29,22 @@ azkaban.ChangePermissionView= Backbone.View.extend({
initialize : function(settings) {
$('#errorMsg').hide();
},
- display: function(userid, newPerm, group) {
+ display: function(userid, newPerm, group, proxy) {
// 6 is the length of the prefix "group-"
this.userid = group ? userid.substring(6, userid.length) : userid;
+ if(group == true) {
+ this.userid = userid.substring(6, userid.length)
+ } else if (proxy == true) {
+ this.userid = userid.substring(6, userid.length)
+ } else {
+ this.userid = userid
+ }
+
this.permission = {};
$('#user-box').val(this.userid);
this.newPerm = newPerm;
this.group = group;
+ this.proxy = proxy;
var prefix = userid;
var adminInput = $("#" + prefix + "-admin-checkbox");
@@ -41,12 +52,16 @@ azkaban.ChangePermissionView= Backbone.View.extend({
var writeInput = $("#" + prefix + "-write-checkbox");
var executeInput = $("#" + prefix + "-execute-checkbox");
var scheduleInput = $("#" + prefix + "-schedule-checkbox");
+ var proxyInput = $("#" + prefix + "-proxy-checkbox");
if (newPerm) {
if (group) {
$('#change-title').text("Add New Group Permissions");
}
- else {
+ else if(proxy){
+ $('#change-title').text("Add New Proxy User Permissions");
+ }
+ else{
$('#change-title').text("Add New User Permissions");
}
$('#user-box').attr("disabled", null);
@@ -57,22 +72,39 @@ azkaban.ChangePermissionView= Backbone.View.extend({
this.permission.write = false;
this.permission.execute = false;
this.permission.schedule = false;
+ this.doProxy = false;
+
}
else {
if (group) {
$('#change-title').text("Change Group Permissions");
}
+ else if(proxy){
+ $('#change-title').text("Change Proxy User Permissions");
+ this.doProxy = $(proxyInput).attr("checked");
+ }
else {
$('#change-title').text("Change User Permissions");
}
$('#user-box').attr("disabled", "disabled");
+
+
this.permission.admin = $(adminInput).attr("checked");
this.permission.read = $(readInput).attr("checked");
this.permission.write = $(writeInput).attr("checked");
this.permission.execute = $(executeInput).attr("checked");
this.permission.schedule = $(scheduleInput).attr("checked");
+ this.doProxy = $(proxyInput).attr("checked");
+ }
+
+ if(proxy) {
+ document.getElementById("otherCheckBoxes").hidden=true;
+ document.getElementById("proxyCheckBox").hidden=false;
+ } else {
+ document.getElementById("otherCheckBoxes").hidden=false;
+ document.getElementById("proxyCheckBox").hidden=true;
}
this.changeCheckbox();
@@ -91,17 +123,27 @@ azkaban.ChangePermissionView= Backbone.View.extend({
$("#errorMsg").hide();
}
});
+
+
+
},
render: function() {
},
handleCheckboxClick : function(evt) {
console.log("click");
var targetName = evt.currentTarget.name;
- this.permission[targetName] = evt.currentTarget.checked;
+ if(targetName == "proxy") {
+ this.doProxy = evt.currentTarget.checked;
+ }
+ else {
+ this.permission[targetName] = evt.currentTarget.checked;
+ }
this.changeCheckbox(evt);
},
changeCheckbox : function(evt) {
var perm = this.permission;
+ var proxy = this.proxy;
+ var doProxy = this.doProxy;
if (perm.admin) {
$("#admin-change").attr("checked", true);
@@ -116,9 +158,13 @@ azkaban.ChangePermissionView= Backbone.View.extend({
$("#schedule-change").attr("checked", true);
$("#schedule-change").attr("disabled", "disabled");
+
+ $("#proxy-change").attr("checked", false);
+ $("#proxy-change").attr("disabled", "disabled");
}
else {
$("#admin-change").attr("checked", false);
+
$("#read-change").attr("checked", perm.read);
$("#read-change").attr("disabled", null);
@@ -130,12 +176,16 @@ azkaban.ChangePermissionView= Backbone.View.extend({
$("#schedule-change").attr("checked", perm.schedule);
$("#schedule-change").attr("disabled", null);
+
+ $("#proxy-change").attr("checked", doProxy);
+ $("#proxy-change").attr("disabled", null);
+
}
$("#change-btn").removeClass("btn-disabled");
$("#change-btn").attr("disabled", null);
- if (perm.admin || perm.read || perm.write || perm.execute || perm.schedule) {
+ if (perm.admin || perm.read || perm.write || perm.execute || perm.schedule || doProxy) {
$("#change-btn").text("Commit");
}
else {
@@ -152,11 +202,14 @@ azkaban.ChangePermissionView= Backbone.View.extend({
var requestURL = contextURL + "/manager";
var name = $('#user-box').val();
var command = this.newPerm ? "addPermission" : "changePermission";
+ if(this.proxy) {
+ command = "addProxyUser";
+ }
var group = this.group;
$.get(
requestURL,
- {"project": projectName, "name": name, "ajax":command, "permissions": this.permission, "group": group},
+ {"project": projectName, "name": name, "ajax":command, "permissions": this.permission, "doProxy": this.doProxy, "group": group},
function(data) {
console.log("Output");
if (data.error) {
@@ -174,15 +227,21 @@ azkaban.ChangePermissionView= Backbone.View.extend({
});
$(function() {
- permissionTableView = new azkaban.PermissionTableView({el:$('#permissions-table'), group: false});
- groupPermissionTableView = new azkaban.PermissionTableView({el:$('#group-permissions-table'), group: true});
+ permissionTableView = new azkaban.PermissionTableView({el:$('#permissions-table'), group: false, proxy: false});
+ groupPermissionTableView = new azkaban.PermissionTableView({el:$('#group-permissions-table'), group: true, proxy: false});
+ proxyTableView = new azkaban.PermissionTableView({el:$('#proxy-user-table'), group: false, proxy: true});
changePermissionView = new azkaban.ChangePermissionView({el:$('#change-permission')});
$('#addUser').bind('click', function() {
- changePermissionView.display("", true, false);
+ changePermissionView.display("", true, false, false);
});
$('#addGroup').bind('click', function() {
- changePermissionView.display("", true, true);
+ changePermissionView.display("", true, true, false);
});
+
+ $('#addProxyUser').bind('click', function() {
+ changePermissionView.display("", true, false, true);
+ });
+
});
diff --git a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
index df5f5a6..452be05 100644
--- a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
@@ -212,7 +212,8 @@ public class LocalFlowWatcherTest {
//MockProjectLoader projectLoader = new MockProjectLoader(new File(exFlow.getExecutionPath()));
loader.uploadExecutableFlow(exFlow);
- FlowRunner runner = new FlowRunner(exFlow, watcher, loader, fakeProjectLoader, jobtypeManager);
+ FlowRunner runner = new FlowRunner(exFlow, loader, fakeProjectLoader, jobtypeManager);
+ runner.setFlowWatcher(watcher);
runner.addListener(eventCollector);
return runner;
diff --git a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
index 78aac81..1fddfc7 100644
--- a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
@@ -212,7 +212,8 @@ public class RemoteFlowWatcherTest {
//MockProjectLoader projectLoader = new MockProjectLoader(new File(exFlow.getExecutionPath()));
loader.uploadExecutableFlow(exFlow);
- FlowRunner runner = new FlowRunner(exFlow, watcher, loader, fakeProjectLoader, jobtypeManager);
+ FlowRunner runner = new FlowRunner(exFlow, loader, fakeProjectLoader, jobtypeManager);
+ runner.setFlowWatcher(watcher);
runner.addListener(eventCollector);
return runner;
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index c63fb11..54dc49c 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -361,7 +361,8 @@ public class FlowRunnerTest {
//MockProjectLoader projectLoader = new MockProjectLoader(new File(flow.getExecutionPath()));
loader.uploadExecutableFlow(flow);
- FlowRunner runner = new FlowRunner(flow, null, loader, fakeProjectLoader, jobtypeManager);
+ FlowRunner runner = new FlowRunner(flow, loader, fakeProjectLoader, jobtypeManager);
+
runner.addListener(eventCollector);
return runner;
@@ -373,7 +374,9 @@ public class FlowRunnerTest {
//MockProjectLoader projectLoader = new MockProjectLoader(new File(exFlow.getExecutionPath()));
loader.uploadExecutableFlow(exFlow);
- FlowRunner runner = new FlowRunner(exFlow, null, loader, fakeProjectLoader, jobtypeManager);
+
+ FlowRunner runner = new FlowRunner(exFlow, loader, fakeProjectLoader, jobtypeManager);
+
runner.addListener(eventCollector);
return runner;
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index e4d305c..1bafbbb 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -2,6 +2,7 @@ package azkaban.test.execapp;
import java.io.File;
import java.io.IOException;
+import java.util.HashSet;
import junit.framework.Assert;
@@ -261,8 +262,10 @@ public class JobRunnerTest {
node.setExecutableFlow(flow);
Props props = createProps(time, fail);
-
- JobRunner runner = new JobRunner(node, props, workingDir, loader, jobtypeManager, logger);
+ HashSet<String> proxyUsers = new HashSet<String>();
+ proxyUsers.add(flow.getSubmitUser());
+ JobRunner runner = new JobRunner(node, props, workingDir, loader, jobtypeManager);
+ runner.setLogSettings(logger, "5MB", 4);
runner.addListener(listener);
return runner;
diff --git a/unit/java/azkaban/test/execapp/MockProjectLoader.java b/unit/java/azkaban/test/execapp/MockProjectLoader.java
index 0739bd3..b10ea29 100644
--- a/unit/java/azkaban/test/execapp/MockProjectLoader.java
+++ b/unit/java/azkaban/test/execapp/MockProjectLoader.java
@@ -17,6 +17,7 @@ import azkaban.project.ProjectManagerException;
import azkaban.user.Permission;
import azkaban.user.User;
import azkaban.utils.Props;
+import azkaban.utils.Triple;
public class MockProjectLoader implements ProjectLoader {
public File dir;
@@ -209,4 +210,18 @@ public class MockProjectLoader implements ProjectLoader {
// TODO Auto-generated method stub
return null;
}
+
+ @Override
+ public List<Triple<String, Boolean, Permission>> getProjectPermissions(
+ int projectId) throws ProjectManagerException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void updateProjectSettings(Project project)
+ throws ProjectManagerException {
+ // TODO Auto-generated method stub
+
+ }
}
\ No newline at end of file