Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 2bf7baf..eb8c24c 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -19,6 +19,7 @@ package azkaban.executor;
import java.io.File;
import java.util.List;
import java.util.Map;
+import java.time.Duration;
import azkaban.executor.ExecutorLogEvent.EventType;
import azkaban.utils.FileIOUtils.LogData;
@@ -32,6 +33,9 @@ public interface ExecutorLoader {
ExecutableFlow fetchExecutableFlow(int execId)
throws ExecutorManagerException;
+ List<ExecutableFlow> fetchRecentlyFinishedFlows(Duration maxAge)
+ throws ExecutorManagerException;
+
Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
throws ExecutorManagerException;
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 4549ef0..b1e315b 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -25,6 +25,7 @@ import java.io.File;
import java.io.IOException;
import java.lang.Thread.State;
import java.net.URI;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -96,8 +97,7 @@ public class ExecutorManager extends EventHandler implements
private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
- private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
- new ConcurrentHashMap<Integer, ExecutableFlow>();
+
QueuedExecutions queuedFlows;
@@ -109,6 +109,7 @@ public class ExecutorManager extends EventHandler implements
// 12 weeks
private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
* 24 * 60 * 60 * 1000L;
+ private static final Duration RECENTLY_FINISHED_LIFETIME = Duration.ofMinutes(10);
private long lastCleanerThreadCheckTime = -1;
private long lastThreadCheckTime = -1;
@@ -626,7 +627,15 @@ public class ExecutorManager extends EventHandler implements
@Override
public List<ExecutableFlow> getRecentlyFinishedFlows() {
- return new ArrayList<ExecutableFlow>(recentlyFinished.values());
+ List<ExecutableFlow> flows = new ArrayList<>();
+ try {
+ flows = executorLoader.fetchRecentlyFinishedFlows(
+ RECENTLY_FINISHED_LIFETIME);
+ } catch(ExecutorManagerException e) {
+ //Todo jamiesjc: fix error handling.
+ logger.error("Failed to fetch recently finished flows.", e);
+ }
+ return flows;
}
@Override
@@ -1212,8 +1221,6 @@ public class ExecutorManager extends EventHandler implements
this.setName("ExecutorManagerUpdaterThread");
}
- // 10 mins recently finished threshold.
- private long recentlyFinishedLifetimeMs = 600000;
private int waitTimeIdleMs = 2000;
private int waitTimeMs = 500;
@@ -1327,10 +1334,6 @@ public class ExecutorManager extends EventHandler implements
}
}
- updaterStage = "Evicting old recently finished flows.";
-
- evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
- // Add new finished
for (ExecutableFlow flow : finishedFlows) {
if (flow.getScheduleId() >= 0
&& flow.getStatus() == Status.SUCCEEDED) {
@@ -1338,7 +1341,6 @@ public class ExecutorManager extends EventHandler implements
cacheDir);
}
fireEventListeners(Event.create(flow, Type.FLOW_FINISHED, new EventData(flow)));
- recentlyFinished.put(flow.getExecutionId(), flow);
}
updaterStage =
@@ -1404,7 +1406,6 @@ public class ExecutorManager extends EventHandler implements
updaterStage = "finalizing flow " + execId + " cleaning from memory";
runningFlows.remove(execId);
fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED, new EventData(dsFlow)));
- recentlyFinished.put(execId, dsFlow);
} catch (ExecutorManagerException e) {
alertUser = false; // failed due to azkaban internal error, not to alert user
@@ -1506,20 +1507,6 @@ public class ExecutorManager extends EventHandler implements
exFlow.setStatus(Status.FAILED);
}
- private void evictOldRecentlyFinished(long ageMs) {
- ArrayList<Integer> recentlyFinishedKeys =
- new ArrayList<Integer>(recentlyFinished.keySet());
- long oldAgeThreshold = System.currentTimeMillis() - ageMs;
- for (Integer key : recentlyFinishedKeys) {
- ExecutableFlow flow = recentlyFinished.get(key);
-
- if (flow.getEndTime() < oldAgeThreshold) {
- // Evict
- recentlyFinished.remove(key);
- }
- }
- }
-
private ExecutableFlow updateExecution(Map<String, Object> updateData)
throws ExecutorManagerException {
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 0b79bb7..e7fff84 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -32,6 +32,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.time.Duration;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
@@ -202,6 +203,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
}
+ /**
+ * maxAge indicates how long finished flows are shown in Recently Finished flow page.
+ */
+ @Override
+ public List<ExecutableFlow> fetchRecentlyFinishedFlows(Duration maxAge)
+ throws ExecutorManagerException {
+ QueryRunner runner = createQueryRunner();
+ FetchRecentlyFinishedFlows flowHandler = new FetchRecentlyFinishedFlows();
+
+ try {
+ List<ExecutableFlow> flows =
+ runner.query(FetchRecentlyFinishedFlows.FETCH_RECENTLY_FINISHED_FLOW,
+ flowHandler, System.currentTimeMillis() - maxAge.toMillis(),
+ Status.SUCCEEDED.getNumVal(), Status.KILLED.getNumVal(),
+ Status.FAILED.getNumVal());
+ return flows;
+ } catch (SQLException e) {
+ throw new ExecutorManagerException("Error fetching recently finished flows", e);
+ }
+ }
+
@Override
public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
throws ExecutorManagerException {
@@ -1364,6 +1386,52 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
}
+ private static class FetchRecentlyFinishedFlows implements
+ ResultSetHandler<List<ExecutableFlow>> {
+ // Execution_flows table is already indexed by end_time
+ private static String FETCH_RECENTLY_FINISHED_FLOW =
+ "SELECT exec_id, enc_type, flow_data FROM execution_flows "
+ + "WHERE end_time > ? AND status IN (?, ?, ?)";
+
+ @Override
+ public List<ExecutableFlow> handle(
+ ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.emptyList();
+ }
+
+ List<ExecutableFlow> execFlows = new ArrayList<>();
+ do {
+ int id = rs.getInt(1);
+ int encodingType = rs.getInt(2);
+ byte[] data = rs.getBytes(3);
+
+ if (data != null) {
+ EncodingType encType = EncodingType.fromInteger(encodingType);
+ Object flowObj;
+ try {
+ if (encType == EncodingType.GZIP) {
+ String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+ flowObj = JSONUtils.parseJSONFromString(jsonString);
+ } else {
+ String jsonString = new String(data, "UTF-8");
+ flowObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+
+ ExecutableFlow exFlow =
+ ExecutableFlow.createExecutableFlowFromObject(flowObj);
+
+ execFlows.add(exFlow);
+ } catch (IOException e) {
+ throw new SQLException("Error retrieving flow data " + id, e);
+ }
+ }
+ } while (rs.next());
+
+ return execFlows;
+ }
+ }
+
private static class FetchActiveExecutableFlows implements
ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
// Select running and executor assigned flows
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index 3a41b56..ea0d43d 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
@@ -33,6 +34,7 @@ import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.joda.time.DateTime;
+import org.joda.time.DateTimeUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -59,6 +61,8 @@ public class JdbcExecutorLoaderTest {
private static final String user = "azkaban";
private static final String password = "azkaban";
private static final int numConnections = 10;
+ private static final Duration RECENTLY_FINISHED_LIFETIME = Duration.ofMinutes(1);
+ private static final Duration FLOW_FINISHED_TIME = Duration.ofMinutes(2);
@BeforeClass
public static void setupDB() {
@@ -873,6 +877,51 @@ public class JdbcExecutorLoaderTest {
Assert.assertEquals(flow1.getVersion(), execFlow1.getVersion());
}
+ @Test
+ public void testFetchRecentlyFinishedFlows() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+
+ ExecutorLoader loader = createLoader();
+ ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ loader.uploadExecutableFlow(flow1);
+ flow1.setStatus(Status.SUCCEEDED);
+ flow1.setEndTime(DateTimeUtils.currentTimeMillis());
+ loader.updateExecutableFlow(flow1);
+ //Flow just finished. Fetch recently finished flows immediately. Should get it.
+ List<ExecutableFlow> flows = loader.fetchRecentlyFinishedFlows(
+ RECENTLY_FINISHED_LIFETIME);
+ Assert.assertEquals(1, flows.size());
+ Assert.assertEquals(flow1.getExecutionId(), flows.get(0).getExecutionId());
+ Assert.assertEquals(flow1.getProjectName(), flows.get(0).getProjectName());
+ Assert.assertEquals(flow1.getFlowId(), flows.get(0).getFlowId());
+ Assert.assertEquals(flow1.getVersion(), flows.get(0).getVersion());
+ }
+
+ @Test
+ public void testFetchEmptyRecentlyFinishedFlows() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+
+ ExecutorLoader loader = createLoader();
+ ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ loader.uploadExecutableFlow(flow1);
+ flow1.setStatus(Status.SUCCEEDED);
+ flow1.setEndTime(DateTimeUtils.currentTimeMillis());
+ loader.updateExecutableFlow(flow1);
+ //Todo jamiesjc: use java8.java.time api instead of jodatime
+ //Mock flow finished time to be 2 min ago.
+ DateTimeUtils.setCurrentMillisOffset(-FLOW_FINISHED_TIME.toMillis());
+ flow1.setEndTime(DateTimeUtils.currentTimeMillis());
+ loader.updateExecutableFlow(flow1);
+ //Fetch recently finished flows within 1 min. Should be empty.
+ List<ExecutableFlow> flows = loader
+ .fetchRecentlyFinishedFlows(RECENTLY_FINISHED_LIFETIME);
+ Assert.assertTrue(flows.isEmpty());
+ }
+
@Ignore @Test
public void testSmallUploadLog() throws ExecutorManagerException {
File logDir = new File(UNIT_BASE_DIR + "logtest");
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 1d320c5..adadb2d 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
@@ -410,4 +411,10 @@ public class MockExecutorLoader implements ExecutorLoader {
public void unassignExecutor(final int executionId) throws ExecutorManagerException {
this.executionExecutorMapping.remove(executionId);
}
+
+ @Override
+ public List<ExecutableFlow> fetchRecentlyFinishedFlows(Duration maxAge)
+ throws ExecutorManagerException {
+ return new ArrayList<>();
+ }
}