killbill-memoizeit

Details

diff --git a/payment/src/main/java/org/killbill/billing/payment/core/Janitor.java b/payment/src/main/java/org/killbill/billing/payment/core/Janitor.java
index aa9a97c..f8b3748 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/Janitor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/Janitor.java
@@ -17,6 +17,7 @@
 
 package org.killbill.billing.payment.core;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
@@ -56,6 +57,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 
 /**
@@ -65,6 +67,8 @@ public class Janitor {
 
     private final static Logger log = LoggerFactory.getLogger(Janitor.class);
 
+    private final static int TERMINATION_TIMEOUT_SEC = 5;
+
     private final ScheduledExecutorService janitorExecutor;
     private final AccountInternalApi accountInternalApi;
     private final PaymentDao paymentDao;
@@ -110,15 +114,20 @@ public class Janitor {
         final TimeUnit attemptCompletionRateUnit = paymentConfig.getJanitorRunningRate().getUnit();
         final long attemptCompletionPeriod = paymentConfig.getJanitorRunningRate().getPeriod();
         janitorExecutor.scheduleAtFixedRate(new AttemptCompletionTask(), attemptCompletionPeriod, attemptCompletionPeriod, attemptCompletionRateUnit);
-        this.shutdownLatch = new CountDownLatch(2);
     }
 
     public void stop() {
         isStopped = true;
         try {
-            boolean res = shutdownLatch.await(5, TimeUnit.SECONDS);
-            if (!res) {
-                log.warn("Janitor stop sequence timed out : did not complete in 5 sec");
+            /* Previously submitted tasks will be executed with shutdown(); when task executes as a result of shutdown being called
+             * or because it was already in its execution loop, it will check for the volatile boolean isStopped flag and
+             * return immediately.
+             * Then, awaitTermination with a timeout is required to ensure tasks completed.
+             */
+            janitorExecutor.shutdown();
+            boolean success = janitorExecutor.awaitTermination(TERMINATION_TIMEOUT_SEC, TimeUnit.SECONDS);
+            if (!success) {
+                log.warn("Janitor failed to complete termination within " + TERMINATION_TIMEOUT_SEC + "sec");
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -126,79 +135,90 @@ public class Janitor {
         }
     }
 
