azkaban-aplcache

clean up old trigger executions from db table (#1746)

4/30/2018 7:03:41 PM

Details

diff --git a/azkaban-db/src/main/sql/create.execution_dependencies.sql b/azkaban-db/src/main/sql/create.execution_dependencies.sql
index adc966b..7d8a097 100644
--- a/azkaban-db/src/main/sql/create.execution_dependencies.sql
+++ b/azkaban-db/src/main/sql/create.execution_dependencies.sql
@@ -12,4 +12,7 @@ CREATE TABLE execution_dependencies(
   flow_version INT not null,
   flow_exec_id INT not null,
   primary key(trigger_instance_id, dep_name)
-);
\ No newline at end of file
+);
+
+CREATE INDEX ex_end_time
+  ON execution_dependencies (endtime);
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/FlowTriggerInstanceLoader.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/FlowTriggerInstanceLoader.java
index 81294d4..fc83b26 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/FlowTriggerInstanceLoader.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/FlowTriggerInstanceLoader.java
@@ -63,4 +63,11 @@ public interface FlowTriggerInstanceLoader {
 
   Collection<TriggerInstance> getTriggerInstances(int projectId, String flowId, int from, int
       length);
+
+  /**
+   * Delete trigger instances whose endtime is not later than the timestamp and which are either
+   * cancelled, or succeeded with a flow exec id other than -1.
+   * @return number of deleted rows (dependency instances)
+   */
+  int deleteTriggerExecutionsFinishingOlderThan(long timestamp);
 }
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
index 7a3d1cc..8144cc1 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
@@ -38,6 +38,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -128,6 +129,16 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
           + "LIMIT ? OFFSET ?) AS tmp);", StringUtils.join(DEPENDENCY_EXECUTIONS_COLUMNS, ","),
       DEPENDENCY_EXECUTION_TABLE, DEPENDENCY_EXECUTION_TABLE);
 
+  private static final String SELECT_EXECUTION_OLDER_THAN =
+      String.format(
+          "SELECT %s FROM %s WHERE trigger_instance_id IN (SELECT "
+              + "DISTINCT(trigger_instance_id) FROM %s WHERE endtime <= ? AND endtime != 0);",
+          StringUtils.join(DEPENDENCY_EXECUTIONS_COLUMNS, ","), DEPENDENCY_EXECUTION_TABLE,
+          DEPENDENCY_EXECUTION_TABLE);
+
+  private static final String DELETE_EXECUTIONS =
+      String.format("DELETE FROM %s WHERE trigger_instance_id IN (?);", DEPENDENCY_EXECUTION_TABLE);
+
   private static final String UPDATE_DEPENDENCY_FLOW_EXEC_ID = String.format("UPDATE %s SET "
       + "flow_exec_id "
       + "= ? WHERE trigger_instance_id = ? AND dep_name = ? ;", DEPENDENCY_EXECUTION_TABLE);
@@ -294,7 +305,6 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
     try {
       //todo chengren311:
       // 1. add index for the execution_dependencies table to accelerate selection.
-      // 2. implement purging mechanism to keep reasonable amount of historical executions in db.
       return this.dbOperator.query(SELECT_ALL_RUNNING_EXECUTIONS, new TriggerInstanceHandler
           (SORT_MODE.SORT_ON_START_TIME_ASC));
     } catch (final SQLException ex) {
@@ -378,6 +388,33 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
     return Collections.emptyList();
   }
 
