azkaban-aplcache
Changes
azkaban-web-server/src/main/java/azkaban/flowtrigger/database/FlowTriggerInstanceLoader.java 7(+7 -0)
azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java 39(+38 -1)
Details
diff --git a/azkaban-db/src/main/sql/create.execution_dependencies.sql b/azkaban-db/src/main/sql/create.execution_dependencies.sql
index adc966b..7d8a097 100644
--- a/azkaban-db/src/main/sql/create.execution_dependencies.sql
+++ b/azkaban-db/src/main/sql/create.execution_dependencies.sql
@@ -12,4 +12,7 @@ CREATE TABLE execution_dependencies(
flow_version INT not null,
flow_exec_id INT not null,
primary key(trigger_instance_id, dep_name)
-);
\ No newline at end of file
+);
+
+-- Index on the endtime column. Presumably added to speed up endtime-based lookups such as the
+-- purge of old trigger executions introduced in this change (TODO: confirm with a query plan).
+CREATE INDEX ex_end_time
+ ON execution_dependencies (endtime);
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/FlowTriggerInstanceLoader.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/FlowTriggerInstanceLoader.java
index 81294d4..fc83b26 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/FlowTriggerInstanceLoader.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/FlowTriggerInstanceLoader.java
@@ -63,4 +63,11 @@ public interface FlowTriggerInstanceLoader {
Collection<TriggerInstance> getTriggerInstances(int projectId, String flowId, int from, int
length);
+
+ /**
+ * Delete cancelled or succeeded trigger instances whose endtime is older than the timestamp.
+ *
+ * @param timestamp cutoff in epoch milliseconds; instances that ended at or before it qualify
+ * @return number of deleted rows (dependency instances)
+ */
+ int deleteTriggerExecutionsFinishingOlderThan(long timestamp);
}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
index 7a3d1cc..8144cc1 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
@@ -38,6 +38,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -128,6 +129,16 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
+ "LIMIT ? OFFSET ?) AS tmp);", StringUtils.join(DEPENDENCY_EXECUTIONS_COLUMNS, ","),
DEPENDENCY_EXECUTION_TABLE, DEPENDENCY_EXECUTION_TABLE);
+ // Selects the dependency rows of every trigger instance that has at least one dependency row
+ // whose endtime is non-zero and <= the bound timestamp (presumably endtime 0 marks a
+ // dependency that has not finished yet - confirm). The WHERE clause does not look at status;
+ // deleteTriggerExecutionsFinishingOlderThan applies the status check in Java.
+ private static final String SELECT_EXECUTION_OLDER_THAN =
+ String.format(
+ "SELECT %s FROM %s WHERE trigger_instance_id IN (SELECT "
+ + "DISTINCT(trigger_instance_id) FROM %s WHERE endtime <= ? AND endtime != 0);",
+ StringUtils.join(DEPENDENCY_EXECUTIONS_COLUMNS, ","), DEPENDENCY_EXECUTION_TABLE,
+ DEPENDENCY_EXECUTION_TABLE);
+
+ // Template for deleting all dependency rows of a set of trigger instances. The lone '?' is a
+ // template slot: the caller expands it textually into the list for the IN clause before
+ // executing the statement, rather than binding it as one JDBC parameter.
+ private static final String DELETE_EXECUTIONS =
+ String.format("DELETE FROM %s WHERE trigger_instance_id IN (?);", DEPENDENCY_EXECUTION_TABLE);
+
private static final String UPDATE_DEPENDENCY_FLOW_EXEC_ID = String.format("UPDATE %s SET "
+ "flow_exec_id "
+ "= ? WHERE trigger_instance_id = ? AND dep_name = ? ;", DEPENDENCY_EXECUTION_TABLE);
@@ -294,7 +305,6 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
try {
//todo chengren311:
// 1. add index for the execution_dependencies table to accelerate selection.
- // 2. implement purging mechanism to keep reasonable amount of historical executions in db.
return this.dbOperator.query(SELECT_ALL_RUNNING_EXECUTIONS, new TriggerInstanceHandler
(SORT_MODE.SORT_ON_START_TIME_ASC));
} catch (final SQLException ex) {
@@ -378,6 +388,33 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
return Collections.emptyList();
}
+  /**
+   * Deletes the dependency rows of finished trigger instances that ended at or before the given
+   * timestamp. An instance is purged only when it is CANCELLED, or SUCCEEDED with a flow
+   * execution id other than -1.
+   *
+   * @param timestamp cutoff compared against each instance's end time
+   * @return number of rows reported as deleted by the DELETE statement; 0 when no instance
+   *     qualified, or when handleSQLException returns normally after a SQL failure
+   */
+  @Override
+  public int deleteTriggerExecutionsFinishingOlderThan(final long timestamp) {
+    try {
+      final Collection<TriggerInstance> candidates = this.dbOperator
+          .query(SELECT_EXECUTION_OLDER_THAN,
+              new TriggerInstanceHandler(SORT_MODE.SORT_ON_START_TIME_DESC), timestamp);
+
+      final Set<String> toBeDeleted = new HashSet<>();
+      for (final TriggerInstance inst : candidates) {
+        // NOTE(review): a SUCCEEDED instance with flow exec id -1 is kept; presumably -1 means
+        // the flow execution has not been recorded yet - confirm.
+        final boolean finished = inst.getStatus() == Status.CANCELLED
+            || (inst.getStatus() == Status.SUCCEEDED && inst.getFlowExecId() != -1);
+        // The query matches on a single dependency's endtime, so the end time of the whole
+        // instance is checked again here.
+        if (finished && inst.getEndTime() <= timestamp) {
+          toBeDeleted.add(inst.getId());
+        }
+      }
+
+      int numDeleted = 0;
+      if (!toBeDeleted.isEmpty()) {
+        // Expand the template's single '?' into one bind marker per id and pass the ids as
+        // bound parameters instead of splicing quoted values into the SQL text.
+        final String placeholders =
+            String.join(", ", Collections.nCopies(toBeDeleted.size(), "?"));
+        numDeleted = this.dbOperator
+            .update(DELETE_EXECUTIONS.replace("?", placeholders), toBeDeleted.toArray());
+      }
+      logger.info("{} dependency instance record(s) deleted", numDeleted);
+      return numDeleted;
+    } catch (final SQLException ex) {
+      handleSQLException(ex);
+      return 0;
+    }
+  }
+
/**
* Retrieve a trigger instance given an instance id. Flow trigger properties will also be
* populated into the returned trigger instance.
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerExecutionCleaner.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerExecutionCleaner.java
new file mode 100644
index 0000000..b39af1f
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerExecutionCleaner.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger;
+
+import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
+import java.time.Duration;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import javax.inject.Inject;
+
+/**
+ * This is to purge old flow trigger execution records from the db table.
+ * Otherwise the table will keep growing indefinitely as triggers are executed, leading to
+ * excessive query time on the table.
+ * The cleanup policy is removing trigger instances that finished more than
+ * {@code RETENTION_PERIOD} (10 days) ago; the purge is attempted every {@code CLEAN_INTERVAL}
+ * (10 minutes), starting immediately when {@link #start()} is called.
+ */
+@SuppressWarnings("FutureReturnValueIgnored")
+public class FlowTriggerExecutionCleaner {
+
+  private static final Duration CLEAN_INTERVAL = Duration.ofMinutes(10);
+  private static final Duration RETENTION_PERIOD = Duration.ofDays(10);
+  private final ScheduledExecutorService scheduler;
+  private final FlowTriggerInstanceLoader flowTriggerInstanceLoader;
+
+  /**
+   * @param loader loader used to delete the expired trigger execution records
+   */
+  @Inject
+  public FlowTriggerExecutionCleaner(final FlowTriggerInstanceLoader loader) {
+    this.flowTriggerInstanceLoader = loader;
+    this.scheduler = Executors.newSingleThreadScheduledExecutor();
+  }
+
+  /**
+   * Schedules the purge: first run with no delay, then at a fixed rate of CLEAN_INTERVAL.
+   */
+  public void start() {
+    this.scheduler.scheduleAtFixedRate(this::purgeExpiredExecutions, 0,
+        CLEAN_INTERVAL.getSeconds(), TimeUnit.SECONDS);
+  }
+
+  /**
+   * Runs one purge pass, deleting executions that finished before now minus RETENTION_PERIOD.
+   */
+  private void purgeExpiredExecutions() {
+    try {
+      final long cutoff = System.currentTimeMillis() - RETENTION_PERIOD.toMillis();
+      this.flowTriggerInstanceLoader.deleteTriggerExecutionsFinishingOlderThan(cutoff);
+    } catch (final RuntimeException ex) {
+      // scheduleAtFixedRate suppresses every subsequent run once one run throws, so a single
+      // failed purge must not escape. NOTE(review): the loader may surface database errors as
+      // unchecked exceptions - confirm. No logger is in scope in this class, so the failure is
+      // reported through the thread's uncaught-exception handler instead of being swallowed.
+      final Thread current = Thread.currentThread();
+      current.getUncaughtExceptionHandler().uncaughtException(current, ex);
+    }
+  }
+
+  /**
+   * Stops the scheduler; a purge that is currently running is interrupted.
+   */
+  public void shutdown() {
+    this.scheduler.shutdown();
+    this.scheduler.shutdownNow();
+  }
+}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
index 3ff5f3f..2b43657 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
@@ -83,11 +83,13 @@ public class FlowTriggerService {
private final TriggerInstanceProcessor triggerProcessor;
private final FlowTriggerInstanceLoader flowTriggerInstanceLoader;
private final DependencyInstanceProcessor dependencyProcessor;
+ private final FlowTriggerExecutionCleaner cleaner;
@Inject
public FlowTriggerService(final FlowTriggerDependencyPluginManager pluginManager,
final TriggerInstanceProcessor triggerProcessor, final DependencyInstanceProcessor
- dependencyProcessor, final FlowTriggerInstanceLoader flowTriggerInstanceLoader) {
+ dependencyProcessor, final FlowTriggerInstanceLoader flowTriggerInstanceLoader,
+ final FlowTriggerExecutionCleaner cleaner) {
// Give the thread a name to make debugging easier.
final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("FlowTrigger-service").build();
@@ -100,11 +102,13 @@ public class FlowTriggerService {
this.triggerProcessor = triggerProcessor;
this.dependencyProcessor = dependencyProcessor;
this.flowTriggerInstanceLoader = flowTriggerInstanceLoader;
+ this.cleaner = cleaner;
}
public void start() throws FlowTriggerDependencyPluginException {
this.triggerPluginManager.loadAllPlugins();
this.recoverIncompleteTriggerInstances();
+ this.cleaner.start();
}
private DependencyInstanceContext createDepContext(final FlowTriggerDependency dep, final long
@@ -541,8 +545,10 @@ public class FlowTriggerService {
this.singleThreadExecutorService.shutdownNow(); // Cancel currently executing tasks
this.multiThreadsExecutorService.shutdown();
this.multiThreadsExecutorService.shutdownNow();
+
this.triggerProcessor.shutdown();
this.triggerPluginManager.shutdown();
+ this.cleaner.shutdown();
}
public Collection<TriggerInstance> getTriggerInstances(final int projectId, final String flowId,
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java
index f51e1e9..af30d9f 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java
@@ -259,6 +259,4 @@ public class FlowTriggerDependencyPluginManager {
}
}
}
-
-
}
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
index 5b00576..03fbf00 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
@@ -303,7 +303,7 @@ public class FlowTriggerInstanceLoaderTest {
finalizeTriggerInstanceWithCancelling(all.get(i));
}
//sleep for a while to ensure endtime is different for each trigger instance
- Thread.sleep(1000);
+ Thread.sleep(100);
}
this.shuffleAndUpload(all);
@@ -393,6 +393,47 @@ public class FlowTriggerInstanceLoaderTest {
}
@Test
+ // Verifies deleteTriggerExecutionsFinishingOlderThan against 30 trigger instances:
+ // i = 0..4 finalized "with success" and flow exec id -1,
+ // i = 5..15 finalized "with cancelled",
+ // i = 16..25 finalized "with cancelling",
+ // i = 26..27 finalized "with success" and flow exec id 1000,
+ // i = 28..29 left unfinalized.
+ // (What each finalize helper writes is defined outside this hunk.)
+ public void testDeleteOldTriggerInstances() throws InterruptedException {
+ final List<TriggerInstance> all = new ArrayList<>();
+ // ts1 is taken before any instance is created or finalized.
+ final long ts1 = System.currentTimeMillis();
+ long ts2 = -1;
+ long ts3 = -1;
+ for (int i = 0; i < 30; i++) {
+ all.add(this.createTriggerInstance(this.flowTrigger, this
+ .flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis()
+ + i * 10000));
+
+ if (i < 5) {
+ finalizeTriggerInstanceWithSuccess(all.get(i), -1);
+ } else if (i <= 15) {
+ finalizeTriggerInstanceWithCancelled(all.get(i));
+ } else if (i <= 25) {
+ finalizeTriggerInstanceWithCancelling(all.get(i));
+ } else if (i <= 27) {
+ finalizeTriggerInstanceWithSuccess(all.get(i), 1000);
+ }
+ //capture the cut-off timestamps right after instance 3 and instance 12 are finalized
+ if (i == 3) {
+ ts2 = System.currentTimeMillis();
+ } else if (i == 12) {
+ ts3 = System.currentTimeMillis();
+ }
+ //sleep for a while to ensure end time is different for each trigger instance
+ Thread.sleep(100);
+ }
+ this.shuffleAndUpload(all);
+
+ // ts1: expected 0 - presumably no instance has an end time this early.
+ assertThat(this.triggerInstLoader.deleteTriggerExecutionsFinishingOlderThan(ts1))
+ .isEqualTo(0);
+
+ // ts2: only i = 0..3 were finalized by then, all with flow exec id -1; expected 0 deletions.
+ assertThat(this.triggerInstLoader.deleteTriggerExecutionsFinishingOlderThan(ts2))
+ .isEqualTo(0);
+
+ // ts3: the "cancelled" instances i = 5..12 (8 instances) were finalized by then. The expected
+ // 16 implies two dependency rows per instance - confirm against createTriggerInstance.
+ assertThat(this.triggerInstLoader.deleteTriggerExecutionsFinishingOlderThan(ts3))
+ .isEqualTo(16);
+
+ }
+
+ @Test
public void testGetRecentlyFinished() throws InterruptedException {
final List<TriggerInstance> all = new ArrayList<>();
@@ -408,7 +449,7 @@ public class FlowTriggerInstanceLoaderTest {
finalizeTriggerInstanceWithCancelling(all.get(i));
}
//sleep for a while to ensure endtime is different for each trigger instance
- Thread.sleep(1000);
+ Thread.sleep(100);
}
this.shuffleAndUpload(all);
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
index 0f172ed..2b7d588 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
@@ -77,8 +77,11 @@ public class FlowTriggerServiceTest {
final DependencyInstanceProcessor depInstProcessor = new DependencyInstanceProcessor
(flowTriggerInstanceLoader);
- flowTriggerService = new FlowTriggerService(pluginManager,
- triggerInstProcessor, depInstProcessor, flowTriggerInstanceLoader);
+ final FlowTriggerExecutionCleaner executionCleaner = new FlowTriggerExecutionCleaner(
+ flowTriggerInstanceLoader);
+
+ flowTriggerService = new FlowTriggerService(pluginManager, triggerInstProcessor,
+ depInstProcessor, flowTriggerInstanceLoader, executionCleaner);
flowTriggerService.start();
}
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java
index 2c99af7..3a2aca7 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java
@@ -20,6 +20,7 @@ import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
public class MockFlowTriggerInstanceLoader implements FlowTriggerInstanceLoader {
@@ -121,4 +122,19 @@ public class MockFlowTriggerInstanceLoader implements FlowTriggerInstanceLoader
final int from, final int length) {
throw new UnsupportedOperationException("Not Yet Implemented");
}
+
+  /**
+   * In-memory purge: removes every held trigger instance whose end time is at or before the
+   * timestamp and which is either CANCELLED, or SUCCEEDED with a flow execution id other
+   * than -1.
+   *
+   * @param timestamp cutoff compared against each instance's end time
+   * @return number of trigger instances removed (one per instance, not per dependency row)
+   */
+  @Override
+  public int deleteTriggerExecutionsFinishingOlderThan(final long timestamp) {
+    int removedCount = 0;
+    final Iterator<TriggerInstance> cursor = this.triggerInstances.iterator();
+    while (cursor.hasNext()) {
+      final TriggerInstance candidate = cursor.next();
+      // Instances that ended after the cutoff are always kept.
+      if (!(candidate.getEndTime() <= timestamp)) {
+        continue;
+      }
+      final boolean cancelled = candidate.getStatus() == Status.CANCELLED;
+      final boolean succeededWithFlow = !cancelled
+          && candidate.getStatus() == Status.SUCCEEDED
+          && candidate.getFlowExecId() != -1;
+      if (cancelled || succeededWithFlow) {
+        // Remove through the iterator so the collection can be modified while iterating.
+        cursor.remove();
+        removedCount++;
+      }
+    }
+    return removedCount;
+  }
}