azkaban-uncached

Merge branch 'master' of github.com:azkaban/azkaban2 into

3/12/2013 11:43:39 PM

Details

diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
index 13eea40..c0108e2 100644
--- a/src/java/azkaban/executor/ExecutorLoader.java
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -70,5 +70,7 @@ public interface ExecutorLoader {
 	public Props fetchExecutionJobOutputProps(int execId, String jobId) throws ExecutorManagerException;
 	
 	public Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId) throws ExecutorManagerException;
+
+	public int removeExecutionLogsByTime(long millis) throws ExecutorManagerException;
 	
 }
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index b18bc6c..bff70c2 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -35,6 +35,7 @@ import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.impl.client.BasicResponseHandler;
 import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
 
 import azkaban.project.Project;
 import azkaban.utils.FileIOUtils.LogData;
@@ -52,12 +53,17 @@ public class ExecutorManager {
 	private String executorHost;
 	private int executorPort;
 	
+	private CleanerThread cleanerThread;
+	
 	private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows = new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
 	private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished = new ConcurrentHashMap<Integer, ExecutableFlow>();
 
 	private ExecutorMailer mailer;
 	private ExecutingManagerUpdaterThread executingManager;
 	
+	private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3*4*7*24*60*60*1000l;
+	private long lastCleanerThreadCheckTime = -1;
+	
 	private long lastThreadCheckTime = -1;
 	
 	public ExecutorManager(Props props, ExecutorLoader loader) throws ExecutorManagerException {
@@ -69,6 +75,10 @@ public class ExecutorManager {
 		mailer = new ExecutorMailer(props);
 		executingManager = new ExecutingManagerUpdaterThread();
 		executingManager.start();
+
+		long executionLogsRetentionMs = props.getLong("azkaban.execution.logs.retention.ms", DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+		cleanerThread = new CleanerThread(executionLogsRetentionMs);
+		cleanerThread.start();
 	}
 	
 	public String getExecutorHost() {
@@ -108,6 +118,7 @@ public class ExecutorManager {
 	
 	public boolean isFlowRunning(int projectId, String flowId) {
 		for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+
 			if (ref.getSecond().getProjectId() == projectId && ref.getSecond().getFlowId().equals(flowId)) {
 				return true;
 			}
@@ -345,6 +356,17 @@ public class ExecutorManager {
 			return message;
 		}
 	}
+	
+	
+	public void cleanOldExecutionLogs(long millis) {
+		try {
+			int count = executorLoader.removeExecutionLogsByTime(millis);
+			logger.info("Cleaned up " + count + " log entries.");
+		}
+		catch (ExecutorManagerException e) {
+			e.printStackTrace();
+		}
+	}
 
 	private Map<String, Object> callExecutorServer(ExecutionReference ref, String action) throws ExecutorManagerException {
 		try {
@@ -809,4 +831,60 @@ public class ExecutorManager {
 		outputList.addAll(flows);
 		return executorLoader.fetchNumExecutableFlows(projectId, flowId);
 	}
+	
+	/* 
+	 * cleaner thread to clean up execution_logs, etc in DB. Runs every day.
+	 * 
+	 */
+	private class CleanerThread extends Thread {
+		// log file retention is 1 month.
+		
+		// check every day
+		private static final long CLEANER_THREAD_WAIT_INTERVAL_MS = 24*60*60*1000;
+		
+		private final long executionLogsRetentionMs;
+		
+		private boolean shutdown = false;
+		private long lastLogCleanTime = -1;
+		
+		public CleanerThread(long executionLogsRetentionMs) {
+			this.executionLogsRetentionMs = executionLogsRetentionMs;
+			this.setName("AzkabanWebServer-Cleaner-Thread");
+		}
+		
+		@SuppressWarnings("unused")
+		public void shutdown() {
+			shutdown = true;
+			this.interrupt();
+		}
+		
+		public void run() {
+			while (!shutdown) {
+				synchronized (this) {
+					try {
+						lastCleanerThreadCheckTime = System.currentTimeMillis();
+						
+						// Cleanup old stuff.
+						long currentTime = System.currentTimeMillis();
+						if (currentTime - CLEANER_THREAD_WAIT_INTERVAL_MS > lastLogCleanTime) {
+							cleanExecutionLogs();
+							lastLogCleanTime = currentTime;
+						}
+		
+						
+						wait(CLEANER_THREAD_WAIT_INTERVAL_MS);
+					} catch (InterruptedException e) {
+						logger.info("Interrupted. Probably to shut down.");
+					}
+				}
+			}
+		}
+
+		private void cleanExecutionLogs() {
+			logger.info("Cleaning old logs from execution_logs");
+			long cutoff = DateTime.now().getMillis() - executionLogsRetentionMs;
+			logger.info("Cleaning old log files before " + new DateTime(cutoff).toString());
+			cleanOldExecutionLogs(DateTime.now().getMillis() - executionLogsRetentionMs);
+		}
+	}
 }
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 481a8fe..6464f6b 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -22,6 +22,7 @@ import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
 import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
 
 import azkaban.utils.DataSourceUtils;
 import azkaban.utils.FileIOUtils;
@@ -620,7 +621,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	}
 	
 	private void uploadLogPart(Connection connection, int execId, String name, int attempt, int startByte, int endByte, EncodingType encType, byte[] buffer, int length) throws SQLException, IOException {
-		final String INSERT_EXECUTION_LOGS = "INSERT INTO execution_logs (exec_id, name, attempt, enc_type, start_byte, end_byte, log) VALUES (?,?,?,?,?,?,?)";
+		final String INSERT_EXECUTION_LOGS = "INSERT INTO execution_logs (exec_id, name, attempt, enc_type, start_byte, end_byte, log, upload_time) VALUES (?,?,?,?,?,?,?,?)";
 		QueryRunner runner = new QueryRunner();
 		
 		byte[] buf = buffer;
@@ -631,7 +632,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 			buf = Arrays.copyOf(buffer, length);
 		}
 		
-		runner.update(connection, INSERT_EXECUTION_LOGS, execId, name, attempt, encType.getNumVal(), startByte, startByte + length, buf);
+		runner.update(connection, INSERT_EXECUTION_LOGS, execId, name, attempt, encType.getNumVal(), startByte, startByte + length, buf, DateTime.now().getMillis());
 	}
 	
 	private Connection getConnection() throws ExecutorManagerException {
@@ -913,4 +914,19 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 			return rs.getInt(1);
 		}
 	}
+
+	@Override
+	public int removeExecutionLogsByTime(long millis) throws ExecutorManagerException {
+		final String DELETE_BY_TIME = "DELETE FROM execution_logs WHERE upload_time < ?";
+		
+		QueryRunner runner = new QueryRunner(dataSource);
+		int updateNum = 0;
+		try {
+			updateNum = runner.update(DELETE_BY_TIME, millis);
+		} catch (SQLException e) {
+			throw new ExecutorManagerException("Error deleting old execution_logs before " + millis);
+		}
+		
+		return updateNum;
+	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/scheduler/JdbcScheduleLoader.java b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
index 96130dd..5227a90 100644
--- a/src/java/azkaban/scheduler/JdbcScheduleLoader.java
+++ b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
@@ -142,8 +142,12 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 
 			DbUtils.closeQuietly(connection);
 			throw new ScheduleManagerException("Loading schedules from db failed. ", e);
+		} finally {
+			DbUtils.closeQuietly(connection);
 		}
 		
+		logger.info("Now trying to update the schedules");
+		
 		// filter the schedules
 		for(Schedule sched : schedules) {
 			if(!sched.updateTime()) {
@@ -152,17 +156,18 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 				removeSchedule(sched);
 			}
 			else {
+				logger.info("Recurring schedule, need to update next exec time");
 				try {
 					updateNextExecTime(sched);
 				} catch (Exception e) {
-					DbUtils.closeQuietly(connection);
+					e.printStackTrace();
 					throw new ScheduleManagerException("Update next execution time failed.", e);
-				}
+				} 
 				logger.info("Schedule " + sched.getScheduleName() + " loaded and updated.");
 			}
 		}
 		
-		DbUtils.closeQuietly(connection);
+		
 				
 		logger.info("Loaded " + schedules.size() + " schedules.");
 		
@@ -238,13 +243,18 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 	@Override
 	public void updateNextExecTime(Schedule s) throws ScheduleManagerException 
 	{
+		logger.info("Update schedule " + s.getScheduleName() + " into db. ");
 		Connection connection = getConnection();
 		QueryRunner runner = new QueryRunner();
 		try {
+			
 			runner.update(connection, UPDATE_NEXT_EXEC_TIME, s.getNextExecTime(), s.getProjectId(), s.getFlowName()); 
 		} catch (SQLException e) {
-			logger.error(UPDATE_NEXT_EXEC_TIME + " failed.");
+			e.printStackTrace();
+			logger.error(UPDATE_NEXT_EXEC_TIME + " failed.", e);
 			throw new ScheduleManagerException("Update schedule " + s.getScheduleName() + " into db failed. ", e);
+		} finally {
+			DbUtils.closeQuietly(connection);
 		}
 	}
 	
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index e298356..d1bbd6d 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -91,6 +91,7 @@ public class ScheduleManager {
 			scheduleList = loader.loadSchedules();
 		} catch (ScheduleManagerException e) {
 			// TODO Auto-generated catch block
+			logger.error("Failed to load schedules" + e.getCause() + e.getMessage());
 			e.printStackTrace();
 		}
 
@@ -338,6 +339,7 @@ public class ScheduleManager {
 							// If null, wake up every minute or so to see if
 							// there's something to do. Most likely there will not be.
 							try {
+								logger.info("Nothing scheduled to run. Checking again soon.");
 								nextWakupTime = System.currentTimeMillis() + TIMEOUT_MS;
 								this.wait(TIMEOUT_MS);
 							} catch (InterruptedException e) {
@@ -349,11 +351,12 @@ public class ScheduleManager {
 								// Run flow. The invocation of flows should be quick.
 								Schedule runningSched = schedules.poll();
 								
-								logger.info("Scheduler attempting to run " + runningSched.getScheduleName() );
+								logger.info("Scheduler attempting to run " + runningSched.toString() );
 								
 								// check if it is already running
 								if(!executorManager.isFlowRunning(runningSched.getProjectId(), runningSched.getFlowName()))
 								{
+									logger.info("Scheduler ready to run " + runningSched.toString());
 									// Execute the flow here
 									try {
 										Project project = projectManager.getProject(runningSched.getProjectId());
@@ -381,13 +384,13 @@ public class ScheduleManager {
 											executorManager.submitExecutableFlow(exflow);
 											logger.info("Scheduler has invoked " + exflow.getExecutionId());
 										} catch (Exception e) {	
-											logger.error("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.");
-											logger.error(e.getMessage());
-											return;
+											e.printStackTrace();
+											throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
 										}
 										
 										SlaOptions slaOptions = runningSched.getSlaOptions();
 										if(slaOptions != null) {
+											logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
 											// submit flow slas
 											List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
 											for(SlaSetting set : slaOptions.getSettings()) {
@@ -412,7 +415,6 @@ public class ScheduleManager {
 								
 								removeRunnerSchedule(runningSched);
 
-
 								// Immediately reschedule if it's possible. Let
 								// the execution manager
 								// handle any duplicate runs.
diff --git a/src/java/azkaban/sla/SLAManager.java b/src/java/azkaban/sla/SLAManager.java
index 911f9e3..3c2a4fe 100644
--- a/src/java/azkaban/sla/SLAManager.java
+++ b/src/java/azkaban/sla/SLAManager.java
@@ -345,7 +345,7 @@ public class SLAManager {
 	}
 	
 	private void takeSLASuccessActions(SLA s, ExecutableFlow exflow) {
-		sendSlaSuccessEmail(s, exflow);
+		//sendSlaSuccessEmail(s, exflow);
 		
 	}
 	
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 3e4a654..3ebd2d7 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -48,6 +48,7 @@ import org.mortbay.jetty.servlet.DefaultServlet;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.mortbay.thread.QueuedThreadPool;
 
+
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.JdbcExecutorLoader;
 import azkaban.jmx.JmxExecutorManager;
@@ -142,6 +143,9 @@ public class AzkabanWebServer implements AzkabanServer {
 	private MBeanServer mbeanServer;
 	private ArrayList<ObjectName> registeredMBeans = new ArrayList<ObjectName>();
 
+	
+	
+	
 	/**
 	 * Constructor usually called by tomcat AzkabanServletContext to create the
 	 * initial server
@@ -177,6 +181,9 @@ public class AzkabanWebServer implements AzkabanServer {
 		}
 		
 		configureMBeanServer();
+		
+		
+		
 	}
 
 	private void setViewerPlugins(List<ViewerPlugin> viewerPlugins) {
@@ -719,4 +726,5 @@ public class AzkabanWebServer implements AzkabanServer {
 		}
 
 	}
+	
 }
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index c382bbe..b236c56 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -49,6 +49,7 @@ import azkaban.webapp.AzkabanWebServer;
 import azkaban.webapp.session.Session;
 import azkaban.scheduler.Schedule;
 import azkaban.scheduler.ScheduleManager;
+import azkaban.scheduler.ScheduleManagerException;
 import azkaban.sla.SLA;
 import azkaban.sla.SLA.SlaRule;
 import azkaban.sla.SLA.SlaAction;
@@ -116,11 +117,9 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 			SlaOptions slaOptions= new SlaOptions();
 			
 			String slaEmails = getParam(req, "slaEmails");
-			System.out.println(slaEmails); 
 			String[] emailSplit = slaEmails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
 			
 			Map<String, String> settings = getParamGroup(req, "settings");
-			System.out.println(settings);
 			List<SlaSetting> slaSettings = new ArrayList<SlaSetting>();
 			for(String set : settings.keySet()) {
 				SlaSetting s;
@@ -136,6 +135,10 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 			}
 			
 			if(slaSettings.size() > 0) {
+				if(slaEmails.equals("")) {
+					ret.put("error", "Please put correct email settings for your SLA actions");
+					return;
+				}
 				slaOptions.setSlaEmails(Arrays.asList(emailSplit));
 				slaOptions.setSettings(slaSettings);
 			}
@@ -150,12 +153,12 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 			}
 			
 		} catch (ServletException e) {
-			ret.put("error", e);
+			ret.put("error", e.getMessage());
 		}
 		
 	}
 	
-	private SlaSetting parseSlaSetting(String set) {
+	private SlaSetting parseSlaSetting(String set) throws ScheduleManagerException {
 		// "" + Duration + EmailAction + KillAction
 		String[] parts = set.split(",", -1);
 		String id = parts[0];
@@ -163,17 +166,23 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		String duration = parts[2];
 		String emailAction = parts[3];
 		String killAction = parts[4];
-		if(emailAction.equals("on") || killAction.equals("on")) {
+		if(emailAction.equals("true") || killAction.equals("true")) {
 			SlaSetting r = new SlaSetting();			
 			r.setId(id);
 			r.setRule(SlaRule.valueOf(rule));
-			ReadablePeriod dur = parseDuration(duration);
+			ReadablePeriod dur;
+			try {
+				dur = parseDuration(duration);
+			}
+			catch (Exception e) {
+				throw new ScheduleManagerException("Unable to parse duration for a SLA that needs to take actions!", e);
+			}
 			r.setDuration(dur);
 			List<SlaAction> actions = new ArrayList<SLA.SlaAction>();
-			if(emailAction.equals("on")) {
+			if(emailAction.equals("true")) {
 				actions.add(SlaAction.EMAIL);
 			}
-			if(killAction.equals("on")) {
+			if(killAction.equals("true")) {
 				actions.add(SlaAction.KILL);
 			}
 			r.setActions(actions);
diff --git a/src/java/azkaban/webapp/servlet/velocity/executionspage.vm b/src/java/azkaban/webapp/servlet/velocity/executionspage.vm
index 40a4ddd..f31cb29 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executionspage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executionspage.vm
@@ -55,6 +55,7 @@
 							<th>Flow</th>
 							<th>Project</th>
 							<th class="user">User</th>
+							<th class="user">Proxy User</th>
 							<th class="date">Start Time</th>
 							<th class="date">End Time</th>
 							<th class="elapse">Elapsed</th>
@@ -74,6 +75,7 @@
 								<a href="${context}/manager?project=$vmutils.getProjectName(${flow.projectId})">$vmutils.getProjectName(${flow.projectId})</a>
 							</td>
 							<td>${flow.submitUser}</td>
+							<td>${flow.proxyUsers}</td>
 							<td>$utils.formatDate(${flow.startTime})</td>
 							<td>$utils.formatDate(${flow.endTime})</td>
 							<td>$utils.formatDuration(${flow.startTime}, ${flow.endTime})</td>
@@ -96,6 +98,7 @@
 							<th>Flow</th>
 							<th>Project</th>
 							<th class="user">User</th>
+							<th class="user">Proxy User</th>
 							<th class="date">Start Time</th>
 							<th class="date">End Time</th>
 							<th class="elapse">Elapsed</th>
@@ -115,6 +118,7 @@
 								<a href="${context}/manager?project=$vmutils.getProjectName(${flow.projectId})">$vmutils.getProjectName(${flow.projectId})</a>
 							</td>
 							<td>${flow.submitUser}</td>
+							<td>${flow.proxyUsers}</td>
 							<td>$utils.formatDate(${flow.startTime})</td>
 							<td>$utils.formatDate(${flow.endTime})</td>
 							<td>$utils.formatDuration(${flow.startTime}, ${flow.endTime})</td>
diff --git a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
index 9bf9b6b..189ea93 100644
--- a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
@@ -146,23 +146,7 @@
 										</tbody>
 									</table>
 								</div>
-								<!--h4 style="visibility: hidden">Job SLA Rules</h4>
-								<div class="tableDiv" style="visibility: hidden">
-									<table id="jobRulesTbl">
-										<thead>
-											<tr>
-												<th>Flow/Job Id</th>
-												<th>Finish In</th>
-												<th>Duration</th>
-												<th>Email Action</th>
-												<th>Kill Action</th>
-											</tr>
-										</thead>
-										<tbody>
-											<tr id="addRow"><td id="addRow-col" colspan="4"><span class="addIcon"></span><a href="#">Add Row</a></td></tr>
-										</tbody>
-									</table>
-								</div-->
+							
 							</div>
 						</div>
 					</div>
diff --git a/src/sql/create_execution_logs.sql b/src/sql/create_execution_logs.sql
index a1eab7a..0a9cb0b 100644
--- a/src/sql/create_execution_logs.sql
+++ b/src/sql/create_execution_logs.sql
@@ -6,6 +6,7 @@ CREATE TABLE execution_logs (
 	start_byte INT,
 	end_byte INT,
 	log LONGBLOB,
+	upload_time BIGINT,
 	PRIMARY KEY (exec_id, name, attempt, start_byte),
 	INDEX log_attempt (exec_id, name, attempt),
 	INDEX log_index (exec_id, name)
diff --git a/src/sql/update_2.0_to_2.01.sql b/src/sql/update_2.0_to_2.01.sql
index 309742e..aaa2c6d 100644
--- a/src/sql/update_2.0_to_2.01.sql
+++ b/src/sql/update_2.0_to_2.01.sql
@@ -7,6 +7,7 @@ ALTER TABLE execution_jobs ADD PRIMARY KEY(exec_id, job_id, attempt);
 ALTER TABLE execution_jobs ADD INDEX exec_job (exec_id, job_id);
 
 ALTER TABLE execution_logs ADD COLUMN attempt INT DEFAULT 0;
+ALTER TABLE execution_logs ADD COLUMN upload_time BIGINT DEFAULT 1364801450000;
 ALTER TABLE execution_logs DROP PRIMARY KEY;
 ALTER TABLE execution_logs ADD PRIMARY KEY(exec_id, name, attempt, start_byte);
 ALTER TABLE execution_logs ADD INDEX log_attempt (exec_id, name, attempt)
diff --git a/src/web/js/azkaban.scheduled.view.js b/src/web/js/azkaban.scheduled.view.js
index b66374b..2c1ee76 100644
--- a/src/web/js/azkaban.scheduled.view.js
+++ b/src/web/js/azkaban.scheduled.view.js
@@ -219,8 +219,8 @@ azkaban.ChangeSlaView = Backbone.View.extend({
 			var id = rFlowRule.cells[0].firstChild.value;
 			var rule = rFlowRule.cells[1].firstChild.value;
 			var duration = rFlowRule.cells[2].firstChild.value;
-			var email = rFlowRule.cells[3].firstChild.value;
-			var kill = rFlowRule.cells[4].firstChild.value;
+			var email = rFlowRule.cells[3].firstChild.checked;
+			var kill = rFlowRule.cells[4].firstChild.checked;
 			settings[row] = id + "," + rule + "," + duration + "," + email + "," + kill; 
 		}
 
@@ -257,7 +257,7 @@ azkaban.ChangeSlaView = Backbone.View.extend({
 		var ruleBoxOptions = this.ruleBoxOptions;
 
 		var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
-		var rFlowRule = tFlowRules.insertRow(0);
+		var rFlowRule = tFlowRules.insertRow(tFlowRules.rows.length-1);
 		
 		var cId = rFlowRule.insertCell(-1);
 		var idSelect = document.createElement("select");
diff --git a/unit/java/azkaban/test/execapp/MockExecutorLoader.java b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
index 124c5d9..6510ae3 100644
--- a/unit/java/azkaban/test/execapp/MockExecutorLoader.java
+++ b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
@@ -186,5 +186,12 @@ public class MockExecutorLoader implements ExecutorLoader {
 		return null;
 	}
 
+	@Override
+	public int removeExecutionLogsByTime(long millis)
+			throws ExecutorManagerException {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
 
 }
\ No newline at end of file
diff --git a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
index 1fcd9ee..ad28bb3 100644
--- a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
+++ b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
@@ -16,6 +16,7 @@ import junit.framework.Assert;
 import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
+import org.joda.time.DateTime;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -39,9 +40,9 @@ import azkaban.utils.Props;
 public class JdbcExecutorLoaderTest {
 	private static boolean testDBExists;
 	//@TODO remove this and turn into local host.
-	private static final String host = "rpark-ld.linkedin.biz";
+	private static final String host = "cyu-ld.linkedin.biz";
 	private static final int port = 3306;
-	private static final String database = "test";
+	private static final String database = "azkaban2";
 	private static final String user = "azkaban";
 	private static final String password = "azkaban";
 	private static final int numConnections = 10;
@@ -375,6 +376,32 @@ public class JdbcExecutorLoaderTest {
 		Assert.assertEquals("Logs length is " + logsResult6.getLength(), logsResult6.getLength(), 185493);
 	}
 	
+	@Test
+	public void testRemoveExecutionLogsByTime() throws ExecutorManagerException, IOException, InterruptedException {
+		
+		ExecutorLoader loader = createLoader();
+		
+		File logDir = new File("unit/executions/logtest");		
+		
+		// Multiple of 255 for Henry the Eigth
+		File[] largelog = {new File(logDir, "largeLog1.log"), new File(logDir, "largeLog2.log"), new File(logDir, "largeLog3.log")};
+		
+		DateTime time1 = DateTime.now();
+		loader.uploadLogFile(1, "oldlog", 0, largelog);
+		// sleep for 5 seconds
+		Thread.currentThread().sleep(5000);
+		loader.uploadLogFile(2, "newlog", 0, largelog);
+		
+		DateTime time2 = time1.plusMillis(2500);
+				
+		int count = loader.removeExecutionLogsByTime(time2.getMillis());
+		System.out.print("Removed " + count + " records");
+		LogData logs = loader.fetchLogs(1, "oldlog", 0, 0, 22222);
+		Assert.assertTrue(logs == null);
+		logs = loader.fetchLogs(2, "newlog", 0, 0, 22222);
+		Assert.assertFalse(logs == null);
+	}
+	
 	private ExecutableFlow createExecutableFlow(int executionId, String flowName) throws IOException {
 		File jsonFlowFile = new File(flowDir, flowName + ".flow");
 		@SuppressWarnings("unchecked")
@@ -432,4 +459,5 @@ public class JdbcExecutorLoaderTest {
 			return val;
 		}
 	}
+	
 }
\ No newline at end of file