azkaban-uncached
diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
index 13eea40..c0108e2 100644
--- a/src/java/azkaban/executor/ExecutorLoader.java
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -70,5 +70,7 @@ public interface ExecutorLoader {
public Props fetchExecutionJobOutputProps(int execId, String jobId) throws ExecutorManagerException;
public Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId) throws ExecutorManagerException;
+
+ public int removeExecutionLogsByTime(long millis) throws ExecutorManagerException;
}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 6cd9d48..7314b21 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -34,6 +34,7 @@ import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.project.Project;
@@ -52,12 +53,16 @@ public class ExecutorManager {
private String executorHost;
private int executorPort;
+ private CleanerThread cleanerThread;
+
private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows = new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished = new ConcurrentHashMap<Integer, ExecutableFlow>();
private ExecutorMailer mailer;
private ExecutingManagerUpdaterThread executingManager;
+ private long lastCleanerThreadCheckTime = -1;
+
private long lastThreadCheckTime = -1;
public ExecutorManager(Props props, ExecutorLoader loader) throws ExecutorManagerException {
@@ -69,6 +74,9 @@ public class ExecutorManager {
mailer = new ExecutorMailer(props);
executingManager = new ExecutingManagerUpdaterThread();
executingManager.start();
+
+ cleanerThread = new CleanerThread();
+ cleanerThread.start();
}
public String getExecutorHost() {
@@ -310,6 +318,17 @@ public class ExecutorManager {
}
}
}
+
+
+ public void cleanOldExecutionLogs(long millis) {
+ try {
+ int count = executorLoader.removeExecutionLogsByTime(millis);
+ logger.info("Cleaned up " + count + " log entries.");
+ }
+ catch (ExecutorManagerException e) {
+ logger.error("Error cleaning old execution logs", e);
+ }
+ }
private Map<String, Object> callExecutorServer(ExecutionReference ref, String action) throws ExecutorManagerException {
try {
@@ -764,4 +783,56 @@ public class ExecutorManager {
outputList.addAll(flows);
return executorLoader.fetchNumExecutableFlows(projectId, flowId);
}
+
+ /*
+ * Cleaner thread that removes old execution_logs entries, etc., from the DB. Runs once a day.
+ *
+ */
+ private class CleanerThread extends Thread {
+ // Execution log retention: 12 weeks (roughly three months).
+ private static final long EXECUTION_LOGS_RETENTION_MS = 3*4*7*24*60*60*1000L;
+ // check every day
+ private static final long CLEANER_THREAD_WAIT_INTERVAL_MS = 24*60*60*1000;
+
+ private boolean shutdown = false;
+ private long lastLogCleanTime = -1;
+
+ public CleanerThread() {
+ this.setName("AzkabanWebServer-Cleaner-Thread");
+ }
+
+ @SuppressWarnings("unused")
+ public void shutdown() {
+ shutdown = true;
+ this.interrupt();
+ }
+
+ public void run() {
+ while (!shutdown) {
+ synchronized (this) {
+ try {
+ lastCleanerThreadCheckTime = System.currentTimeMillis();
+
+ // Cleanup old stuff.
+ long currentTime = System.currentTimeMillis();
+ if (currentTime - CLEANER_THREAD_WAIT_INTERVAL_MS > lastLogCleanTime) {
+ cleanExecutionLogs();
+ lastLogCleanTime = currentTime;
+ }
+
+
+ wait(CLEANER_THREAD_WAIT_INTERVAL_MS);
+ } catch (InterruptedException e) {
+ logger.info("Interrupted. Probably to shut down.");
+ }
+ }
+ }
+ }
+
+ private void cleanExecutionLogs() {
+ logger.info("Cleaning old logs from execution_logs");
+ logger.info("Cleaning old log files before " + DateTime.now() + " or in milliseconds: " + DateTime.now().getMillis());
+ cleanOldExecutionLogs(DateTime.now().getMillis() - EXECUTION_LOGS_RETENTION_MS);
+ }
+ }
}
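
A note on the retention constant added above: the multiplication must be done in long arithmetic (hence the L suffix), otherwise the 12-week product overflows int and the resulting cutoff lands in the future. Below is a minimal caller-side sketch of the cutoff computation; the LogCleanupSketch class and the manager parameter are hypothetical and not part of the patch.

    import org.joda.time.DateTime;
    import azkaban.executor.ExecutorManager;

    public class LogCleanupSketch {
        // Mirrors EXECUTION_LOGS_RETENTION_MS: 12 weeks expressed in milliseconds.
        static final long RETENTION_MS = 3L * 4 * 7 * 24 * 60 * 60 * 1000; // 7,257,600,000

        static void purgeOldLogs(ExecutorManager manager) {
            long cutoff = DateTime.now().getMillis() - RETENTION_MS;
            manager.cleanOldExecutionLogs(cutoff); // removes rows uploaded before the cutoff
        }
        // Without the long literal, 3*4*7*24*60*60*1000 evaluates to -1332334592 in int
        // arithmetic, which would push the cutoff into the future and delete every row.
    }
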
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index c70d9f5..d0a1763 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -22,6 +22,7 @@ import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.utils.DataSourceUtils;
@@ -621,7 +622,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
}
private void uploadLogPart(Connection connection, int execId, String name, int attempt, int startByte, int endByte, EncodingType encType, byte[] buffer, int length) throws SQLException, IOException {
- final String INSERT_EXECUTION_LOGS = "INSERT INTO execution_logs (exec_id, name, attempt, enc_type, start_byte, end_byte, log) VALUES (?,?,?,?,?,?,?)";
+ final String INSERT_EXECUTION_LOGS = "INSERT INTO execution_logs (exec_id, name, attempt, enc_type, start_byte, end_byte, log, upload_time) VALUES (?,?,?,?,?,?,?,?)";
QueryRunner runner = new QueryRunner();
byte[] buf = buffer;
@@ -632,7 +633,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
buf = Arrays.copyOf(buffer, length);
}
- runner.update(connection, INSERT_EXECUTION_LOGS, execId, name, attempt, encType.getNumVal(), startByte, startByte + length, buf);
+ runner.update(connection, INSERT_EXECUTION_LOGS, execId, name, attempt, encType.getNumVal(), startByte, startByte + length, buf, DateTime.now().getMillis());
}
private Connection getConnection() throws ExecutorManagerException {
@@ -913,4 +914,19 @@ public class JdbcExecutorLoader implements ExecutorLoader {
return rs.getInt(1);
}
}
+
+ @Override
+ public int removeExecutionLogsByTime(long millis) throws ExecutorManagerException {
+ final String DELETE_BY_TIME = "DELETE FROM execution_logs WHERE upload_time < ?";
+
+ QueryRunner runner = new QueryRunner(dataSource);
+ int updateNum = 0;
+ try {
+ updateNum = runner.update(DELETE_BY_TIME, millis);
+ } catch (SQLException e) {
+ throw new ExecutorManagerException("Error deleting old execution_logs before " + millis, e);
+ }
+
+ return updateNum;
+ }
}
\ No newline at end of file
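
Since uploadLogPart now stamps every inserted chunk with DateTime.now().getMillis(), removeExecutionLogsByTime can be driven by any caller that picks a cutoff. A loader-level usage sketch follows; the ManualLogPurge helper is hypothetical and assumes an already-constructed ExecutorLoader.

    import org.joda.time.DateTime;
    import azkaban.executor.ExecutorLoader;
    import azkaban.executor.ExecutorManagerException;

    public class ManualLogPurge {
        /** Deletes execution_logs chunks older than the given retention window. */
        static int purge(ExecutorLoader loader, long retentionMs) throws ExecutorManagerException {
            long cutoff = DateTime.now().getMillis() - retentionMs;
            // Each chunk carries its own upload_time, so a log uploaded in several parts
            // can straddle the cutoff and be removed only partially.
            return loader.removeExecutionLogsByTime(cutoff);
        }
    }
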
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 18d4790..cae4fa5 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -34,7 +34,6 @@ import org.joda.time.format.DateTimeFormatter;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManager;
-import azkaban.executor.ExecutorManagerException;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.flow.Flow;
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index bbdfbf2..c86204f 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -35,7 +35,6 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.FileAppender;
import org.apache.log4j.Logger;
import org.apache.velocity.app.VelocityEngine;
import org.apache.velocity.runtime.log.Log4JLogChute;
@@ -49,6 +48,7 @@ import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.thread.QueuedThreadPool;
+
import azkaban.executor.ExecutorManager;
import azkaban.executor.JdbcExecutorLoader;
import azkaban.jmx.JmxExecutorManager;
@@ -143,6 +143,9 @@ public class AzkabanWebServer implements AzkabanServer {
private MBeanServer mbeanServer;
private ArrayList<ObjectName> registeredMBeans = new ArrayList<ObjectName>();
+
+
+
/**
* Constructor usually called by tomcat AzkabanServletContext to create the
* initial server
@@ -178,6 +181,9 @@ public class AzkabanWebServer implements AzkabanServer {
}
configureMBeanServer();
+
+
+
}
private void setupLoggers() {
@@ -723,4 +729,5 @@ public class AzkabanWebServer implements AzkabanServer {
}
}
+
}
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 489299a..4365986 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -27,22 +27,16 @@ import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import javax.swing.text.StyledEditorKit.BoldAction;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
-import org.joda.time.Hours;
import org.joda.time.LocalDateTime;
import org.joda.time.Minutes;
import org.joda.time.ReadablePeriod;
import org.joda.time.format.DateTimeFormat;
-import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutorManagerException;
import azkaban.executor.ExecutableFlow.FailureAction;
-import azkaban.executor.ExecutableFlow.Status;
import azkaban.flow.Flow;
import azkaban.flow.Node;
import azkaban.project.Project;
@@ -62,7 +56,6 @@ import azkaban.scheduler.Schedule.SlaOptions;
import azkaban.scheduler.ScheduleManager;
import azkaban.sla.SLA;
import azkaban.sla.SLA.SlaRule;
-import azkaban.sla.SLAManager;
import azkaban.sla.SLA.SlaAction;
import azkaban.sla.SLA.SlaSetting;
@@ -71,7 +64,6 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
private static final Logger logger = Logger.getLogger(ScheduleServlet.class);
private ProjectManager projectManager;
private ScheduleManager scheduleManager;
- private SLAManager slaManager;
private UserManager userManager;
@Override
@@ -81,7 +73,6 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
projectManager = server.getProjectManager();
scheduleManager = server.getScheduleManager();
userManager = server.getUserManager();
- slaManager = server.getSLAManager();
}
@Override
diff --git a/src/sql/create_execution_logs.sql b/src/sql/create_execution_logs.sql
index 69e0cad..3585011 100644
--- a/src/sql/create_execution_logs.sql
+++ b/src/sql/create_execution_logs.sql
@@ -6,6 +6,7 @@ CREATE TABLE execution_logs (
start_byte INT,
end_byte INT,
log LONGBLOB,
+ upload_time BIGINT,
PRIMARY KEY (exec_id, name, attempt, start_byte),
INDEX log_index (exec_id, name),
INDEX byte_log_index(exec_id, name, start_byte, end_byte)
diff --git a/src/sql/update_2.0_to_2.01.sql b/src/sql/update_2.0_to_2.01.sql
index 493df5a..736f281 100644
--- a/src/sql/update_2.0_to_2.01.sql
+++ b/src/sql/update_2.0_to_2.01.sql
@@ -7,6 +7,7 @@ ALTER TABLE execution_jobs ADD PRIMARY KEY(exec_id, job_id, attempt);
ALTER TABLE execution_jobs ADD INDEX exec_job (exec_id, job_id);
ALTER TABLE execution_logs ADD COLUMN attempt INT DEFAULT 0;
+ALTER TABLE execution_logs ADD COLUMN upload_time BIGINT DEFAULT 1364801450000;
ALTER TABLE execution_logs DROP PRIMARY KEY;
ALTER TABLE execution_logs ADD PRIMARY KEY(exec_id, name, attempt, start_byte);
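
For reference, the backfill default above is a fixed epoch timestamp. A throwaway check (not part of the patch) of what date it decodes to:

    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;

    public class UploadTimeDefaultCheck {
        public static void main(String[] args) {
            // 1364801450000 ms since the epoch is 2013-04-01T07:30:50Z; pre-existing rows
            // are treated as uploaded on that date and age out of execution_logs from there.
            System.out.println(new DateTime(1364801450000L, DateTimeZone.UTC));
        }
    }
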
diff --git a/unit/java/azkaban/test/execapp/MockExecutorLoader.java b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
index 124c5d9..6510ae3 100644
--- a/unit/java/azkaban/test/execapp/MockExecutorLoader.java
+++ b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
@@ -186,5 +186,12 @@ public class MockExecutorLoader implements ExecutorLoader {
return null;
}
+ @Override
+ public int removeExecutionLogsByTime(long millis)
+ throws ExecutorManagerException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
}
\ No newline at end of file
diff --git a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
index ba673dd..832eb22 100644
--- a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
+++ b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
@@ -16,6 +16,7 @@ import junit.framework.Assert;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
+import org.joda.time.DateTime;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -39,9 +40,9 @@ import azkaban.utils.Props;
public class JdbcExecutorLoaderTest {
private static boolean testDBExists;
//@TODO remove this and turn into local host.
- private static final String host = "rpark-ld.linkedin.biz";
+ private static final String host = "cyu-ld.linkedin.biz";
private static final int port = 3306;
- private static final String database = "test";
+ private static final String database = "azkaban2";
private static final String user = "azkaban";
private static final String password = "azkaban";
private static final int numConnections = 10;
@@ -375,6 +376,32 @@ public class JdbcExecutorLoaderTest {
Assert.assertEquals("Logs length is " + logsResult6.getLength(), logsResult6.getLength(), 185493);
}
+ @Test
+ public void testRemoveExecutionLogsByTime() throws ExecutorManagerException, IOException, InterruptedException {
+
+ ExecutorLoader loader = createLoader();
+
+ File logDir = new File("unit/executions/logtest");
+
+ // Multiple of 255 for Henry the Eighth
+ File[] largelog = {new File(logDir, "largeLog1.log"), new File(logDir, "largeLog2.log"), new File(logDir, "largeLog3.log")};
+
+ DateTime time1 = DateTime.now();
+ loader.uploadLogFile(1, "oldlog", 0, largelog);
+ // sleep for 5 seconds
+ Thread.sleep(5000);
+ loader.uploadLogFile(2, "newlog", 0, largelog);
+
+ DateTime time2 = time1.plusMillis(2500);
+
+ int count = loader.removeExecutionLogsByTime(time2.getMillis());
+ System.out.print("Removed " + count + " records");
+ LogData logs = loader.fetchLogs(1, "oldlog", 0, 0, 22222);
+ Assert.assertTrue(logs == null);
+ logs = loader.fetchLogs(2, "newlog", 0, 0, 22222);
+ Assert.assertFalse(logs == null);
+ }
+
private ExecutableFlow createExecutableFlow(int executionId, String flowName) throws IOException {
File jsonFlowFile = new File(flowDir, flowName + ".flow");
@SuppressWarnings("unchecked")
@@ -432,4 +459,5 @@ public class JdbcExecutorLoaderTest {
return val;
}
}
+
}
\ No newline at end of file