azkaban-uncached

Moved the log cleanup thread into ExecutorManager and added an upload_time column to execution_logs
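
ExecutorManager now starts a daily CleanerThread that deletes execution_logs rows older than a fixed retention window. To support this, JdbcExecutorLoader stamps every uploaded log chunk with an upload_time (epoch millis) and gains removeExecutionLogsByTime(millis), which issues DELETE FROM execution_logs WHERE upload_time < ?. The execution_logs table gets a BIGINT upload_time column in both the create script and the 2.0-to-2.01 migration (existing rows are backfilled with a fixed default), and JdbcExecutorLoaderTest covers the new delete path. A few unused imports are also dropped from ScheduleManager, AzkabanWebServer, and ScheduleServlet.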

3/8/2013 9:44:44 PM

Details
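
As a reading aid before the per-file diff, here is a minimal, self-contained Java sketch (not part of the patch) of the cutoff computation the new CleanerThread performs: it subtracts the retention constant from the current time and hands the result to removeExecutionLogsByTime, which runs DELETE FROM execution_logs WHERE upload_time < ?. The constant, method, and column names are taken from the patch; the sketch class itself is illustrative. The subtle point is the trailing L on the millisecond constant, which keeps the multiplication in long arithmetic instead of letting the int product overflow.

public class RetentionCutoffSketch {
	// 3*4*7 days = 12 weeks of retention, in milliseconds. The trailing 'L'
	// keeps the multiplication in long arithmetic; as a plain int expression
	// the product (7,257,600,000) would wrap around to -1,332,334,592.
	private static final long EXECUTION_LOGS_RETENTION_MS = 3*4*7*24*60*60*1000L;

	public static void main(String[] args) {
		long now = System.currentTimeMillis();
		long cutoff = now - EXECUTION_LOGS_RETENTION_MS;

		// The cleaner passes this cutoff to removeExecutionLogsByTime(cutoff),
		// which executes: DELETE FROM execution_logs WHERE upload_time < ?
		System.out.println("Rows with upload_time < " + cutoff + " would be deleted.");

		// The same factors evaluated as int, to show the overflow the 'L' suffix avoids.
		int overflowed = 3*4*7*24*60*60*1000;
		System.out.println("int product of the same factors: " + overflowed);
	}
}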

diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
index 13eea40..c0108e2 100644
--- a/src/java/azkaban/executor/ExecutorLoader.java
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -70,5 +70,7 @@ public interface ExecutorLoader {
 	public Props fetchExecutionJobOutputProps(int execId, String jobId) throws ExecutorManagerException;
 	
 	public Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId) throws ExecutorManagerException;
+
+	public int removeExecutionLogsByTime(long millis) throws ExecutorManagerException;
 	
 }
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 6cd9d48..7314b21 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -34,6 +34,7 @@ import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.impl.client.BasicResponseHandler;
 import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
 
 import azkaban.executor.ExecutableFlow.Status;
 import azkaban.project.Project;
@@ -52,12 +53,16 @@ public class ExecutorManager {
 	private String executorHost;
 	private int executorPort;
 	
+	private CleanerThread cleanerThread;
+	
 	private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows = new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
 	private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished = new ConcurrentHashMap<Integer, ExecutableFlow>();
 	
 	private ExecutorMailer mailer;
 	private ExecutingManagerUpdaterThread executingManager;
 	
+	private long lastCleanerThreadCheckTime = -1;
+	
 	private long lastThreadCheckTime = -1;
 	
 	public ExecutorManager(Props props, ExecutorLoader loader) throws ExecutorManagerException {
@@ -69,6 +74,9 @@ public class ExecutorManager {
 		mailer = new ExecutorMailer(props);
 		executingManager = new ExecutingManagerUpdaterThread();
 		executingManager.start();
+		
+		cleanerThread = new CleanerThread();
+		cleanerThread.start();
 	}
 	
 	public String getExecutorHost() {
@@ -310,6 +318,17 @@ public class ExecutorManager {
 			}
 		}
 	}
+	
+	
+	public void cleanOldExecutionLogs(long millis) {
+		try {
+			int count = executorLoader.removeExecutionLogsByTime(millis);
+			logger.info("Cleaned up " + count + " log entries.");
+		}
+		catch (ExecutorManagerException e) {
+			logger.error("Failed to clean old execution_logs.", e);
+		}
+	}
 
 	private Map<String, Object> callExecutorServer(ExecutionReference ref, String action) throws ExecutorManagerException {
 		try {
@@ -764,4 +783,56 @@ public class ExecutorManager {
 		outputList.addAll(flows);
 		return executorLoader.fetchNumExecutableFlows(projectId, flowId);
 	}
+	
+	/* 
+	 * Cleaner thread that cleans up old execution_logs entries, etc., in the DB. Runs once a day.
+	 * 
+	 */
+	private class CleanerThread extends Thread {
+		// execution log retention: 12 weeks (~3 months).
+		private static final long EXECUTION_LOGS_RETENTION_MS = 3*4*7*24*60*60*1000L;
+		// check every day
+		private static final long CLEANER_THREAD_WAIT_INTERVAL_MS = 24*60*60*1000;
+		
+		private boolean shutdown = false;
+		private long lastLogCleanTime = -1;
+		
+		public CleanerThread() {
+			this.setName("AzkabanWebServer-Cleaner-Thread");
+		}
+		
+		@SuppressWarnings("unused")
+		public void shutdown() {
+			shutdown = true;
+			this.interrupt();
+		}
+		
+		public void run() {
+			while (!shutdown) {
+				synchronized (this) {
+					try {
+						lastCleanerThreadCheckTime = System.currentTimeMillis();
+						
+						// Cleanup old stuff.
+						long currentTime = System.currentTimeMillis();
+						if (currentTime - CLEANER_THREAD_WAIT_INTERVAL_MS > lastLogCleanTime) {
+							cleanExecutionLogs();
+							lastLogCleanTime = currentTime;
+						}
+		
+						
+						wait(CLEANER_THREAD_WAIT_INTERVAL_MS);
+					} catch (InterruptedException e) {
+						logger.info("Interrupted. Probably to shut down.");
+					}
+				}
+			}
+		}
+
+		private void cleanExecutionLogs() {
+			long cutoff = DateTime.now().getMillis() - EXECUTION_LOGS_RETENTION_MS;
+			logger.info("Cleaning execution_logs entries older than " + new DateTime(cutoff) + " (" + cutoff + " ms).");
+			cleanOldExecutionLogs(cutoff);
+		}
+	}
 }
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index c70d9f5..d0a1763 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -22,6 +22,7 @@ import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
 import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
 
 import azkaban.executor.ExecutableFlow.Status;
 import azkaban.utils.DataSourceUtils;
@@ -621,7 +622,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	}
 	
 	private void uploadLogPart(Connection connection, int execId, String name, int attempt, int startByte, int endByte, EncodingType encType, byte[] buffer, int length) throws SQLException, IOException {
-		final String INSERT_EXECUTION_LOGS = "INSERT INTO execution_logs (exec_id, name, attempt, enc_type, start_byte, end_byte, log) VALUES (?,?,?,?,?,?,?)";
+		final String INSERT_EXECUTION_LOGS = "INSERT INTO execution_logs (exec_id, name, attempt, enc_type, start_byte, end_byte, log, upload_time) VALUES (?,?,?,?,?,?,?,?)";
 		QueryRunner runner = new QueryRunner();
 		
 		byte[] buf = buffer;
@@ -632,7 +633,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 			buf = Arrays.copyOf(buffer, length);
 		}
 		
-		runner.update(connection, INSERT_EXECUTION_LOGS, execId, name, attempt, encType.getNumVal(), startByte, startByte + length, buf);
+		runner.update(connection, INSERT_EXECUTION_LOGS, execId, name, attempt, encType.getNumVal(), startByte, startByte + length, buf, DateTime.now().getMillis());
 	}
 	
 	private Connection getConnection() throws ExecutorManagerException {
@@ -913,4 +914,19 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 			return rs.getInt(1);
 		}
 	}
+
+	@Override
+	public int removeExecutionLogsByTime(long millis) throws ExecutorManagerException {
+		final String DELETE_BY_TIME = "DELETE FROM execution_logs WHERE upload_time < ?";
+		
+		QueryRunner runner = new QueryRunner(dataSource);
+		int updateNum = 0;
+		try {
+			updateNum = runner.update(DELETE_BY_TIME, millis);
+		} catch (SQLException e) {
+			throw new ExecutorManagerException("Error deleting old execution_logs before " + millis, e);
+		}
+		
+		return updateNum;
+	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 18d4790..cae4fa5 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -34,7 +34,6 @@ import org.joda.time.format.DateTimeFormatter;
 
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutorManager;
-import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.ExecutableFlow.Status;
 
 import azkaban.flow.Flow;
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index bbdfbf2..c86204f 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -35,7 +35,6 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.FileAppender;
 import org.apache.log4j.Logger;
 import org.apache.velocity.app.VelocityEngine;
 import org.apache.velocity.runtime.log.Log4JLogChute;
@@ -49,6 +48,7 @@ import org.mortbay.jetty.servlet.DefaultServlet;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.mortbay.thread.QueuedThreadPool;
 
+
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.JdbcExecutorLoader;
 import azkaban.jmx.JmxExecutorManager;
@@ -143,6 +143,9 @@ public class AzkabanWebServer implements AzkabanServer {
 	private MBeanServer mbeanServer;
 	private ArrayList<ObjectName> registeredMBeans = new ArrayList<ObjectName>();
 
+	
+	
+	
 	/**
 	 * Constructor usually called by tomcat AzkabanServletContext to create the
 	 * initial server
@@ -178,6 +181,9 @@ public class AzkabanWebServer implements AzkabanServer {
 		}
 		
 		configureMBeanServer();
+		
+		
+		
 	}
 	
 	private void setupLoggers() {
@@ -723,4 +729,5 @@ public class AzkabanWebServer implements AzkabanServer {
 		}
 
 	}
+	
 }
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 489299a..4365986 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -27,22 +27,16 @@ import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-import javax.swing.text.StyledEditorKit.BoldAction;
 
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
-import org.joda.time.Hours;
 import org.joda.time.LocalDateTime;
 import org.joda.time.Minutes;
 import org.joda.time.ReadablePeriod;
 import org.joda.time.format.DateTimeFormat;
 
-import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.ExecutableFlow.FailureAction;
-import azkaban.executor.ExecutableFlow.Status;
 import azkaban.flow.Flow;
 import azkaban.flow.Node;
 import azkaban.project.Project;
@@ -62,7 +56,6 @@ import azkaban.scheduler.Schedule.SlaOptions;
 import azkaban.scheduler.ScheduleManager;
 import azkaban.sla.SLA;
 import azkaban.sla.SLA.SlaRule;
-import azkaban.sla.SLAManager;
 import azkaban.sla.SLA.SlaAction;
 import azkaban.sla.SLA.SlaSetting;
 
@@ -71,7 +64,6 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 	private static final Logger logger = Logger.getLogger(ScheduleServlet.class);
 	private ProjectManager projectManager;
 	private ScheduleManager scheduleManager;
-	private SLAManager slaManager;
 	private UserManager userManager;
 
 	@Override
@@ -81,7 +73,6 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		projectManager = server.getProjectManager();
 		scheduleManager = server.getScheduleManager();
 		userManager = server.getUserManager();
-		slaManager = server.getSLAManager();
 	}
 	
 	@Override
diff --git a/src/sql/create_execution_logs.sql b/src/sql/create_execution_logs.sql
index 69e0cad..3585011 100644
--- a/src/sql/create_execution_logs.sql
+++ b/src/sql/create_execution_logs.sql
@@ -6,6 +6,7 @@ CREATE TABLE execution_logs (
 	start_byte INT,
 	end_byte INT,
 	log LONGBLOB,
+	upload_time BIGINT,
 	PRIMARY KEY (exec_id, name, attempt, start_byte),
 	INDEX log_index (exec_id, name),
 	INDEX byte_log_index(exec_id, name, start_byte, end_byte)
diff --git a/src/sql/update_2.0_to_2.01.sql b/src/sql/update_2.0_to_2.01.sql
index 493df5a..736f281 100644
--- a/src/sql/update_2.0_to_2.01.sql
+++ b/src/sql/update_2.0_to_2.01.sql
@@ -7,6 +7,7 @@ ALTER TABLE execution_jobs ADD PRIMARY KEY(exec_id, job_id, attempt);
 ALTER TABLE execution_jobs ADD INDEX exec_job (exec_id, job_id);
 
 ALTER TABLE execution_logs ADD COLUMN attempt INT DEFAULT 0;
+ALTER TABLE execution_logs ADD COLUMN upload_time BIGINT DEFAULT 1364801450000;
 ALTER TABLE execution_logs DROP PRIMARY KEY;
 ALTER TABLE execution_logs ADD PRIMARY KEY(exec_id, name, attempt, start_byte);
 
diff --git a/unit/java/azkaban/test/execapp/MockExecutorLoader.java b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
index 124c5d9..6510ae3 100644
--- a/unit/java/azkaban/test/execapp/MockExecutorLoader.java
+++ b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
@@ -186,5 +186,12 @@ public class MockExecutorLoader implements ExecutorLoader {
 		return null;
 	}
 
+	@Override
+	public int removeExecutionLogsByTime(long millis)
+			throws ExecutorManagerException {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
 
 }
\ No newline at end of file
diff --git a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
index ba673dd..832eb22 100644
--- a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
+++ b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
@@ -16,6 +16,7 @@ import junit.framework.Assert;
 import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
+import org.joda.time.DateTime;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -39,9 +40,9 @@ import azkaban.utils.Props;
 public class JdbcExecutorLoaderTest {
 	private static boolean testDBExists;
 	//@TODO remove this and turn into local host.
-	private static final String host = "rpark-ld.linkedin.biz";
+	private static final String host = "cyu-ld.linkedin.biz";
 	private static final int port = 3306;
-	private static final String database = "test";
+	private static final String database = "azkaban2";
 	private static final String user = "azkaban";
 	private static final String password = "azkaban";
 	private static final int numConnections = 10;
@@ -375,6 +376,32 @@ public class JdbcExecutorLoaderTest {
 		Assert.assertEquals("Logs length is " + logsResult6.getLength(), logsResult6.getLength(), 185493);
 	}
 	
+	@Test
+	public void testRemoveExecutionLogsByTime() throws ExecutorManagerException, IOException, InterruptedException {
+		
+		ExecutorLoader loader = createLoader();
+		
+		File logDir = new File("unit/executions/logtest");		
+		
+		// Multiple of 255 for Henry the Eighth
+		File[] largelog = {new File(logDir, "largeLog1.log"), new File(logDir, "largeLog2.log"), new File(logDir, "largeLog3.log")};
+		
+		DateTime time1 = DateTime.now();
+		loader.uploadLogFile(1, "oldlog", 0, largelog);
+		// sleep for 5 seconds
+		Thread.sleep(5000);
+		loader.uploadLogFile(2, "newlog", 0, largelog);
+		
+		DateTime time2 = time1.plusMillis(2500);
+				
+		int count = loader.removeExecutionLogsByTime(time2.getMillis());
+		System.out.print("Removed " + count + " records");
+		LogData logs = loader.fetchLogs(1, "oldlog", 0, 0, 22222);
+		Assert.assertTrue(logs == null);
+		logs = loader.fetchLogs(2, "newlog", 0, 0, 22222);
+		Assert.assertFalse(logs == null);
+	}
+	
 	private ExecutableFlow createExecutableFlow(int executionId, String flowName) throws IOException {
 		File jsonFlowFile = new File(flowDir, flowName + ".flow");
 		@SuppressWarnings("unchecked")
@@ -432,4 +459,5 @@ public class JdbcExecutorLoaderTest {
 			return val;
 		}
 	}
+	
 }
\ No newline at end of file