+  @Override
+  public int deleteTriggerExecutionsFinishingOlderThan(final long timestamp) {
+    try {
+      final Collection<TriggerInstance> res = this.dbOperator
+          .query(SELECT_EXECUTION_OLDER_THAN,
+              new TriggerInstanceHandler(SORT_MODE.SORT_ON_START_TIME_DESC), timestamp);
+      final Set<String> toBeDeleted = new HashSet<>();
+      for (final TriggerInstance inst : res) {
+        if ((inst.getStatus() == Status.CANCELLED || (inst.getStatus() == Status.SUCCEEDED && inst
+            .getFlowExecId() != -1)) && inst.getEndTime() <= timestamp) {
+          toBeDeleted.add(inst.getId());
+        }
+      }
+      int numDeleted = 0;
+      if (!toBeDeleted.isEmpty()) {
+        final String ids = toBeDeleted.stream().map(s -> "'" + s + "'")
+            .collect(Collectors.joining(", "));
+        numDeleted = this.dbOperator.update(DELETE_EXECUTIONS.replace("?", ids));
+      }
+      logger.info("{} dependency instance record(s) deleted", numDeleted);
+      return numDeleted;
+    } catch (final SQLException ex) {
+      handleSQLException(ex);
+      return 0;
+    }
+  }
+
   /**
    * Retrieve a trigger instance given an instance id. Flow trigger properties will also be
    * populated into the returned trigger instance.
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerExecutionCleaner.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerExecutionCleaner.java
new file mode 100644
index 0000000..b39af1f
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerExecutionCleaner.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger;
+
+import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
+import java.time.Duration;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import javax.inject.Inject;
+
+/**
+ * This is to purge old flow trigger execution records from the db table.
+ * Otherwise the table will keep growing indefinitely as triggers are executed, leading to
+ * excessive query time on the table.
+ * The cleanup policy is to remove trigger instances that finished more than 10 days ago.
+ */
+@SuppressWarnings("FutureReturnValueIgnored")
+public class FlowTriggerExecutionCleaner {
+
+  private static final Duration CLEAN_INTERVAL = Duration.ofMinutes(10);
+  private static final Duration RETENTION_PERIOD = Duration.ofDays(10);
+  private final ScheduledExecutorService scheduler;
+  private final FlowTriggerInstanceLoader flowTriggerInstanceLoader;
+
+  @Inject
+  public FlowTriggerExecutionCleaner(final FlowTriggerInstanceLoader loader) {
+    this.flowTriggerInstanceLoader = loader;
+    this.scheduler = Executors.newSingleThreadScheduledExecutor();
+  }
+
+  public void start() {
+    this.scheduler.scheduleAtFixedRate(() -> {
+      FlowTriggerExecutionCleaner.this.flowTriggerInstanceLoader
+          .deleteTriggerExecutionsFinishingOlderThan(System
+              .currentTimeMillis() - RETENTION_PERIOD.toMillis());
+    }, 0, CLEAN_INTERVAL.getSeconds(), TimeUnit.SECONDS);
+  }
+
+  public void shutdown() {
+    this.scheduler.shutdown();
+    this.scheduler.shutdownNow();
+  }
+}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
index 3ff5f3f..2b43657 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
@@ -83,11 +83,13 @@ public class FlowTriggerService {
   private final TriggerInstanceProcessor triggerProcessor;
   private final FlowTriggerInstanceLoader flowTriggerInstanceLoader;
   private final DependencyInstanceProcessor dependencyProcessor;
+  private final FlowTriggerExecutionCleaner cleaner;
 
   @Inject
   public FlowTriggerService(final FlowTriggerDependencyPluginManager pluginManager,
       final TriggerInstanceProcessor triggerProcessor, final DependencyInstanceProcessor
-      dependencyProcessor, final FlowTriggerInstanceLoader flowTriggerInstanceLoader) {
+      dependencyProcessor, final FlowTriggerInstanceLoader flowTriggerInstanceLoader,
+      final FlowTriggerExecutionCleaner cleaner) {
     // Give the thread a name to make debugging easier.
     final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
         .setNameFormat("FlowTrigger-service").build();
@@ -100,11 +102,13 @@ public class FlowTriggerService {
     this.triggerProcessor = triggerProcessor;
     this.dependencyProcessor = dependencyProcessor;
     this.flowTriggerInstanceLoader = flowTriggerInstanceLoader;
+    this.cleaner = cleaner;
   }
 
   public void start() throws FlowTriggerDependencyPluginException {
     this.triggerPluginManager.loadAllPlugins();
     this.recoverIncompleteTriggerInstances();
+    this.cleaner.start();
   }
 
   private DependencyInstanceContext createDepContext(final FlowTriggerDependency dep, final long
@@ -541,8 +545,10 @@ public class FlowTriggerService {
     this.singleThreadExecutorService.shutdownNow(); // Cancel currently executing tasks
     this.multiThreadsExecutorService.shutdown();
     this.multiThreadsExecutorService.shutdownNow();
+
     this.triggerProcessor.shutdown();
     this.triggerPluginManager.shutdown();
+    this.cleaner.shutdown();
   }
 
   public Collection<TriggerInstance> getTriggerInstances(final int projectId, final String flowId,
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java
index f51e1e9..af30d9f 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java
@@ -259,6 +259,4 @@ public class FlowTriggerDependencyPluginManager {
       }
     }
   }
-
-
 }
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
index 5b00576..03fbf00 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
@@ -303,7 +303,7 @@ public class FlowTriggerInstanceLoaderTest {
         finalizeTriggerInstanceWithCancelling(all.get(i));
       }
       //sleep for a while to ensure endtime is different for each trigger instance
-      Thread.sleep(1000);
+      Thread.sleep(100);
     }
 
     this.shuffleAndUpload(all);
