azkaban-developers
Changes
lib/mail-1.4.5.jar 0(+0 -0)
src/java/azkaban/executor/JobRunner.java 16(+8 -8)
src/java/azkaban/project/FileProjectManager.java 108(+103 -5)
src/java/azkaban/project/Project.java 24(+24 -0)
src/java/azkaban/scheduler/ScheduleManager.java 115(+40 -75)
src/java/azkaban/utils/Mailman.java 2(+2 -0)
src/java/azkaban/utils/Utils.java 35(+34 -1)
src/java/azkaban/utils/WebUtils.java 6(+0 -6)
src/web/css/azkaban.css 12(+6 -6)
src/web/js/azkaban.ajax.utils.js 16(+16 -0)
src/web/js/azkaban.exflow.view.js 17(+16 -1)
src/web/js/azkaban.joblog.view.js 22(+21 -1)
src/web/js/azkaban.project.view.js 35(+17 -18)
src/web/js/azkaban.projectlog.view.js 62(+62 -0)
Details
lib/mail-1.4.5.jar 0(+0 -0)
diff --git a/lib/mail-1.4.5.jar b/lib/mail-1.4.5.jar
new file mode 100644
index 0000000..300f436
Binary files /dev/null and b/lib/mail-1.4.5.jar differ
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 993f3c5..bdc9ede 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -7,6 +7,7 @@ import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
+import java.io.Writer;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -455,7 +456,7 @@ public class ExecutorManager {
flow.setSubmitted(true);
}
- private long readLog(File logFile, StringBuffer buffer, long startChar, long maxSize) {
+ private long readLog(File logFile, Writer writer, long startChar, long maxSize) {
if (!logFile.exists()) {
logger.error("Execution log for " + logFile + " doesn't exist.");
return -1;
@@ -485,7 +486,7 @@ public class ExecutorManager {
}
totalCharRead += charRead;
charPosition += charRead;
- buffer.append(charBuffer, 0, charRead);
+ writer.write(charBuffer, 0, charRead);
} while (charRead == charBuffer.length && totalCharRead < maxSize);
} catch (IOException e) {
@@ -504,7 +505,7 @@ public class ExecutorManager {
return charPosition;
}
- public long getExecutionJobLog(ExecutableFlow flow, String jobid, StringBuffer buffer, long startChar, long maxSize) throws ExecutorManagerException {
+ public long getExecutionJobLog(ExecutableFlow flow, String jobid, Writer writer, long startChar, long maxSize) throws ExecutorManagerException {
String path = flow.getExecutionPath();
File execPath = new File(path);
if (!execPath.exists()) {
@@ -520,11 +521,11 @@ public class ExecutorManager {
return -1;
}
- long charPosition = readLog(logFile, buffer, startChar, maxSize);
+ long charPosition = readLog(logFile, writer, startChar, maxSize);
return charPosition;
}
- public long getExecutableFlowLog(ExecutableFlow flow, StringBuffer buffer, long startChar, long maxSize) throws ExecutorManagerException {
+ public long getExecutableFlowLog(ExecutableFlow flow, Writer writer, long startChar, long maxSize) throws ExecutorManagerException {
String path = flow.getExecutionPath();
File execPath = new File(path);
if (!execPath.exists()) {
@@ -540,7 +541,7 @@ public class ExecutorManager {
return -1;
}
- long charPosition = readLog(flowLogFile, buffer, startChar, maxSize);
+ long charPosition = readLog(flowLogFile, writer, startChar, maxSize);
return charPosition;
}
src/java/azkaban/executor/JobRunner.java 16(+8 -8)
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index ac6bcc9..127d6b7 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -113,14 +113,14 @@ public class JobRunner extends EventHandler implements Runnable {
props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
JobWrappingFactory factory = JobWrappingFactory.getJobWrappingFactory();
job = factory.buildJobExecutor(props, logger);
-
- try {
- job.run();
- } catch (Exception e) {
- succeeded = false;
- //logger.error("job run failed!");
- e.printStackTrace();
- }
+//
+// try {
+// job.run();
+// } catch (Exception e) {
+// succeeded = false;
+// //logger.error("job run failed!");
+// e.printStackTrace();
+// }
node.setEndTime(System.currentTimeMillis());
if (succeeded) {
src/java/azkaban/project/FileProjectManager.java 108(+103 -5)
diff --git a/src/java/azkaban/project/FileProjectManager.java b/src/java/azkaban/project/FileProjectManager.java
index 8eed304..f5c2af3 100644
--- a/src/java/azkaban/project/FileProjectManager.java
+++ b/src/java/azkaban/project/FileProjectManager.java
@@ -1,12 +1,18 @@
package azkaban.project;
+import java.io.BufferedReader;
import java.io.File;
import java.io.FileFilter;
+import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Writer;
import java.security.AccessControlException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -20,10 +26,17 @@ import net.sf.ehcache.config.CacheConfiguration;
import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Appender;
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Layout;
import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.log4j.RollingFileAppender;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
+import com.sun.xml.internal.bind.v2.runtime.unmarshaller.XsiNilLoader.Array;
+
import azkaban.flow.Flow;
import azkaban.flow.FlowProps;
import azkaban.flow.Node;
@@ -40,6 +53,9 @@ import azkaban.utils.Props;
* install path where projects will be loaded installed to.
*/
public class FileProjectManager implements ProjectManager {
+ // Layout for project logging
+ private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
+
public static final String DIRECTORY_PARAM = "file.project.loader.path";
private static final String DELETED_PROJECT_PREFIX = ".DELETED.";
private static final DateTimeFormatter FILE_DATE_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd-HH:mm.ss.SSS");
@@ -48,6 +64,8 @@ public class FileProjectManager implements ProjectManager {
private static final String FLOW_EXTENSION = ".flow";
private static final Logger logger = Logger.getLogger(FileProjectManager.class);
private static final int IDLE_SECONDS = 120;
+ private static final long PROJECT_LOG_SIZE = 1024*512; // 512kb Log size rollover
+ private static final int LOG_BACKUP = 1000; // I think 512mb is good enough per Project.
private ConcurrentHashMap<String, Project> projects = new ConcurrentHashMap<String, Project>();
private CacheManager manager = CacheManager.create();
@@ -124,7 +142,8 @@ public class FileProjectManager implements ProjectManager {
Project project = Project.projectFromObject(obj);
logger.info("Loading project " + project.getName());
projects.put(project.getName(), project);
-
+ attachLoggerToProject(project);
+
String source = project.getSource();
if (source == null) {
logger.info(project.getName() + ": No flows uploaded");
@@ -216,9 +235,6 @@ public class FileProjectManager implements ProjectManager {
if (project == null) {
throw new ProjectManagerException("Project not found.");
}
- if (!project.hasPermission(uploader, Type.WRITE)) {
- throw new AccessControlException("Permission denied. Do not have write access.");
- }
List<String> errors = new ArrayList<String>();
DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
@@ -335,6 +351,9 @@ public class FileProjectManager implements ProjectManager {
throw new ProjectManagerException("Project directory " + projectName + " cannot be created in " + projectDirectory, e);
}
projects.put(projectName, project);
+ attachLoggerToProject(project);
+
+ project.info("Project has been created by '" + creator.getUserId() + "'");
return project;
}
@@ -456,7 +475,7 @@ public class FileProjectManager implements ProjectManager {
throw new ProjectManagerException("Deleting of project failed.");
}
}
-
+
projects.remove(projectName);
return project;
}
@@ -475,6 +494,21 @@ public class FileProjectManager implements ProjectManager {
return pathname.isFile() && !pathname.isHidden() && name.length() > suffix.length() && name.endsWith(suffix);
}
}
+
+ private static class PrefixFilter implements FileFilter {
+ private String prefix;
+
+ public PrefixFilter(String prefix) {
+ this.prefix = prefix;
+ }
+
+ @Override
+ public boolean accept(File pathname) {
+ String name = pathname.getName();
+
+ return pathname.isFile() && !pathname.isHidden() && name.length() > prefix.length() && name.startsWith(prefix);
+ }
+ }
@Override
public void commitProject(String projectName) throws ProjectManagerException {
@@ -539,4 +573,68 @@ public class FileProjectManager implements ProjectManager {
}
}
+ @Override
+ public void getProjectLogs(String projectId, long tailBytes, long skipBytes, Writer writer) throws IOException {
+ File projectDir = new File(projectDirectory, projectId);
+
+ if (!projectDir.exists()) {
+ throw new IOException("Project directory " + projectDir + " doesn't exist.");
+ }
+
+ File logFile = new File(projectDir, projectLogFileName(projectId));
+ if (!logFile.exists()) {
+ throw new IOException("Project audit log for " + projectDir + " doesn't exist.");
+ }
+
+ long lookbackBytes = skipBytes + tailBytes;
+ long fileLength = logFile.length();
+
+ long skip = Math.max(0, fileLength - lookbackBytes);
+ FileInputStream f = new FileInputStream(logFile);
+ if (skip > 0) {
+ f.skip(skip);
+ }
+
+ long bytesRead = 0;
+ BufferedReader reader = new BufferedReader(new InputStreamReader(f));
+ try {
+ String line = reader.readLine();
+ for (; line != null; line = reader.readLine()) {
+ writer.write(line);
+ writer.write("\n");
+ bytesRead += line.length();
+
+ // A very loose tail bytes count since it's by lines and encoding, but this is okay.
+ if (bytesRead > tailBytes) {
+ break;
+ }
+ }
+ } finally {
+ reader.close();
+ }
+ }
+
+ private void attachLoggerToProject(Project project) {
+ Logger logger = Logger.getLogger(".projectlogger." + project.getName());
+
+ File projectPath = new File(projectDirectory, project.getName());
+
+ String logName = projectLogFileName(project.getName());
+ File logFile = new File(projectPath, logName);
+
+ FileAppender appender = null;
+ try {
+ appender = new FileAppender(DEFAULT_LAYOUT, logFile.getPath());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ logger.addAppender(appender);
+
+ project.attachLogger(logger);
+ }
+
+ private String projectLogFileName(String projectName) {
+ return "_project." + projectName + ".log";
+ }
}
\ No newline at end of file
src/java/azkaban/project/Project.java 24(+24 -0)
diff --git a/src/java/azkaban/project/Project.java b/src/java/azkaban/project/Project.java
index 5c6fd25..cdd5edc 100644
--- a/src/java/azkaban/project/Project.java
+++ b/src/java/azkaban/project/Project.java
@@ -1,11 +1,14 @@
package azkaban.project;
+import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import org.apache.log4j.Logger;
+
import azkaban.flow.Flow;
import azkaban.user.Permission;
import azkaban.user.Permission.Type;
@@ -21,6 +24,7 @@ public class Project {
private String source;
private LinkedHashMap<String, Permission> userToPermission = new LinkedHashMap<String, Permission>();
private Map<String, Flow> flows = null;
+ private Logger logger = null;
public Project(String name) {
this.name = name;
@@ -259,4 +263,24 @@ public class Project {
public void setSource(String source) {
this.source = source;
}
+
+ public void attachLogger(Logger logger) {
+ this.logger = logger;
+ }
+
+ public void info(String message) {
+ if (logger != null) {
+ logger.info(message);
+ }
+ }
+
+ public void error(String message) {
+ if (logger != null) {
+ logger.error(message);
+ }
+ }
+
+ public Logger getLogger() {
+ return logger;
+ }
}
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index 93051e1..de8d2e6 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -1,6 +1,8 @@
package azkaban.project;
import java.io.File;
+import java.io.IOException;
+import java.io.Writer;
import java.util.HashMap;
import java.util.List;
@@ -30,6 +32,8 @@ public interface ProjectManager {
public Props getProperties(Project project, String source) throws ProjectManagerException;
public HashMap<String, Props> getAllFlowProperties(Project project, String flowId) throws ProjectManagerException;
-
+
public void copyProjectSourceFilesToDirectory(Project project, File directory) throws ProjectManagerException;
+
+ public void getProjectLogs(String projectId, long tailBytes, long skipBytes, Writer writer) throws IOException;
}
\ No newline at end of file
diff --git a/src/java/azkaban/scheduler/ScheduledFlow.java b/src/java/azkaban/scheduler/ScheduledFlow.java
index 1e92af2..20a22b8 100644
--- a/src/java/azkaban/scheduler/ScheduledFlow.java
+++ b/src/java/azkaban/scheduler/ScheduledFlow.java
@@ -41,11 +41,14 @@ public class ScheduledFlow {
private final DateTime submitTime;
private final DateTime firstSchedTime;
+ public static final String DATE_TIME_STRING = "YYYY-MM-dd HH:mm:ss";
+
+
// private SchedStatus schedStatus;
+
public enum SchedStatus {
- LASTSUCCESS("lastsuccess"), LASTFAILED("lastfailed"), LASTPAUSED(
- "lastpaused");
+ LASTSUCCESS("lastsuccess"), LASTFAILED("lastfailed"), LASTPAUSED("lastpaused");
private final String status;
@@ -110,7 +113,8 @@ public class ScheduledFlow {
String user,
String userSubmit,
DateTime submitTime,
- DateTime firstSchedTime) {
+ DateTime firstSchedTime)
+ {
this(scheduleId, projectId, flowId, user, userSubmit, submitTime, firstSchedTime, new DateTime(), null);
}
@@ -171,8 +175,7 @@ public class ScheduledFlow {
* @param period
* @return
*/
- private DateTime getNextRuntime(DateTime scheduledDate,
- ReadablePeriod period) {
+ private DateTime getNextRuntime(DateTime scheduledDate, ReadablePeriod period) {
DateTime now = new DateTime();
DateTime date = new DateTime(scheduledDate);
int count = 0;
@@ -250,6 +253,10 @@ public class ScheduledFlow {
+ ", scheduleId='" + scheduleId + '\'' + '}';
}
+ public String toNiceString() {
+ return scheduleId + "," + submitTime + "," + period;
+ }
+
public String getUser() {
return user;
}
src/java/azkaban/scheduler/ScheduleManager.java 115(+40 -75)
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 06d8ad4..f5eeb75 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -20,15 +20,13 @@ import org.joda.time.format.PeriodFormat;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerException;
-import azkaban.executor.ExecutableFlow.Status;
+
import azkaban.flow.Flow;
import azkaban.jobExecutor.utils.JobExecutionException;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
import azkaban.project.ProjectManagerException;
-import azkaban.scheduler.ScheduledFlow.SchedStatus;
-import azkaban.user.Permission.Type;
-import azkaban.user.User;
+
import azkaban.utils.Props;
/**
@@ -40,8 +38,7 @@ import azkaban.utils.Props;
public class ScheduleManager {
private static Logger logger = Logger.getLogger(ScheduleManager.class);
- private final DateTimeFormatter _dateFormat = DateTimeFormat
- .forPattern("MM-dd-yyyy HH:mm:ss:SSS");
+ private final DateTimeFormatter _dateFormat = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss:SSS");
private ScheduleLoader loader;
private Map<String, ScheduledFlow> scheduleIDMap = new LinkedHashMap<String, ScheduledFlow>();
private final ScheduleRunner runner;
@@ -103,12 +100,14 @@ public class ScheduleManager {
*
* @param id
*/
- public synchronized void removeScheduledFlow(String scheduleId) {
+ public synchronized ScheduledFlow removeScheduledFlow(String scheduleId) {
ScheduledFlow flow = scheduleIDMap.get(scheduleId);
scheduleIDMap.remove(scheduleId);
runner.removeScheduledFlow(flow);
loader.saveSchedule(getSchedule());
+
+ return flow;
}
// public synchronized void pauseScheduledFlow(String scheduleId){
@@ -133,7 +132,7 @@ public class ScheduleManager {
// }
// }
- public void schedule(
+ public ScheduledFlow schedule(
final String scheduleId,
final String projectId,
final String flowId,
@@ -146,8 +145,9 @@ public class ScheduleManager {
+ _dateFormat.print(firstSchedTime) + " with a period of "
+ PeriodFormat.getDefault().print(period));
- schedule(new ScheduledFlow(scheduleId, projectId, flowId, user,
- userSubmit, submitTime, firstSchedTime, period));
+ ScheduledFlow scheduleFlow = new ScheduledFlow(scheduleId, projectId, flowId, user, userSubmit, submitTime, firstSchedTime, period);
+ schedule(scheduleFlow);
+ return scheduleFlow;
}
/**
@@ -157,7 +157,7 @@ public class ScheduleManager {
* @param date
* @param ignoreDep
*/
- public void schedule(
+ public ScheduledFlow schedule(
String scheduleId,
String projectId,
String flowId,
@@ -166,10 +166,10 @@ public class ScheduleManager {
DateTime submitTime,
DateTime firstSchedTime)
{
- logger.info("Scheduling flow '" + scheduleId + "' for "
- + _dateFormat.print(firstSchedTime));
- schedule(new ScheduledFlow(scheduleId, projectId, flowId, user, userSubmit, submitTime,
- firstSchedTime));
+ logger.info("Scheduling flow '" + scheduleId + "' for " + _dateFormat.print(firstSchedTime));
+ ScheduledFlow scheduleFlow = new ScheduledFlow(scheduleId, projectId, flowId, user, userSubmit, submitTime, firstSchedTime);
+ schedule(scheduleFlow);
+ return scheduleFlow;
}
/**
@@ -220,8 +220,7 @@ public class ScheduleManager {
private static final int TIMEOUT_MS = 300000;
public ScheduleRunner() {
- schedule = new PriorityBlockingQueue<ScheduledFlow>(1,
- new ScheduleComparator());
+ schedule = new PriorityBlockingQueue<ScheduledFlow>(1,new ScheduleComparator());
}
public void shutdown() {
@@ -284,68 +283,43 @@ public class ScheduleManager {
if (schedFlow == null) {
// If null, wake up every minute or so to see if
- // there's something to do.
- // Most likely there will not be.
+ // there's something to do. Most likely there will not be.
try {
this.wait(TIMEOUT_MS);
} catch (InterruptedException e) {
- // interruption should occur when items are
- // added or removed from the queue.
+ // interruption should occur when items are added or removed from the queue.
}
} else {
- // We've passed the flow execution time, so we will
- // run.
+ // We've passed the flow execution time, so we will run.
if (!schedFlow.getNextExecTime().isAfterNow()) {
- // Run flow. The invocation of flows should be
- // quick.
+ // Run flow. The invocation of flows should be quick.
ScheduledFlow runningFlow = schedule.poll();
- logger.info("Scheduler attempting to run "
- + runningFlow.getScheduleId());
+ logger.info("Scheduler attempting to run " + runningFlow.getScheduleId());
// Execute the flow here
try {
- Project project = projectManager
- .getProject(runningFlow
- .getProjectId());
+ Project project = projectManager.getProject(runningFlow.getProjectId());
if (project == null) {
- logger.error("Scheduled Project "
- + runningFlow.getProjectId()
- + " does not exist!");
- throw new RuntimeException(
- "Error finding the scheduled project. "
- + runningFlow
- .getScheduleId());
+ logger.error("Scheduled Project " + runningFlow.getProjectId() + " does not exist!");
+ throw new RuntimeException("Error finding the scheduled project. "+ runningFlow.getScheduleId());
}
- Flow flow = project.getFlow(runningFlow
- .getFlowId());
+ Flow flow = project.getFlow(runningFlow.getFlowId());
if (flow == null) {
- logger.error("Flow "
- + runningFlow.getFlowId()
- + " cannot be found in project "
- + project.getName());
- throw new RuntimeException(
- "Error finding the scheduled flow. "
- + runningFlow
- .getScheduleId());
+ logger.error("Flow " + runningFlow.getFlowId() + " cannot be found in project " + project.getName());
+ throw new RuntimeException("Error finding the scheduled flow. " + runningFlow.getScheduleId());
}
HashMap<String, Props> sources;
try {
- sources = projectManager
- .getAllFlowProperties(project,
- runningFlow.getFlowId());
+ sources = projectManager.getAllFlowProperties(project,runningFlow.getFlowId());
} catch (ProjectManagerException e) {
logger.error(e.getMessage());
- throw new RuntimeException(
- "Error getting the flow resources. "
- + runningFlow
- .getScheduleId());
+ throw new RuntimeException("Error getting the flow resources. " + runningFlow.getScheduleId());
}
// Create ExecutableFlow
- ExecutableFlow exflow = executorManager
- .createExecutableFlow(flow);
+ ExecutableFlow exflow = executorManager.createExecutableFlow(flow);
exflow.setSubmitUser(runningFlow.getUser());
// TODO make disabled in scheduled flow
// Map<String, String> paramGroup =
@@ -361,8 +335,7 @@ public class ScheduleManager {
// Create directory
try {
- executorManager
- .setupExecutableFlow(exflow);
+ executorManager.setupExecutableFlow(exflow);
} catch (ExecutorManagerException e) {
try {
executorManager.cleanupAll(exflow);
@@ -374,12 +347,9 @@ public class ScheduleManager {
}
// Copy files to the source.
- File executionDir = new File(
- exflow.getExecutionPath());
+ File executionDir = new File(exflow.getExecutionPath());
try {
- projectManager
- .copyProjectSourceFilesToDirectory(
- project, executionDir);
+ projectManager.copyProjectSourceFilesToDirectory(project, executionDir);
} catch (ProjectManagerException e) {
try {
executorManager.cleanupAll(exflow);
@@ -392,19 +362,20 @@ public class ScheduleManager {
try {
executorManager.executeFlow(exflow);
+ project.info("Scheduler has invoked " + exflow.getExecutionId());
} catch (ExecutorManagerException e) {
try {
executorManager.cleanupAll(exflow);
} catch (ExecutorManagerException e1) {
e1.printStackTrace();
}
-
+
+ project.info("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.");
logger.error(e.getMessage());
return;
}
} catch (JobExecutionException e) {
- logger.info("Could not run flow. "
- + e.getMessage());
+ logger.info("Could not run flow. " + e.getMessage());
}
schedule.remove(runningFlow);
@@ -417,9 +388,7 @@ public class ScheduleManager {
saveSchedule();
} else {
// wait until flow run
- long millisWait = Math.max(0, schedFlow
- .getNextExecTime().getMillis()
- - (new DateTime()).getMillis());
+ long millisWait = Math.max(0, schedFlow.getNextExecTime().getMillis() - (new DateTime()).getMillis());
try {
this.wait(Math.min(millisWait, TIMEOUT_MS));
} catch (InterruptedException e) {
@@ -429,13 +398,9 @@ public class ScheduleManager {
}
}
} catch (Exception e) {
- logger.error(
- "Unexpected exception has been thrown in scheduler",
- e);
+ logger.error("Unexpected exception has been thrown in scheduler", e);
} catch (Throwable e) {
- logger.error(
- "Unexpected throwable has been thrown in scheduler",
- e);
+ logger.error("Unexpected throwable has been thrown in scheduler", e);
}
}
}
src/java/azkaban/utils/Mailman.java 2(+2 -0)
diff --git a/src/java/azkaban/utils/Mailman.java b/src/java/azkaban/utils/Mailman.java
index 1cfce64..38e812b 100644
--- a/src/java/azkaban/utils/Mailman.java
+++ b/src/java/azkaban/utils/Mailman.java
@@ -51,6 +51,7 @@ public class Mailman {
public void sendEmail(String fromAddress, List<String> toAddress,
String subject, String body) throws MessagingException {
SimpleEmail email = new SimpleEmail();
+
try {
email.setHostName(_mailHost);
@@ -60,6 +61,7 @@ public class Mailman {
email.setFrom(fromAddress);
email.setSubject(subject);
+
email.setMsg(body);
email.setDebug(true);
email.send();
src/java/azkaban/utils/Utils.java 35(+34 -1)
diff --git a/src/java/azkaban/utils/Utils.java b/src/java/azkaban/utils/Utils.java
index a4d5db3..1c4a717 100644
--- a/src/java/azkaban/utils/Utils.java
+++ b/src/java/azkaban/utils/Utils.java
@@ -230,5 +230,38 @@ public class Utils {
}
}
-
+ public static String formatDuration(long startTime, long endTime) {
+ if (startTime == -1) {
+ return "-";
+ }
+
+ long durationMS;
+ if (endTime == -1) {
+ durationMS = System.currentTimeMillis() - startTime;
+ }
+ else {
+ durationMS = endTime - startTime;
+ }
+
+ long seconds = durationMS/1000;
+ if (seconds < 60) {
+ return seconds + " sec";
+ }
+
+ long minutes = seconds / 60;
+ seconds %= 60;
+ if (minutes < 60) {
+ return minutes + "m " + seconds + "s";
+ }
+
+ long hours = minutes / 60;
+ minutes %= 60;
+ if (hours < 24) {
+ return hours + "h " + minutes + "m " + seconds + "s";
+ }
+
+ long days = hours / 24;
+ hours %= 24;
+ return days + "d " + hours + "h " + minutes + "m";
+ }
}
\ No newline at end of file
src/java/azkaban/utils/WebUtils.java 6(+0 -6)
diff --git a/src/java/azkaban/utils/WebUtils.java b/src/java/azkaban/utils/WebUtils.java
index b84c27a..ad1b734 100644
--- a/src/java/azkaban/utils/WebUtils.java
+++ b/src/java/azkaban/utils/WebUtils.java
@@ -1,14 +1,8 @@
package azkaban.utils;
import org.joda.time.DateTime;
-import org.joda.time.Days;
import org.joda.time.DurationFieldType;
-import org.joda.time.Hours;
-import org.joda.time.Minutes;
-import org.joda.time.Months;
import org.joda.time.ReadablePeriod;
-import org.joda.time.Seconds;
-import org.joda.time.Weeks;
import org.joda.time.format.DateTimeFormat;
import azkaban.executor.ExecutableFlow.Status;
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index f3fb647..6ae0e66 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -2,6 +2,7 @@ package azkaban.webapp.servlet;
import java.io.File;
import java.io.IOException;
+import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -156,10 +157,10 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
Project project = projectManager.getProject(projectId);
if (project == null) {
- ret.put("error", "Project " + project + " not found.");
+ ret.put("error", "Project '" + project + "' not found.");
}
else if (!project.hasPermission(user, type)) {
- ret.put("error", "User " + user.getUserId() + " doesn't have " + type.name() + " permissions on " + projectId);
+ ret.put("error", "User '" + user.getUserId() + "' doesn't have " + type.name() + " permissions on " + projectId);
}
else {
return project;
@@ -213,9 +214,11 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
else if (ajaxName.equals("fetchExecFlowLogs")) {
ajaxFetchExecFlowLogs(req, resp, ret, session.getUser(), exFlow);
+ ret = null;
}
else if (ajaxName.equals("fetchExecJobLogs")) {
ajaxFetchJobLogs(req, resp, ret, session.getUser(), exFlow);
+ ret = null;
}
}
}
@@ -227,10 +230,21 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ajaxExecuteFlow(req, resp, ret, session.getUser());
}
}
- this.writeJSON(resp, ret);
+ if (ret != null) {
+ this.writeJSON(resp, ret);
+ }
}
- private void ajaxFetchExecFlowLogs(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
+ /**
+ * Gets the logs through plain text stream to reduce memory overhead.
+ *
+ * @param req
+ * @param resp
+ * @param user
+ * @param exFlow
+ * @throws ServletException
+ */
+ private void ajaxFetchExecFlowLogs(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
if (project == null) {
return;
@@ -239,17 +253,36 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
int startChar = this.getIntParam(req, "current");
int maxSize = this.getIntParam(req, "max");
- StringBuffer buffer = new StringBuffer(STRING_BUFFER_SIZE);
+ resp.setContentType("text/plain");
+ resp.setCharacterEncoding("utf-8");
+ PrintWriter writer;
try {
- long character = executorManager.getExecutableFlowLog(exFlow, buffer, startChar, maxSize);
- ret.put("current", character);
- ret.put("log", buffer.toString());
+ writer = resp.getWriter();
+ } catch (IOException e) {
+ throw new ServletException(e);
+ }
+
+ try {
+ long character = executorManager.getExecutableFlowLog(exFlow, writer, startChar, maxSize);
+ writer.write("\n");
+ writer.write(Long.toString(character));
} catch (ExecutorManagerException e) {
- e.printStackTrace();
- ret.put("error", e.getMessage());
+ throw new ServletException(e);
+ }
+ finally {
+ writer.close();
}
}
+ /**
+ * Gets the logs through ajax plain text stream to reduce memory overhead.
+ *
+ * @param req
+ * @param resp
+ * @param user
+ * @param exFlow
+ * @throws ServletException
+ */
private void ajaxFetchJobLogs(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
if (project == null) {
@@ -260,11 +293,17 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
int maxSize = this.getIntParam(req, "max");
String jobId = this.getParam(req, "job");
- StringBuffer buffer = new StringBuffer(STRING_BUFFER_SIZE);
+ PrintWriter writer;
+ try {
+ writer = resp.getWriter();
+ } catch (IOException e) {
+ throw new ServletException(e);
+ }
+
try {
- long character = executorManager.getExecutionJobLog(exFlow, jobId, buffer, startChar, maxSize);
- ret.put("current", character);
- ret.put("log", buffer.toString());
+ long character = executorManager.getExecutionJobLog(exFlow, jobId, writer, startChar, maxSize);
+ writer.write("\n");
+ writer.write(Long.toString(character));
} catch (ExecutorManagerException e) {
e.printStackTrace();
ret.put("error", e.getMessage());
@@ -403,7 +442,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
Flow flow = project.getFlow(flowId);
if (flow == null) {
- ret.put("error", "Flow " + flowId + " cannot be found in project " + project);
+ ret.put("error", "Flow '" + flowId + "' cannot be found in project " + project);
return;
}
@@ -445,6 +484,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
try {
executorManager.executeFlow(exflow);
+ project.info("User '" + user.getUserId() + "' executed flow '" + exflow.getExecutionId() + "'");
} catch (ExecutorManagerException e) {
try {
executorManager.cleanupAll(exflow);
diff --git a/src/java/azkaban/webapp/servlet/Page.java b/src/java/azkaban/webapp/servlet/Page.java
index 1c52bab..7d44f8f 100644
--- a/src/java/azkaban/webapp/servlet/Page.java
+++ b/src/java/azkaban/webapp/servlet/Page.java
@@ -36,7 +36,6 @@ public class Page {
private final VelocityContext context;
private final String template;
private String mimeType = DEFAULT_MIME_TYPE;
- private VelocityUtils utils = new VelocityUtils();
/**
* Creates a page and sets up the velocity engine to render
@@ -53,7 +52,6 @@ public class Page {
this.engine = Utils.nonNull(engine);
this.template = Utils.nonNull(template);
this.context = new VelocityContext();
- this.context.put("utils", utils);
this.context.put("session", request.getSession(true));
this.context.put("context", request.getContextPath());
}
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 6d54af0..29bc6d0 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -5,6 +5,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.io.PrintWriter;
import java.io.Writer;
import java.security.AccessControlException;
import java.util.ArrayList;
@@ -84,6 +85,9 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
if (hasParam(req, "ajax")) {
handleAJAXAction(req, resp, session);
}
+ else if (hasParam(req, "logs")) {
+ handleProjectLogsPage(req, resp, session);
+ }
else if (hasParam(req, "permissions")) {
handlePermissionPage(req, resp, session);
}
@@ -141,7 +145,11 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
}
String ajaxName = getParam(req, "ajax");
- if (ajaxName.equals("fetchflowjobs")) {
+ if (ajaxName.equals("fetchProjectLogs")) {
+ ajaxFetchProjectLogs(project, req, resp, ret, user);
+ ret = null;
+ }
+ else if (ajaxName.equals("fetchflowjobs")) {
if (handleAjaxPermission(project, user, Type.READ, ret)) {
ajaxFetchFlow(project, ret, req, resp);
}
@@ -158,7 +166,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
}
else if (ajaxName.equals("changeDescription")) {
if (handleAjaxPermission(project, user, Type.WRITE, ret)) {
- ajaxChangeDescription(project, ret, req);
+ ajaxChangeDescription(project, ret, req, user);
}
}
else if (ajaxName.equals("getPermissions")) {
@@ -168,12 +176,12 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
}
else if (ajaxName.equals("changeUserPermission")) {
if (handleAjaxPermission(project, user, Type.ADMIN, ret)) {
- ajaxChangePermissions(project, ret, req);
+ ajaxChangePermissions(project, ret, req, user);
}
}
else if (ajaxName.equals("addUserPermission")) {
if (handleAjaxPermission(project, user, Type.ADMIN, ret)) {
- ajaxAddUserPermission(project, ret, req);
+ ajaxAddUserPermission(project, ret, req, user);
}
}
else if (ajaxName.equals("fetchFlowExecutions")) {
@@ -184,7 +192,10 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
else {
ret.put("error", "Cannot execute command " + ajaxName);
}
- this.writeJSON(resp, ret);
+
+ if (ret != null) {
+ this.writeJSON(resp, ret);
+ }
}
private boolean handleAjaxPermission(Project project, User user, Type type, Map<String, Object> ret) {
@@ -196,6 +207,41 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
return false;
}
+ /**
+ * Gets the logs through plain text stream to reduce memory overhead.
+ *
+ * @param req
+ * @param resp
+ * @param user
+ * @param exFlow
+ * @throws ServletException
+ */
+ private void ajaxFetchProjectLogs(Project project, HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user) throws ServletException {
+ if (!project.hasPermission(user, Type.READ)) {
+ return;
+ }
+
+ int tailBytes = this.getIntParam(req, "tail");
+
+ resp.setContentType("text/plain");
+ resp.setCharacterEncoding("utf-8");
+ PrintWriter writer;
+ try {
+ writer = resp.getWriter();
+ } catch (IOException e) {
+ throw new ServletException(e);
+ }
+
+ try {
+ projectManager.getProjectLogs(project.getName(), tailBytes, 0, writer);
+ } catch (IOException e) {
+ throw new ServletException(e);
+ }
+ finally {
+ writer.close();
+ }
+ }
+
private void ajaxFetchFlowExecutions(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
String flowId = getParam(req, "flow");
int from = Integer.valueOf(getParam(req, "start"));
@@ -272,6 +318,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
return;
}
+ project.info("Project removing by '" + user.getUserId() + "'");
try {
projectManager.removeProject(projectName);
} catch (ProjectManagerException e) {
@@ -284,12 +331,13 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
resp.sendRedirect(req.getContextPath());
}
- private void ajaxChangeDescription(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
+ private void ajaxChangeDescription(Project project, HashMap<String, Object> ret, HttpServletRequest req, User user) throws ServletException {
String description = getParam(req, "description");
project.setDescription(description);
try {
projectManager.commitProject(project.getName());
+ project.info("Project description changed to '" + description + "' by " + user.getUserId());
} catch (ProjectManagerException e) {
ret.put("error", e.getMessage());
}
@@ -390,7 +438,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
ret.put("nodes", nodeList);
}
- private void ajaxAddUserPermission(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
+ private void ajaxAddUserPermission(Project project, HashMap<String, Object> ret, HttpServletRequest req, User user) throws ServletException {
String username = getParam(req, "username");
UserManager userManager = getApplication().getUserManager();
if (!userManager.validateUser(username)) {
@@ -420,6 +468,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
}
project.setUserPermission(username, perm);
+ project.info("User '" + user.getUserId() + "' has added user '" + username + "' to the project with permission " + perm.toString());
try {
projectManager.commitProject(project.getName());
} catch (ProjectManagerException e) {
@@ -428,7 +477,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
}
- private void ajaxChangePermissions(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
+ private void ajaxChangePermissions(Project project, HashMap<String, Object> ret, HttpServletRequest req, User user) throws ServletException {
boolean admin = Boolean.parseBoolean(getParam(req, "permissions[admin]"));
boolean read = Boolean.parseBoolean(getParam(req, "permissions[read]"));
boolean write = Boolean.parseBoolean(getParam(req, "permissions[write]"));
@@ -451,6 +500,8 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
perm.setPermission(Type.EXECUTE, execute);
perm.setPermission(Type.SCHEDULE, schedule);
}
+ project.info("User '" + user.getUserId() + "' has changed permissions for '" + username + "' to " + perm.toString());
+
try {
projectManager.commitProject(project.getName());
} catch (ProjectManagerException e) {
@@ -472,6 +523,29 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
ret.put("permissions", permissions);
}
+ private void handleProjectLogsPage(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+ Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/projectlogpage.vm");
+ User user = session.getUser();
+ String projectName = getParam(req, "project");
+
+ Project project = projectManager.getProject(projectName);
+ if (project == null) {
+ page.add("errorMsg", "Project " + projectName + " doesn't exist.");
+ }
+ page.add("projectName", projectName);
+ //page.add("projectManager", projectManager);
+ int bytesSkip = 0;
+ int numBytes = 1024;
+
+ // Really sucks if we do a lot of these because it'll eat up memory fast. But it's expected
+ // that this won't be a heavily used thing. If it is, then we'll revisit it to make it more stream
+ // friendly.
+ StringBuffer buffer = new StringBuffer(numBytes);
+ page.add("log", buffer.toString());
+
+ page.render();
+ }
+
private void handlePermissionPage(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException {
Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/permissionspage.vm");
String projectName = getParam(req, "project");
@@ -697,32 +771,47 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
private void handleUpload(HttpServletRequest req, HttpServletResponse resp, Map<String, Object> multipart, Session session) throws ServletException, IOException {
User user = session.getUser();
String projectName = (String) multipart.get("project");
- FileItem item = (FileItem) multipart.get("file");
- String forceStr = (String) multipart.get("force");
- boolean force = forceStr == null ? false : Boolean.parseBoolean(forceStr);
- File projectDir = null;
+ Project project = projectManager.getProject(projectName);
+
if (projectName == null || projectName.isEmpty()) {
setErrorMessageInCookie(resp, "No project name found.");
}
- else if (item == null) {
- setErrorMessageInCookie(resp, "No file found.");
+ else if (project == null) {
+ setErrorMessageInCookie(resp, "Installation Failed. Project '" + projectName + "' doesn't exist.");
+ }
+ else if (!project.hasPermission(user, Type.WRITE)) {
+ setErrorMessageInCookie(resp, "Installation Failed. User '" + user.getUserId() + "' does not have write access.");
}
else {
- try {
- projectDir = extractFile(item);
- projectManager.uploadProject(projectName, projectDir, user, force);
- setSuccessMessageInCookie(resp, "Project Uploaded");
- }
- catch (Exception e) {
- logger.info("Installation Failed.", e);
- setErrorMessageInCookie(resp, "Installation Failed.\n" + e.getMessage());
+ FileItem item = (FileItem) multipart.get("file");
+ String forceStr = (String) multipart.get("force");
+ boolean force = forceStr == null ? false : Boolean.parseBoolean(forceStr);
+ File projectDir = null;
+ if (projectName == null || projectName.isEmpty()) {
+ setErrorMessageInCookie(resp, "No project name found.");
}
-
- if (projectDir != null && projectDir.exists() ) {
- FileUtils.deleteDirectory(projectDir);
+ else if (item == null) {
+ setErrorMessageInCookie(resp, "No file found.");
+ }
+ else {
+ try {
+ projectDir = extractFile(item);
+ projectManager.uploadProject(projectName, projectDir, user, force);
+ setSuccessMessageInCookie(resp, "Project Uploaded");
+ }
+ catch (Exception e) {
+ logger.info("Installation Failed.", e);
+ project.error("Upload by '" + user.getUserId() + "' failed: " + e.getMessage());
+ setErrorMessageInCookie(resp, "Installation Failed.\n" + e.getMessage());
+ }
+
+ if (projectDir != null && projectDir.exists() ) {
+ FileUtils.deleteDirectory(projectDir);
+ }
+ project.error("New project files uploaded by '" + user.getUserId() + "'");
}
- resp.sendRedirect(req.getRequestURI() + "?project=" + projectName);
}
+ resp.sendRedirect(req.getRequestURI() + "?project=" + projectName);
}
private File extractFile(FileItem item) throws IOException, ServletException {
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index a05ca60..bafb86b 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -109,11 +109,11 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
scheduleManager.removeScheduledFlow(scheduleId);
-
+ project.info("User '" + user.getUserId() + " has removed schedule " + schedFlow.toNiceString());
+
ret.put("status", "success");
ret.put("message", scheduleId + " removed.");
return;
-
}
private void ajaxScheduleFlow(HttpServletRequest req, Map<String, Object> ret, User user) throws ServletException {
@@ -178,11 +178,11 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
DateTime submitTime = new DateTime();
DateTime firstSchedTime = day.withHourOfDay(hour).withMinuteOfHour(minutes).withSecondOfMinute(0);
- scheduleManager.schedule(scheduleId, projectId, flowId, userExec, userSubmit, submitTime, firstSchedTime, thePeriod);
+ ScheduledFlow schedFlow = scheduleManager.schedule(scheduleId, projectId, flowId, userExec, userSubmit, submitTime, firstSchedTime, thePeriod);
+ project.info("User '" + user.getUserId() + "' has scheduled " + flow.getId() + "[" + schedFlow.toNiceString() + "].");
ret.put("status", "success");
ret.put("message", scheduleId + " scheduled.");
-
}
private ReadablePeriod parsePeriod(HttpServletRequest req) throws ServletException {
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
index 8fe3bf0..20d2bc8 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -101,7 +101,7 @@
</table>
</div>
<div id="flowLogView" class="logView">
- <div class="logHeader"><div class="logButtonRow"><div id="updateLogBtn" class="btn7">Load More</div></div></div>
+ <div class="logHeader"><div class="logButtonRow"><div id="updateLogBtn" class="btn7">Refresh</div></div></div>
<div class="logViewer">
<pre id="logSection" class="log"></pre>
</div>
diff --git a/src/java/azkaban/webapp/servlet/velocity/joblogpage.vm b/src/java/azkaban/webapp/servlet/velocity/joblogpage.vm
index dd9f233..a0da7c7 100644
--- a/src/java/azkaban/webapp/servlet/velocity/joblogpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/joblogpage.vm
@@ -57,13 +57,20 @@
</ul>
</div>
- <div id="jobLogView" class="logView">
- <div class="logHeader"><div class="logButtonRow"><div id="updateLogBtn" class="btn7">Load More</div></div></div>
+ <div id="projectLogView" class="logView">
+ <div class="logHeader"><div class="logButtonRow"><div id="updateLogBtn" class="btn7">Refresh</div></div></div>
<div class="logViewer">
<pre id="logSection" class="log"></pre>
</div>
</div>
#end
+
+ <div id="messageDialog" class="modal">
+ <h3 id="messageTitle">Error</h3>
+ <div class="messageDiv">
+ <p id="messageBox"></p>
+ </div>
+ </div>
</div>
</body>
</html>
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/velocity/nav.vm b/src/java/azkaban/webapp/servlet/velocity/nav.vm
index 2517d8f..0045e1d 100644
--- a/src/java/azkaban/webapp/servlet/velocity/nav.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/nav.vm
@@ -8,7 +8,7 @@
<ul id="nav" class="nav">
<li id="all-jobs-tab" #if($current_page == 'all')class="selected"#end onClick="navMenuClick('$!context/')"><a href="$!context/">Projects</a></li>
- <li id="scheduled-jobs-tab" #if($current_page == 'schedule')class="scheduled"#end onClick="navMenuClick('$!context/schedule')"><a href="$!context/schedule">Scheduled</a></li>
+ <li id="scheduled-jobs-tab" #if($current_page == 'schedule')class="selected"#end onClick="navMenuClick('$!context/schedule')"><a href="$!context/schedule">Scheduled</a></li>
<li id="executing-jobs-tab" #if($current_page == 'executing')class="selected"#end onClick="navMenuClick('$!context/executor')"><a href="$!context/executor">Executing</a></li>
<li id="history-jobs-tab" #if($current_page == 'history')class="selected"#end onClick="navMenuClick('$!context/history')"><a href="$!context/history">History</a></li>
<li><a href="$!context/fs">HDFS</a></li>
diff --git a/src/java/azkaban/webapp/servlet/velocity/projectlogpage.vm b/src/java/azkaban/webapp/servlet/velocity/projectlogpage.vm
new file mode 100644
index 0000000..98dc455
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/velocity/projectlogpage.vm
@@ -0,0 +1,66 @@
+<!DOCTYPE html>
+<html>
+ <head>
+#parse( "azkaban/webapp/servlet/velocity/style.vm" )
+ <script type="text/javascript" src="${context}/js/jquery/jquery.js"></script>
+ <script type="text/javascript" src="${context}/js/namespace.js"></script>
+ <script type="text/javascript" src="${context}/js/underscore-1.2.1-min.js"></script>
+ <script type="text/javascript" src="${context}/js/backbone-0.5.3-min.js"></script>
+ <script type="text/javascript" src="${context}/js/jquery.simplemodal.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.ajax.utils.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.projectlog.view.js"></script>
+
+ <script type="text/javascript">
+ var contextURL = "${context}";
+ var currentTime = ${currentTime};
+ var timezone = "${timezone}";
+ var errorMessage = ${error_message};
+ var successMessage = ${success_message};
+
+ var projectName = "${projectName}";
+
+ </script>
+ </head>
+ <body>
+ #set($current_page="executing")
+#parse( "azkaban/webapp/servlet/velocity/nav.vm" )
+ <div class="content">
+#if($errorMsg)
+ <div class="box-error-message">$errorMsg</div>
+#else
+#if($error_message != "null")
+ <div class="box-error-message">$error_message</div>
+#elseif($success_message != "null")
+ <div class="box-success-message">$success_message</div>
+#end
+
+ <div id="all-jobs-content">
+ <div class="section-hd flow-header">
+ <h2><a href="${context}/manager?project=${projectName}">Project Audit Logs <span>$projectName</span></a></h2>
+ </div>
+ </div>
+
+ <div id="headertabs" class="headertabs">
+ <ul>
+ <li><a id="logViewLink" href="#log">Log</a></li>
+ </ul>
+ </div>
+
+ <div id="projectLogView" class="logView">
+ <div class="logHeader"><div class="logButtonRow"><div id="updateLogBtn" class="btn7">Refresh</div></div></div>
+ <div class="logViewer">
+ <pre id="logSection" class="log"></pre>
+ </div>
+ </div>
+#end
+ <div id="messageDialog" class="modal">
+ <h3 id="messageTitle">Error</h3>
+ <div class="messageDiv">
+ <p id="messageBox"></p>
+ </div>
+ </div>
+
+ </div>
+ </body>
+</html>
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/velocity/projectpage.vm b/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
index cd2695f..8887ea9 100644
--- a/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
@@ -38,9 +38,12 @@
<div id="all-jobs-content">
<div class="section-hd">
<h2><a href="${context}/manager?project=${project.name}">Project <span>$project.name</span></a></h2>
-
- <a id="project-upload-btn" class="btn1 projectupload" href="#">Upload</a>
+ <a id="project-upload-btn" class="btn1 projectupload">Upload</a>
<a id="project-permission-btn" class="btn5 projectpermission" href="${context}/manager?project=${project.name}&permissions">Permissions</a>
+ #if($admin)
+ <a id="project-logs-btn" class="btn2" href="${context}/manager?project=${project.name}&logs">Project Logs</a>
+ <a id="project-delete-btn" class="btn6">Delete Project</a>
+ #end
</div><!-- end .section-hd -->
</div>
@@ -49,10 +52,6 @@
<tr><td class="first">Project Admins:</td><td>$admins</td></tr>
<tr><td class="first">Your Permissions:</td><td>$userpermission.toString()</td></tr>
</table>
- #if($admin)
- <div id="deleteProject" class="btn6">Delete Project</div>
- #end
-
</div>
<div id="project-summary">
diff --git a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
index 9a16b9e..b0f33d3 100644
--- a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
@@ -21,7 +21,7 @@
</script>
</head>
<body>
-#set($current_page="scheduledFlows")
+#set($current_page="schedule")
#parse( "azkaban/webapp/servlet/velocity/nav.vm" )
<div class="messaging"><p id="messageClose">X</p><p id="message"></p></div>
src/web/css/azkaban.css 12(+6 -6)
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 2679b52..11c7103 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -356,8 +356,10 @@ tr:hover td {
background: -webkit-gradient(linear, center top, center bottom, color-stop(0,#AFE47B), color-stop(5%,#8BD03F),color-stop(100%,#69B219));
}
+.section-hd .btn2,
.section-hd .btn4,
.section-hd .btn5,
+.section-hd .btn6,
.section-ft .btn4,
.section-ft .btn5{
float: right;
@@ -969,10 +971,8 @@ tr:hover td {
width: 30%;
}
-#deleteProject {
- width: 100px;
- text-align: center;
- margin-top: 10px;
+#adminActions {
+
}
#job-summary {
@@ -1211,7 +1211,7 @@ tr:hover td {
left: 5px;
right: 5px;
background-color: #FFF;
- overflow:scroll;
+ overflow:auto;
}
.logLink {
@@ -1386,7 +1386,7 @@ table.parameters tr td {
}
#edit {
- width: 90px;
+ width: 100px;
text-align: center;
}
src/web/js/azkaban.ajax.utils.js 16(+16 -0)
diff --git a/src/web/js/azkaban.ajax.utils.js b/src/web/js/azkaban.ajax.utils.js
index c087086..395d696 100644
--- a/src/web/js/azkaban.ajax.utils.js
+++ b/src/web/js/azkaban.ajax.utils.js
@@ -28,3 +28,19 @@ function ajaxCall(requestURL, data, callback) {
"json"
);
}
+
+function ajaxLogsCall(requestURL, data, callback) {
+ $.get(
+ requestURL,
+ data,
+ function(data) {
+ var pos = data.lastIndexOf("\n");
+ var log = data.substring(0, pos);
+ var currentPos = data.substr(pos + 1);
+
+ var newData = {current: currentPos, log: log};
+
+ callback.call(this,newData);
+ }
+ );
+}
\ No newline at end of file
src/web/js/azkaban.exflow.view.js 17(+16 -1)
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index 570b2fc..dbbaeb1 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -852,7 +852,7 @@ azkaban.FlowLogView = Backbone.View.extend({
var current = this.model.get("current");
var requestURL = contextURL + "/executor";
var model = this.model;
- ajaxCall(
+ ajaxLogsCall(
requestURL,
{"execid": execId, "ajax":"fetchExecFlowLogs", "current": current, "max": 100000},
function(data) {
@@ -872,6 +872,7 @@ azkaban.FlowLogView = Backbone.View.extend({
current = data.current;
$("#logSection").text(log);
model.set({"current": current, "log": log});
+ $(".logViewer").scrollTop(9999);
}
}
);
@@ -941,6 +942,19 @@ var updaterFunction = function() {
}
}
+var logUpdaterFunction = function() {
+ var oldData = graphModel.get("data");
+ var keepRunning = oldData.status != "SUCCEEDED" && oldData.status != "FAILED" && oldData.status != "KILLED";
+ if (keepRunning) {
+ // update every 30 seconds for the logs until finished
+ flowLogView.handleUpdate();
+ setTimeout(function() {logUpdaterFunction();}, 30000);
+ }
+ else {
+ flowLogView.handleUpdate();
+ }
+}
+
$(function() {
var selected;
@@ -987,6 +1001,7 @@ $(function() {
}
updaterFunction();
+ logUpdaterFunction();
}
);
});
src/web/js/azkaban.joblog.view.js 22(+21 -1)
diff --git a/src/web/js/azkaban.joblog.view.js b/src/web/js/azkaban.joblog.view.js
index 1150736..7aa5ad7 100644
--- a/src/web/js/azkaban.joblog.view.js
+++ b/src/web/js/azkaban.joblog.view.js
@@ -17,7 +17,7 @@ azkaban.JobLogView = Backbone.View.extend({
var requestURL = contextURL + "/executor";
var model = this.model;
- ajaxCall(
+ ajaxLogsCall(
requestURL,
{"execid": execId, "job": jobId, "ajax":"fetchExecJobLogs", "current": current, "max": 100000},
function(data) {
@@ -37,12 +37,32 @@ azkaban.JobLogView = Backbone.View.extend({
current = data.current;
$("#logSection").text(log);
model.set({"current": current, "log": log});
+ $(".logViewer").scrollTop(9999);
}
}
);
}
});
+var showDialog = function(title, message) {
+ $('#messageTitle').text(title);
+
+ $('#messageBox').text(message);
+
+ $('#messageDialog').modal({
+ closeHTML: "<a href='#' title='Close' class='modal-close'>x</a>",
+ position: ["20%",],
+ containerId: 'confirm-container',
+ containerCss: {
+ 'height': '220px',
+ 'width': '565px'
+ },
+ onShow: function (dialog) {
+ }
+ });
+}
+
+
$(function() {
var selected;
src/web/js/azkaban.project.view.js 35(+17 -18)
diff --git a/src/web/js/azkaban.project.view.js b/src/web/js/azkaban.project.view.js
index 8694413..e416a6a 100644
--- a/src/web/js/azkaban.project.view.js
+++ b/src/web/js/azkaban.project.view.js
@@ -3,7 +3,8 @@ $.namespace('azkaban');
var projectView;
azkaban.ProjectView= Backbone.View.extend({
events : {
- "click #project-upload-btn":"handleUploadProjectJob"
+ "click #project-upload-btn":"handleUploadProjectJob",
+ "click #project-delete-btn": "handleDeleteProject"
},
initialize : function(settings) {
},
@@ -23,6 +24,21 @@ azkaban.ProjectView= Backbone.View.extend({
}
});
},
+ handleDeleteProject : function(evt) {
+ $('#delete-project').modal({
+ closeHTML: "<a href='#' title='Close' class='modal-close'>x</a>",
+ position: ["20%",],
+ containerId: 'confirm-container',
+ containerCss: {
+ 'height': '220px',
+ 'width': '565px'
+ },
+ onShow: function (dialog) {
+ var modal = this;
+ $("#errorMsg").hide();
+ }
+ });
+ },
render: function() {
}
});
@@ -224,21 +240,4 @@ $(function() {
deleteProjectView = new azkaban.DeleteProjectView({el: $('#delete-project')});
// Setting up the project tabs
- $('#deleteProject').click(function() {
- $('#delete-project').modal({
- closeHTML: "<a href='#' title='Close' class='modal-close'>x</a>",
- position: ["20%",],
- containerId: 'confirm-container',
- containerCss: {
- 'height': '220px',
- 'width': '565px'
- },
- onShow: function (dialog) {
- var modal = this;
- $("#errorMsg").hide();
- }
- });
- }
- );
-
});
src/web/js/azkaban.projectlog.view.js 62(+62 -0)
diff --git a/src/web/js/azkaban.projectlog.view.js b/src/web/js/azkaban.projectlog.view.js
new file mode 100644
index 0000000..5dd8314
--- /dev/null
+++ b/src/web/js/azkaban.projectlog.view.js
@@ -0,0 +1,62 @@
+$.namespace('azkaban');
+
+var logModel;
+azkaban.LogModel = Backbone.Model.extend({});
+
+var projectLogView;
+azkaban.ProjectLogView = Backbone.View.extend({
+ events: {
+ "click #updateLogBtn" : "handleUpdate"
+ },
+ initialize: function(settings) {
+ this.model.set({"current": 0});
+ this.handleUpdate();
+ },
+ handleUpdate: function(evt) {
+ var current = this.model.get("current");
+ var requestURL = contextURL + "/manager";
+ var model = this.model;
+
+ $.get(
+ requestURL,
+ {"project": projectName, "ajax":"fetchProjectLogs", "tail": 100000},
+ function(data) {
+ console.log("fetchLogs");
+ if (data.error) {
+ showDialog("Error", data.error);
+ }
+ else {
+ $("#logSection").text(data);
+ model.set({"log": data});
+ $(".logViewer").scrollTop(9999);
+ }
+ }
+ );
+ }
+});
+
+var showDialog = function(title, message) {
+ $('#messageTitle').text(title);
+
+ $('#messageBox').text(message);
+
+ $('#messageDialog').modal({
+ closeHTML: "<a href='#' title='Close' class='modal-close'>x</a>",
+ position: ["20%",],
+ containerId: 'confirm-container',
+ containerCss: {
+ 'height': '220px',
+ 'width': '565px'
+ },
+ onShow: function (dialog) {
+ }
+ });
+}
+
+
+$(function() {
+ var selected;
+
+ logModel = new azkaban.LogModel();
+ projectLogView = new azkaban.ProjectLogView({el:$('#projectLogView'), model: logModel});
+});