-    /**
-     * Task to find old PENDING transactions and move them into
-     */
-    private final class PendingTransactionTask implements Runnable {
+    protected abstract class CompletionTaskBase<T> implements Runnable {
 
-        private final InternalCallContext fakeCallContext;
+        private final String taskName;
+        protected final InternalCallContext fakeCallContext;
 
-        private PendingTransactionTask() {
-            this.fakeCallContext = internalCallContextFactory.createInternalCallContext((Long) null, (Long) null, "PendingJanitorTask", CallOrigin.INTERNAL, UserType.SYSTEM, UUID.randomUUID());
+        protected CompletionTaskBase() {
+            this.taskName = this.getClass().getName();
+            this.fakeCallContext = internalCallContextFactory.createInternalCallContext((Long) null, (Long) null, taskName, CallOrigin.INTERNAL, UserType.SYSTEM, UUID.randomUUID());
         }
 
         @Override
         public void run() {
 
             if (isStopped) {
-                shutdownLatch.countDown();
+                log.info("Janitor Task " + taskName + " was requested to stop");
                 return;
             }
-            log.info("PendingTransactionTask start run ");
-
-            int result = paymentDao.failOldPendingTransactions(TransactionStatus.PLUGIN_FAILURE, getCreatedDateBefore(), fakeCallContext);
-            if (result > 0) {
-                log.info("PendingTransactionTask moved " + result + " PENDING payments ->  PLUGIN_FAILURE");
+            final List<T> items = getItemsForIteration();
+            for (T item : items) {
+                if (isStopped) {
+                    log.info("Janitor Task " + taskName + " was requested to stop");
+                    return;
+                }
+                doIteration(item);
             }
         }
 
-        private DateTime getCreatedDateBefore() {
+        public abstract List<T> getItemsForIteration();
+
+        public abstract void doIteration(final T item);
+
+        protected DateTime getCreatedDateBefore() {
             final long delayBeforeNowMs = paymentConfig.getJanitorPendingCleanupTime().getMillis();
             return clock.getUTCNow().minusMillis((int) delayBeforeNowMs);
         }
     }
 
     /**
-     * Task to complete 'partially' incomplete attempts
-     * <p/>
-     * If the state of the transaction associated with the attempt completed, but the attempt state machine did not,
-     * we rerun the retry state machine to complete the call and transition the attempt into a terminal state.
+     * Task to find old PENDING transactions and move them into
      */
-    private final class AttemptCompletionTask implements Runnable {
+    private final class PendingTransactionTask extends CompletionTaskBase<Integer> {
 
-        private final InternalCallContext fakeCallContext;
+        private final List<Integer> itemsForIterations;
 
-        private AttemptCompletionTask() {
-            this.fakeCallContext = internalCallContextFactory.createInternalCallContext((Long) null, (Long) null, "PendingJanitorTask", CallOrigin.INTERNAL, UserType.SYSTEM, UUID.randomUUID());
+        private PendingTransactionTask() {
+            super();
+            this.itemsForIterations = ImmutableList.of(new Integer(1));
         }
 
         @Override
-        public void run() {
+        public List<Integer> getItemsForIteration() {
+            return itemsForIterations;
+        }
 
-            if (isStopped) {
-                shutdownLatch.countDown();
-                return;
+        @Override
+        public void doIteration(final Integer item) {
+            int result = paymentDao.failOldPendingTransactions(TransactionStatus.PLUGIN_FAILURE, getCreatedDateBefore(), fakeCallContext);
+            if (result > 0) {
+                log.info("Janitor PendingTransactionTask moved " + result + " PENDING payments ->  PLUGIN_FAILURE");
             }
+        }
+    }
 
-            final List<PaymentAttemptModelDao> incompleteAttempts = paymentDao.getPaymentAttemptsByState(retrySMHelper.getInitialState().getName(), getCreatedDateBefore(), fakeCallContext);
-            log.info("AttemptCompletionTask start run : found " + incompleteAttempts.size() + " incomplete attempts");
+    /**
+     * Task to complete 'partially' incomplete attempts
+     * <p/>
+     * If the state of the transaction associated with the attempt completed, but the attempt state machine did not,
+     * we rerun the retry state machine to complete the call and transition the attempt into a terminal state.
+     */
+    private final class AttemptCompletionTask extends CompletionTaskBase<PaymentAttemptModelDao> {
 
-            for (PaymentAttemptModelDao cur : incompleteAttempts) {
-                if (isStopped) {
-                    shutdownLatch.countDown();
-                    return;
-                }
-                complete(cur);
-            }
+        private AttemptCompletionTask() {
+            super();
         }
 
-        private DateTime getCreatedDateBefore() {
-            final long delayBeforeNowMs = paymentConfig.getJanitorAttemptCompletionTime().getMillis();
-            return clock.getUTCNow().minusMillis((int) delayBeforeNowMs);
+        @Override
+        public List<PaymentAttemptModelDao> getItemsForIteration() {
+            final List<PaymentAttemptModelDao> incompleteAttempts = paymentDao.getPaymentAttemptsByState(retrySMHelper.getInitialState().getName(), getCreatedDateBefore(), fakeCallContext);
+            log.info("Janitor AttemptCompletionTask start run : found " + incompleteAttempts.size() + " incomplete attempts");
+            return incompleteAttempts;
         }
 
-        private void complete(final PaymentAttemptModelDao attempt) {
-
+        @Override
+        public void doIteration(final PaymentAttemptModelDao attempt) {
             // STEPH seems a bit insane??
             final InternalTenantContext tenantContext = internalCallContextFactory.createInternalTenantContext(attempt.getAccountId(), attempt.getId(), ObjectType.PAYMENT_ATTEMPT);
             final UUID tenantId = nonEntityDao.retrieveIdFromObject(tenantContext.getTenantRecordId(), ObjectType.TENANT);
@@ -215,30 +235,29 @@ public class Janitor {
             }).orNull();
 
             if (transaction == null) {
-                log.info("AttemptCompletionTask moving attempt " + attempt.getId() + " -> ABORTED");
+                log.info("Janitor AttemptCompletionTask moving attempt " + attempt.getId() + " -> ABORTED");
                 paymentDao.updatePaymentAttempt(attempt.getId(), attempt.getTransactionId(), "ABORTED", internalCallContext);
                 return;
             }
 
             try {
-
-                log.info("AttemptCompletionTask completing attempt " + attempt.getId() + " -> SUCCESS");
+                log.info("Janitor AttemptCompletionTask completing attempt " + attempt.getId() + " -> SUCCESS");
 
                 final Account account = accountInternalApi.getAccountById(attempt.getAccountId(), tenantContext);
                 final boolean isApiPayment = true; // unclear
                 final RetryablePaymentStateContext paymentStateContext = new RetryablePaymentStateContext(attempt.getPluginName(),
-                                                                                                                      isApiPayment,
-                                                                                                                      transaction.getPaymentId(),
-                                                                                                                      attempt.getPaymentExternalKey(),
-                                                                                                                      transaction.getTransactionExternalKey(),
-                                                                                                                      transaction.getTransactionType(),
-                                                                                                                      account,
-                                                                                                                      attempt.getPaymentMethodId(),
-                                                                                                                      transaction.getAmount(),
-                                                                                                                      transaction.getCurrency(),
-                                                                                                                      PluginPropertySerializer.deserialize(attempt.getPluginProperties()),
-                                                                                                                      internalCallContext,
-                                                                                                                      callContext);
+                                                                                                          isApiPayment,
+                                                                                                          transaction.getPaymentId(),
+                                                                                                          attempt.getPaymentExternalKey(),
+                                                                                                          transaction.getTransactionExternalKey(),
+                                                                                                          transaction.getTransactionType(),
+                                                                                                          account,
+                                                                                                          attempt.getPaymentMethodId(),
+                                                                                                          transaction.getAmount(),
+                                                                                                          transaction.getCurrency(),
+                                                                                                          PluginPropertySerializer.deserialize(attempt.getPluginProperties()),
+                                                                                                          internalCallContext,
+                                                                                                          callContext);
 
                 paymentStateContext.setAttemptId(attempt.getId()); // Normally set by leavingState Callback
                 paymentStateContext.setPaymentTransactionModelDao(transaction); // Normally set by raw state machine
@@ -247,13 +266,12 @@ public class Janitor {
                 // to the PaymentControlPluginApi plugin and transition the state.
                 //
                 pluginControlledPaymentAutomatonRunner.completeRun(paymentStateContext);
-
             } catch (AccountApiException e) {
-                log.warn("Failed to complete payment attempt " + attempt.getId(), e);
+                log.warn("Janitor AttemptCompletionTask failed to complete payment attempt " + attempt.getId(), e);
             } catch (PluginPropertySerializerException e) {
-                log.warn("Failed to complete payment attempt " + attempt.getId(), e);
+                log.warn("Janitor AttemptCompletionTask failed to complete payment attempt " + attempt.getId(), e);
             } catch (PaymentApiException e) {
-                log.warn("Failed to complete payment attempt " + attempt.getId(), e);
+                log.warn("Janitor AttemptCompletionTask failed to complete payment attempt " + attempt.getId(), e);
             }
         }
     }
diff --git a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
index 39b5d50..422ccfd 100644
--- a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
+++ b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
@@ -39,15 +39,20 @@ import org.killbill.billing.payment.api.TransactionType;
 import org.killbill.billing.payment.control.InvoicePaymentControlPluginApi;
 import org.killbill.billing.payment.core.Janitor;
 import org.killbill.billing.payment.dao.PaymentAttemptModelDao;
+import org.killbill.billing.payment.glue.TestPaymentModuleWithEmbeddedDB;
 import org.killbill.billing.payment.provider.MockPaymentProviderPlugin;
 import org.killbill.billing.platform.api.KillbillConfigSource;
 import org.killbill.bus.api.PersistentBus.EventBusException;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.inject.Guice;
 import com.google.inject.Inject;
+import com.google.inject.Injector;
 
 import static org.testng.Assert.assertEquals;
 
@@ -80,17 +85,27 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
                               );
     }
 
+    @BeforeClass(groups = "slow")
+    protected void beforeClass() throws Exception {
+        super.beforeClass();
+        janitor.start();
+    }
+
+    @AfterClass(groups = "slow")
+    protected void afterClass() throws Exception {
+        janitor.stop();
+    }
+
+
     @BeforeMethod(groups = "slow")
     public void beforeMethod() throws Exception {
         super.beforeMethod();
-        janitor.start();
         account = testHelper.createTestAccount("bobo@gmail.com", true);
     }
 
     @AfterMethod(groups = "slow")
     public void afterMethod() throws Exception {
         super.afterMethod();
-        janitor.stop();
     }
 
     @Test(groups = "slow")