azkaban-uncached
Changes
src/java/azkaban/sla/SLAManager.java 2(+1 -1)
src/sql/create_execution_logs.sql 1(+1 -0)
src/sql/update_2.0_to_2.01.sql 1(+1 -0)
src/web/js/azkaban.scheduled.view.js 6(+3 -3)
Details
diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
index 13eea40..c0108e2 100644
--- a/src/java/azkaban/executor/ExecutorLoader.java
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -70,5 +70,7 @@ public interface ExecutorLoader {
public Props fetchExecutionJobOutputProps(int execId, String jobId) throws ExecutorManagerException;
public Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId) throws ExecutorManagerException;
+
+ public int removeExecutionLogsByTime(long millis) throws ExecutorManagerException;
}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index b18bc6c..bff70c2 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -35,6 +35,7 @@ import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
import azkaban.project.Project;
import azkaban.utils.FileIOUtils.LogData;
@@ -52,12 +53,17 @@ public class ExecutorManager {
private String executorHost;
private int executorPort;
+ private CleanerThread cleanerThread;
+
private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows = new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished = new ConcurrentHashMap<Integer, ExecutableFlow>();
private ExecutorMailer mailer;
private ExecutingManagerUpdaterThread executingManager;
+ private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3*4*7*24*60*60*1000L; // 12 weeks
+ private long lastCleanerThreadCheckTime = -1;
+
private long lastThreadCheckTime = -1;
public ExecutorManager(Props props, ExecutorLoader loader) throws ExecutorManagerException {
@@ -69,6 +75,10 @@ public class ExecutorManager {
mailer = new ExecutorMailer(props);
executingManager = new ExecutingManagerUpdaterThread();
executingManager.start();
+
+ long executionLogsRetentionMs = props.getLong("azkaban.execution.logs.retention.ms", DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+ cleanerThread = new CleanerThread(executionLogsRetentionMs);
+ cleanerThread.start();
}
public String getExecutorHost() {
@@ -108,6 +118,7 @@ public class ExecutorManager {
public boolean isFlowRunning(int projectId, String flowId) {
for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+
if (ref.getSecond().getProjectId() == projectId && ref.getSecond().getFlowId().equals(flowId)) {
return true;
}
@@ -345,6 +356,17 @@ public class ExecutorManager {
return message;
}
}
+
+
+ public void cleanOldExecutionLogs(long millis) {
+ try {
+ int count = executorLoader.removeExecutionLogsByTime(millis);
+ logger.info("Cleaned up " + count + " log entries.");
+ }
+ catch (ExecutorManagerException e) {
+ e.printStackTrace();
+ }
+ }
private Map<String, Object> callExecutorServer(ExecutionReference ref, String action) throws ExecutorManagerException {
try {
@@ -809,4 +831,60 @@ public class ExecutorManager {
outputList.addAll(flows);
return executorLoader.fetchNumExecutableFlows(projectId, flowId);
}
+
+ /*
+ * Cleaner thread that purges old rows from execution_logs (and similar data) in the DB.
+ * Runs once a day.
+ */
+ private class CleanerThread extends Thread {
+ // execution log retention defaults to DEFAULT_EXECUTION_LOGS_RETENTION_MS (12 weeks)
+
+ // check every day
+ private static final long CLEANER_THREAD_WAIT_INTERVAL_MS = 24*60*60*1000;
+
+ private final long executionLogsRetentionMs;
+
+ private boolean shutdown = false;
+ private long lastLogCleanTime = -1;
+
+ public CleanerThread(long executionLogsRetentionMs) {
+ this.executionLogsRetentionMs = executionLogsRetentionMs;
+ this.setName("AzkabanWebServer-Cleaner-Thread");
+ }
+
+ @SuppressWarnings("unused")
+ public void shutdown() {
+ shutdown = true;
+ this.interrupt();
+ }
+
+ public void run() {
+ while (!shutdown) {
+ synchronized (this) {
+ try {
+ lastCleanerThreadCheckTime = System.currentTimeMillis();
+
+ // Cleanup old stuff.
+ long currentTime = System.currentTimeMillis();
+ if (currentTime - CLEANER_THREAD_WAIT_INTERVAL_MS > lastLogCleanTime) {
+ cleanExecutionLogs();
+ lastLogCleanTime = currentTime;
+ }
+
+
+ wait(CLEANER_THREAD_WAIT_INTERVAL_MS);
+ } catch (InterruptedException e) {
+ logger.info("Interrupted. Probably to shut down.");
+ }
+ }
+ }
+ }
+
+ private void cleanExecutionLogs() {
+ logger.info("Cleaning old logs from execution_logs");
+ long cutoff = DateTime.now().getMillis() - executionLogsRetentionMs;
+ logger.info("Cleaning old log files before " + new DateTime(cutoff).toString());
+ cleanOldExecutionLogs(cutoff);
+ }
+ }
}
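
A minimal sketch of how the new retention setting feeds the cleanup, pulled together from the diff above (the property name and default come from ExecutorManager; the standalone wiring around them is illustrative only):

    // Sketch only: resolve the retention window and turn it into the cutoff passed to the loader.
    long retentionMs = props.getLong("azkaban.execution.logs.retention.ms",
            3L * 4 * 7 * 24 * 60 * 60 * 1000);          // default: 12 weeks
    long cutoffMs = System.currentTimeMillis() - retentionMs;
    int removed = executorLoader.removeExecutionLogsByTime(cutoffMs);   // DELETE ... WHERE upload_time < cutoffMs
    logger.info("Cleaned up " + removed + " execution_logs entries.");
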
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 481a8fe..6464f6b 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -22,6 +22,7 @@ import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
import azkaban.utils.DataSourceUtils;
import azkaban.utils.FileIOUtils;
@@ -620,7 +621,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
}
private void uploadLogPart(Connection connection, int execId, String name, int attempt, int startByte, int endByte, EncodingType encType, byte[] buffer, int length) throws SQLException, IOException {
- final String INSERT_EXECUTION_LOGS = "INSERT INTO execution_logs (exec_id, name, attempt, enc_type, start_byte, end_byte, log) VALUES (?,?,?,?,?,?,?)";
+ final String INSERT_EXECUTION_LOGS = "INSERT INTO execution_logs (exec_id, name, attempt, enc_type, start_byte, end_byte, log, upload_time) VALUES (?,?,?,?,?,?,?,?)";
QueryRunner runner = new QueryRunner();
byte[] buf = buffer;
@@ -631,7 +632,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
buf = Arrays.copyOf(buffer, length);
}
- runner.update(connection, INSERT_EXECUTION_LOGS, execId, name, attempt, encType.getNumVal(), startByte, startByte + length, buf);
+ runner.update(connection, INSERT_EXECUTION_LOGS, execId, name, attempt, encType.getNumVal(), startByte, startByte + length, buf, DateTime.now().getMillis());
}
private Connection getConnection() throws ExecutorManagerException {
@@ -913,4 +914,19 @@ public class JdbcExecutorLoader implements ExecutorLoader {
return rs.getInt(1);
}
}
+
+ @Override
+ public int removeExecutionLogsByTime(long millis) throws ExecutorManagerException {
+ final String DELETE_BY_TIME = "DELETE FROM execution_logs WHERE upload_time < ?";
+
+ QueryRunner runner = new QueryRunner(dataSource);
+ int updateNum = 0;
+ try {
+ updateNum = runner.update(DELETE_BY_TIME, millis);
+ } catch (SQLException e) {
+ throw new ExecutorManagerException("Error deleting old execution_logs before " + millis);
+ }
+
+ return updateNum;
+ }
}
\ No newline at end of file
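
Both halves of the loader change rely on upload_time being plain epoch milliseconds: uploadLogPart stamps DateTime.now().getMillis() at insert time, and removeExecutionLogsByTime deletes anything strictly older than the cutoff. A small illustrative check (the timestamps below are made up):

    // Illustrative only: retention is a numeric comparison on epoch millis.
    long uploadTime = new DateTime("2013-01-15T00:00:00").getMillis();  // stamped by uploadLogPart
    long cutoff     = new DateTime("2013-04-01T00:00:00").getMillis();  // now - retention window
    boolean removed = uploadTime < cutoff;  // true -> row matches DELETE ... WHERE upload_time < ?
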
diff --git a/src/java/azkaban/scheduler/JdbcScheduleLoader.java b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
index 96130dd..5227a90 100644
--- a/src/java/azkaban/scheduler/JdbcScheduleLoader.java
+++ b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
@@ -142,8 +142,12 @@ public class JdbcScheduleLoader implements ScheduleLoader {
DbUtils.closeQuietly(connection);
throw new ScheduleManagerException("Loading schedules from db failed. ", e);
+ } finally {
+ DbUtils.closeQuietly(connection);
}
+ logger.info("Now trying to update the schedules");
+
// filter the schedules
for(Schedule sched : schedules) {
if(!sched.updateTime()) {
@@ -152,17 +156,18 @@ public class JdbcScheduleLoader implements ScheduleLoader {
removeSchedule(sched);
}
else {
+ logger.info("Recurring schedule, need to update next exec time");
try {
updateNextExecTime(sched);
} catch (Exception e) {
- DbUtils.closeQuietly(connection);
+ e.printStackTrace();
throw new ScheduleManagerException("Update next execution time failed.", e);
- }
+ }
logger.info("Schedule " + sched.getScheduleName() + " loaded and updated.");
}
}
- DbUtils.closeQuietly(connection);
+
logger.info("Loaded " + schedules.size() + " schedules.");
@@ -238,13 +243,18 @@ public class JdbcScheduleLoader implements ScheduleLoader {
@Override
public void updateNextExecTime(Schedule s) throws ScheduleManagerException
{
+ logger.info("Update schedule " + s.getScheduleName() + " into db. ");
Connection connection = getConnection();
QueryRunner runner = new QueryRunner();
try {
+
runner.update(connection, UPDATE_NEXT_EXEC_TIME, s.getNextExecTime(), s.getProjectId(), s.getFlowName());
} catch (SQLException e) {
- logger.error(UPDATE_NEXT_EXEC_TIME + " failed.");
+ e.printStackTrace();
+ logger.error(UPDATE_NEXT_EXEC_TIME + " failed.", e);
throw new ScheduleManagerException("Update schedule " + s.getScheduleName() + " into db failed. ", e);
+ } finally {
+ DbUtils.closeQuietly(connection);
}
}
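
The connection handling above reduces to one pattern: do the update, log and wrap any SQLException, and always release the connection in a finally block. A condensed sketch using the identifiers from the diff (simplified, not the exact method body):

    // Sketch of the try/catch/finally shape the patch moves updateNextExecTime to.
    Connection connection = getConnection();
    QueryRunner runner = new QueryRunner();
    try {
        runner.update(connection, UPDATE_NEXT_EXEC_TIME,
                s.getNextExecTime(), s.getProjectId(), s.getFlowName());
    } catch (SQLException e) {
        logger.error(UPDATE_NEXT_EXEC_TIME + " failed.", e);
        throw new ScheduleManagerException("Update schedule " + s.getScheduleName() + " into db failed. ", e);
    } finally {
        DbUtils.closeQuietly(connection);   // released whether the update succeeds or throws
    }
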
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index e298356..d1bbd6d 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -91,6 +91,7 @@ public class ScheduleManager {
scheduleList = loader.loadSchedules();
} catch (ScheduleManagerException e) {
// TODO Auto-generated catch block
+ logger.error("Failed to load schedules" + e.getCause() + e.getMessage());
e.printStackTrace();
}
@@ -338,6 +339,7 @@ public class ScheduleManager {
// If null, wake up every minute or so to see if
// there's something to do. Most likely there will not be.
try {
+ logger.info("Nothing scheduled to run. Checking again soon.");
nextWakupTime = System.currentTimeMillis() + TIMEOUT_MS;
this.wait(TIMEOUT_MS);
} catch (InterruptedException e) {
@@ -349,11 +351,12 @@ public class ScheduleManager {
// Run flow. The invocation of flows should be quick.
Schedule runningSched = schedules.poll();
- logger.info("Scheduler attempting to run " + runningSched.getScheduleName() );
+ logger.info("Scheduler attempting to run " + runningSched.toString() );
// check if it is already running
if(!executorManager.isFlowRunning(runningSched.getProjectId(), runningSched.getFlowName()))
{
+ logger.info("Scheduler ready to run " + runningSched.toString());
// Execute the flow here
try {
Project project = projectManager.getProject(runningSched.getProjectId());
@@ -381,13 +384,13 @@ public class ScheduleManager {
executorManager.submitExecutableFlow(exflow);
logger.info("Scheduler has invoked " + exflow.getExecutionId());
} catch (Exception e) {
- logger.error("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.");
- logger.error(e.getMessage());
- return;
+ e.printStackTrace();
+ throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
}
SlaOptions slaOptions = runningSched.getSlaOptions();
if(slaOptions != null) {
+ logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
// submit flow slas
List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
for(SlaSetting set : slaOptions.getSettings()) {
@@ -412,7 +415,6 @@ public class ScheduleManager {
removeRunnerSchedule(runningSched);
-
// Immediately reschedule if it's possible. Let
// the execution manager
// handle any duplicate runs.
diff --git a/src/java/azkaban/sla/SLAManager.java b/src/java/azkaban/sla/SLAManager.java
index 911f9e3..3c2a4fe 100644
--- a/src/java/azkaban/sla/SLAManager.java
+++ b/src/java/azkaban/sla/SLAManager.java
@@ -345,7 +345,7 @@ public class SLAManager {
}
private void takeSLASuccessActions(SLA s, ExecutableFlow exflow) {
- sendSlaSuccessEmail(s, exflow);
+ //sendSlaSuccessEmail(s, exflow);
}
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 3e4a654..3ebd2d7 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -48,6 +48,7 @@ import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.thread.QueuedThreadPool;
+
import azkaban.executor.ExecutorManager;
import azkaban.executor.JdbcExecutorLoader;
import azkaban.jmx.JmxExecutorManager;
@@ -142,6 +143,9 @@ public class AzkabanWebServer implements AzkabanServer {
private MBeanServer mbeanServer;
private ArrayList<ObjectName> registeredMBeans = new ArrayList<ObjectName>();
+
+
+
/**
* Constructor usually called by tomcat AzkabanServletContext to create the
* initial server
@@ -177,6 +181,9 @@ public class AzkabanWebServer implements AzkabanServer {
}
configureMBeanServer();
+
+
+
}
private void setViewerPlugins(List<ViewerPlugin> viewerPlugins) {
@@ -719,4 +726,5 @@ public class AzkabanWebServer implements AzkabanServer {
}
}
+
}
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index c382bbe..b236c56 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -49,6 +49,7 @@ import azkaban.webapp.AzkabanWebServer;
import azkaban.webapp.session.Session;
import azkaban.scheduler.Schedule;
import azkaban.scheduler.ScheduleManager;
+import azkaban.scheduler.ScheduleManagerException;
import azkaban.sla.SLA;
import azkaban.sla.SLA.SlaRule;
import azkaban.sla.SLA.SlaAction;
@@ -116,11 +117,9 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
SlaOptions slaOptions= new SlaOptions();
String slaEmails = getParam(req, "slaEmails");
- System.out.println(slaEmails);
String[] emailSplit = slaEmails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
Map<String, String> settings = getParamGroup(req, "settings");
- System.out.println(settings);
List<SlaSetting> slaSettings = new ArrayList<SlaSetting>();
for(String set : settings.keySet()) {
SlaSetting s;
@@ -136,6 +135,10 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
if(slaSettings.size() > 0) {
+ if(slaEmails.equals("")) {
+ ret.put("error", "Please put correct email settings for your SLA actions");
+ return;
+ }
slaOptions.setSlaEmails(Arrays.asList(emailSplit));
slaOptions.setSettings(slaSettings);
}
@@ -150,12 +153,12 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
} catch (ServletException e) {
- ret.put("error", e);
+ ret.put("error", e.getMessage());
}
}
- private SlaSetting parseSlaSetting(String set) {
+ private SlaSetting parseSlaSetting(String set) throws ScheduleManagerException {
// "" + Duration + EmailAction + KillAction
String[] parts = set.split(",", -1);
String id = parts[0];
@@ -163,17 +166,23 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
String duration = parts[2];
String emailAction = parts[3];
String killAction = parts[4];
- if(emailAction.equals("on") || killAction.equals("on")) {
+ if(emailAction.equals("true") || killAction.equals("true")) {
SlaSetting r = new SlaSetting();
r.setId(id);
r.setRule(SlaRule.valueOf(rule));
- ReadablePeriod dur = parseDuration(duration);
+ ReadablePeriod dur;
+ try {
+ dur = parseDuration(duration);
+ }
+ catch (Exception e) {
+ throw new ScheduleManagerException("Unable to parse duration for a SLA that needs to take actions!", e);
+ }
r.setDuration(dur);
List<SlaAction> actions = new ArrayList<SLA.SlaAction>();
- if(emailAction.equals("on")) {
+ if(emailAction.equals("true")) {
actions.add(SlaAction.EMAIL);
}
- if(killAction.equals("on")) {
+ if(killAction.equals("true")) {
actions.add(SlaAction.KILL);
}
r.setActions(actions);
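
Each entry in the settings parameter group is a comma-joined string produced by the scheduled view (see the JS change further down); the checkbox flags now arrive as "true"/"false" instead of "on". A hypothetical example entry (the rule name and duration value are illustrative, since SlaRule and parseDuration are not shown in this diff):

    // Hypothetical settings entry: "<flow or job id>,<SlaRule>,<duration>,<emailAction>,<killAction>"
    String set = "my-flow,SUCCESS,30m,true,false";       // values made up for illustration
    String[] parts = set.split(",", -1);
    boolean emailAction = parts[3].equals("true");       // was compared against "on" before this patch
    boolean killAction  = parts[4].equals("true");
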
diff --git a/src/java/azkaban/webapp/servlet/velocity/executionspage.vm b/src/java/azkaban/webapp/servlet/velocity/executionspage.vm
index 40a4ddd..f31cb29 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executionspage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executionspage.vm
@@ -55,6 +55,7 @@
<th>Flow</th>
<th>Project</th>
<th class="user">User</th>
+ <th class="user">Proxy User</th>
<th class="date">Start Time</th>
<th class="date">End Time</th>
<th class="elapse">Elapsed</th>
@@ -74,6 +75,7 @@
<a href="${context}/manager?project=$vmutils.getProjectName(${flow.projectId})">$vmutils.getProjectName(${flow.projectId})</a>
</td>
<td>${flow.submitUser}</td>
+ <td>${flow.proxyUsers}</td>
<td>$utils.formatDate(${flow.startTime})</td>
<td>$utils.formatDate(${flow.endTime})</td>
<td>$utils.formatDuration(${flow.startTime}, ${flow.endTime})</td>
@@ -96,6 +98,7 @@
<th>Flow</th>
<th>Project</th>
<th class="user">User</th>
+ <th class="user">Proxy User</th>
<th class="date">Start Time</th>
<th class="date">End Time</th>
<th class="elapse">Elapsed</th>
@@ -115,6 +118,7 @@
<a href="${context}/manager?project=$vmutils.getProjectName(${flow.projectId})">$vmutils.getProjectName(${flow.projectId})</a>
</td>
<td>${flow.submitUser}</td>
+ <td>${flow.proxyUsers}</td>
<td>$utils.formatDate(${flow.startTime})</td>
<td>$utils.formatDate(${flow.endTime})</td>
<td>$utils.formatDuration(${flow.startTime}, ${flow.endTime})</td>
diff --git a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
index 9bf9b6b..189ea93 100644
--- a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
@@ -146,23 +146,7 @@
</tbody>
</table>
</div>
- <!--h4 style="visibility: hidden">Job SLA Rules</h4>
- <div class="tableDiv" style="visibility: hidden">
- <table id="jobRulesTbl">
- <thead>
- <tr>
- <th>Flow/Job Id</th>
- <th>Finish In</th>
- <th>Duration</th>
- <th>Email Action</th>
- <th>Kill Action</th>
- </tr>
- </thead>
- <tbody>
- <tr id="addRow"><td id="addRow-col" colspan="4"><span class="addIcon"></span><a href="#">Add Row</a></td></tr>
- </tbody>
- </table>
- </div-->
+
</div>
</div>
</div>
diff --git a/src/sql/create_execution_logs.sql b/src/sql/create_execution_logs.sql
index a1eab7a..0a9cb0b 100644
--- a/src/sql/create_execution_logs.sql
+++ b/src/sql/create_execution_logs.sql
@@ -6,6 +6,7 @@ CREATE TABLE execution_logs (
start_byte INT,
end_byte INT,
log LONGBLOB,
+ upload_time BIGINT,
PRIMARY KEY (exec_id, name, attempt, start_byte),
INDEX log_attempt (exec_id, name, attempt),
INDEX log_index (exec_id, name)
diff --git a/src/sql/update_2.0_to_2.01.sql b/src/sql/update_2.0_to_2.01.sql
index 309742e..aaa2c6d 100644
--- a/src/sql/update_2.0_to_2.01.sql
+++ b/src/sql/update_2.0_to_2.01.sql
@@ -7,6 +7,7 @@ ALTER TABLE execution_jobs ADD PRIMARY KEY(exec_id, job_id, attempt);
ALTER TABLE execution_jobs ADD INDEX exec_job (exec_id, job_id);
ALTER TABLE execution_logs ADD COLUMN attempt INT DEFAULT 0;
+ALTER TABLE execution_logs ADD COLUMN upload_time BIGINT DEFAULT 1364801450000;
ALTER TABLE execution_logs DROP PRIMARY KEY;
ALTER TABLE execution_logs ADD PRIMARY KEY(exec_id, name, attempt, start_byte);
ALTER TABLE execution_logs ADD INDEX log_attempt (exec_id, name, attempt)
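
The backfill default above is an epoch-millisecond timestamp, so rows that existed before the migration are treated as uploaded around migration time and will age out after one retention window. A quick way to read such a value (illustrative check only, not part of the patch):

    // Illustrative only: decode the epoch-millis default used in the ALTER statement.
    System.out.println(new org.joda.time.DateTime(1364801450000L));    // prints a date in early April 2013 (zone-dependent)
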
diff --git a/src/web/js/azkaban.scheduled.view.js b/src/web/js/azkaban.scheduled.view.js
index b66374b..2c1ee76 100644
--- a/src/web/js/azkaban.scheduled.view.js
+++ b/src/web/js/azkaban.scheduled.view.js
@@ -219,8 +219,8 @@ azkaban.ChangeSlaView = Backbone.View.extend({
var id = rFlowRule.cells[0].firstChild.value;
var rule = rFlowRule.cells[1].firstChild.value;
var duration = rFlowRule.cells[2].firstChild.value;
- var email = rFlowRule.cells[3].firstChild.value;
- var kill = rFlowRule.cells[4].firstChild.value;
+ var email = rFlowRule.cells[3].firstChild.checked;
+ var kill = rFlowRule.cells[4].firstChild.checked;
settings[row] = id + "," + rule + "," + duration + "," + email + "," + kill;
}
@@ -257,7 +257,7 @@ azkaban.ChangeSlaView = Backbone.View.extend({
var ruleBoxOptions = this.ruleBoxOptions;
var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
- var rFlowRule = tFlowRules.insertRow(0);
+ var rFlowRule = tFlowRules.insertRow(tFlowRules.rows.length-1);
var cId = rFlowRule.insertCell(-1);
var idSelect = document.createElement("select");
diff --git a/unit/java/azkaban/test/execapp/MockExecutorLoader.java b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
index 124c5d9..6510ae3 100644
--- a/unit/java/azkaban/test/execapp/MockExecutorLoader.java
+++ b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
@@ -186,5 +186,12 @@ public class MockExecutorLoader implements ExecutorLoader {
return null;
}
+ @Override
+ public int removeExecutionLogsByTime(long millis)
+ throws ExecutorManagerException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
}
\ No newline at end of file
diff --git a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
index 1fcd9ee..ad28bb3 100644
--- a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
+++ b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
@@ -16,6 +16,7 @@ import junit.framework.Assert;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
+import org.joda.time.DateTime;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -39,9 +40,9 @@ import azkaban.utils.Props;
public class JdbcExecutorLoaderTest {
private static boolean testDBExists;
//@TODO remove this and turn into local host.
- private static final String host = "rpark-ld.linkedin.biz";
+ private static final String host = "cyu-ld.linkedin.biz";
private static final int port = 3306;
- private static final String database = "test";
+ private static final String database = "azkaban2";
private static final String user = "azkaban";
private static final String password = "azkaban";
private static final int numConnections = 10;
@@ -375,6 +376,32 @@ public class JdbcExecutorLoaderTest {
Assert.assertEquals("Logs length is " + logsResult6.getLength(), logsResult6.getLength(), 185493);
}
+ @Test
+ public void testRemoveExecutionLogsByTime() throws ExecutorManagerException, IOException, InterruptedException {
+
+ ExecutorLoader loader = createLoader();
+
+ File logDir = new File("unit/executions/logtest");
+
+ // Multiple of 255 for Henry the Eighth
+ File[] largelog = {new File(logDir, "largeLog1.log"), new File(logDir, "largeLog2.log"), new File(logDir, "largeLog3.log")};
+
+ DateTime time1 = DateTime.now();
+ loader.uploadLogFile(1, "oldlog", 0, largelog);
+ // sleep for 5 seconds
+ Thread.sleep(5000);
+ loader.uploadLogFile(2, "newlog", 0, largelog);
+
+ DateTime time2 = time1.plusMillis(2500);
+
+ int count = loader.removeExecutionLogsByTime(time2.getMillis());
+ System.out.print("Removed " + count + " records");
+ LogData logs = loader.fetchLogs(1, "oldlog", 0, 0, 22222);
+ Assert.assertTrue(logs == null);
+ logs = loader.fetchLogs(2, "newlog", 0, 0, 22222);
+ Assert.assertFalse(logs == null);
+ }
+
private ExecutableFlow createExecutableFlow(int executionId, String flowName) throws IOException {
File jsonFlowFile = new File(flowDir, flowName + ".flow");
@SuppressWarnings("unchecked")
@@ -432,4 +459,5 @@ public class JdbcExecutorLoaderTest {
return val;
}
}
+
}
\ No newline at end of file