@@ -393,6 +393,47 @@ public class FlowTriggerInstanceLoaderTest {
   }
 
   @Test
+  public void testDeleteOldTriggerInstances() throws InterruptedException {
+    final List<TriggerInstance> all = new ArrayList<>();
+    final long ts1 = System.currentTimeMillis();
+    long ts2 = -1;
+    long ts3 = -1;
+    for (int i = 0; i < 30; i++) {
+      all.add(this.createTriggerInstance(this.flowTrigger, this
+          .flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis()
+          + i * 10000));
+
+      if (i < 5) {
+        finalizeTriggerInstanceWithSuccess(all.get(i), -1);
+      } else if (i <= 15) {
+        finalizeTriggerInstanceWithCancelled(all.get(i));
+      } else if (i <= 25) {
+        finalizeTriggerInstanceWithCancelling(all.get(i));
+      } else if (i <= 27) {
+        finalizeTriggerInstanceWithSuccess(all.get(i), 1000);
+      }
+      //capture cut-off timestamps, then sleep so each trigger instance has a distinct end time
+      if (i == 3) {
+        ts2 = System.currentTimeMillis();
+      } else if (i == 12) {
+        ts3 = System.currentTimeMillis();
+      }
+      Thread.sleep(100);
+    }
+    this.shuffleAndUpload(all);
+
+    assertThat(this.triggerInstLoader.deleteTriggerExecutionsFinishingOlderThan(ts1))
+        .isEqualTo(0);
+
+    assertThat(this.triggerInstLoader.deleteTriggerExecutionsFinishingOlderThan(ts2))
+        .isEqualTo(0);
+
+    assertThat(this.triggerInstLoader.deleteTriggerExecutionsFinishingOlderThan(ts3))
+        .isEqualTo(16);
+
+  }
+
+  @Test
   public void testGetRecentlyFinished() throws InterruptedException {
 
     final List<TriggerInstance> all = new ArrayList<>();
@@ -408,7 +449,7 @@ public class FlowTriggerInstanceLoaderTest {
         finalizeTriggerInstanceWithCancelling(all.get(i));
       }
       //sleep for a while to ensure endtime is different for each trigger instance
-      Thread.sleep(1000);
+      Thread.sleep(100);
     }
 
     this.shuffleAndUpload(all);
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
index 0f172ed..2b7d588 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
@@ -77,8 +77,11 @@ public class FlowTriggerServiceTest {
     final DependencyInstanceProcessor depInstProcessor = new DependencyInstanceProcessor
         (flowTriggerInstanceLoader);
 
-    flowTriggerService = new FlowTriggerService(pluginManager,
-        triggerInstProcessor, depInstProcessor, flowTriggerInstanceLoader);
+    final FlowTriggerExecutionCleaner executionCleaner = new FlowTriggerExecutionCleaner(
+        flowTriggerInstanceLoader);
+
+    flowTriggerService = new FlowTriggerService(pluginManager, triggerInstProcessor,
+        depInstProcessor, flowTriggerInstanceLoader, executionCleaner);
     flowTriggerService.start();
   }
 
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java
index 2c99af7..3a2aca7 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java
@@ -20,6 +20,7 @@ import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
 public class MockFlowTriggerInstanceLoader implements FlowTriggerInstanceLoader {
@@ -121,4 +122,19 @@ public class MockFlowTriggerInstanceLoader implements FlowTriggerInstanceLoader 
       final int from, final int length) {
     throw new UnsupportedOperationException("Not Yet Implemented");
   }
+
+  @Override
+  public int deleteTriggerExecutionsFinishingOlderThan(final long timestamp) {
+    int deleted = 0;
+    for (final Iterator<TriggerInstance> iterator = this.triggerInstances.iterator();
+        iterator.hasNext(); ) {
+      final TriggerInstance inst = iterator.next();
+      if ((inst.getEndTime() <= timestamp) && ((inst.getStatus() == Status.CANCELLED) || ((inst
+          .getStatus() == Status.SUCCEEDED) && (inst.getFlowExecId() != -1)))) {
+        iterator.remove();
+        deleted++;
+      }
+    }
+    return deleted;
+  }
 }