diff --git a/payment/src/main/java/org/killbill/billing/payment/core/Janitor.java b/payment/src/main/java/org/killbill/billing/payment/core/Janitor.java
index aa9a97c..f8b3748 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/Janitor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/Janitor.java
@@ -17,6 +17,7 @@
package org.killbill.billing.payment.core;
+import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@@ -56,6 +57,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
/**
@@ -65,6 +67,8 @@ public class Janitor {
private final static Logger log = LoggerFactory.getLogger(Janitor.class);
+ private final static int TERMINATION_TIMEOUT_SEC = 5;
+
private final ScheduledExecutorService janitorExecutor;
private final AccountInternalApi accountInternalApi;
private final PaymentDao paymentDao;
@@ -110,15 +114,20 @@ public class Janitor {
final TimeUnit attemptCompletionRateUnit = paymentConfig.getJanitorRunningRate().getUnit();
final long attemptCompletionPeriod = paymentConfig.getJanitorRunningRate().getPeriod();
janitorExecutor.scheduleAtFixedRate(new AttemptCompletionTask(), attemptCompletionPeriod, attemptCompletionPeriod, attemptCompletionRateUnit);
- this.shutdownLatch = new CountDownLatch(2);
}
public void stop() {
isStopped = true;
try {
- boolean res = shutdownLatch.await(5, TimeUnit.SECONDS);
- if (!res) {
- log.warn("Janitor stop sequence timed out : did not complete in 5 sec");
+ /* Previously submitted tasks will be executed with shutdown(); when task executes as a result of shutdown being called
+ * or because it was already in its execution loop, it will check for the volatile boolean isStopped flag and
+ * return immediately.
+ * Then, awaitTermination with a timeout is required to ensure tasks completed.
+ */
+ janitorExecutor.shutdown();
+ boolean success = janitorExecutor.awaitTermination(TERMINATION_TIMEOUT_SEC, TimeUnit.SECONDS);
+ if (!success) {
+ log.warn("Janitor failed to complete termination within " + TERMINATION_TIMEOUT_SEC + "sec");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -126,79 +135,90 @@ public class Janitor {
}
}
- /**
- * Task to find old PENDING transactions and move them into
- */
- private final class PendingTransactionTask implements Runnable {
+ protected abstract class CompletionTaskBase<T> implements Runnable {
- private final InternalCallContext fakeCallContext;
+ private final String taskName;
+ protected final InternalCallContext fakeCallContext;
- private PendingTransactionTask() {
- this.fakeCallContext = internalCallContextFactory.createInternalCallContext((Long) null, (Long) null, "PendingJanitorTask", CallOrigin.INTERNAL, UserType.SYSTEM, UUID.randomUUID());
+ protected CompletionTaskBase() {
+ this.taskName = this.getClass().getName();
+ this.fakeCallContext = internalCallContextFactory.createInternalCallContext((Long) null, (Long) null, taskName, CallOrigin.INTERNAL, UserType.SYSTEM, UUID.randomUUID());
}
@Override
public void run() {
if (isStopped) {
- shutdownLatch.countDown();
+ log.info("Janitor Task " + taskName + " was requested to stop");
return;
}
- log.info("PendingTransactionTask start run ");
-
- int result = paymentDao.failOldPendingTransactions(TransactionStatus.PLUGIN_FAILURE, getCreatedDateBefore(), fakeCallContext);
- if (result > 0) {
- log.info("PendingTransactionTask moved " + result + " PENDING payments -> PLUGIN_FAILURE");
+ final List<T> items = getItemsForIteration();
+ for (T item : items) {
+ if (isStopped) {
+ log.info("Janitor Task " + taskName + " was requested to stop");
+ return;
+ }
+ doIteration(item);
}
}
- private DateTime getCreatedDateBefore() {
+ public abstract List<T> getItemsForIteration();
+
+ public abstract void doIteration(final T item);
+
+ protected DateTime getCreatedDateBefore() {
final long delayBeforeNowMs = paymentConfig.getJanitorPendingCleanupTime().getMillis();
return clock.getUTCNow().minusMillis((int) delayBeforeNowMs);
}
}
/**
- * Task to complete 'partially' incomplete attempts
- * <p/>
- * If the state of the transaction associated with the attempt completed, but the attempt state machine did not,
- * we rerun the retry state machine to complete the call and transition the attempt into a terminal state.
+ * Task to find old PENDING transactions and move them into
*/
- private final class AttemptCompletionTask implements Runnable {
+ private final class PendingTransactionTask extends CompletionTaskBase<Integer> {
- private final InternalCallContext fakeCallContext;
+ private final List<Integer> itemsForIterations;
- private AttemptCompletionTask() {
- this.fakeCallContext = internalCallContextFactory.createInternalCallContext((Long) null, (Long) null, "PendingJanitorTask", CallOrigin.INTERNAL, UserType.SYSTEM, UUID.randomUUID());
+ private PendingTransactionTask() {
+ super();
+ this.itemsForIterations = ImmutableList.of(new Integer(1));
}
@Override
- public void run() {
+ public List<Integer> getItemsForIteration() {
+ return itemsForIterations;
+ }
- if (isStopped) {
- shutdownLatch.countDown();
- return;
+ @Override
+ public void doIteration(final Integer item) {
+ int result = paymentDao.failOldPendingTransactions(TransactionStatus.PLUGIN_FAILURE, getCreatedDateBefore(), fakeCallContext);
+ if (result > 0) {
+ log.info("Janitor PendingTransactionTask moved " + result + " PENDING payments -> PLUGIN_FAILURE");
}
+ }
+ }
- final List<PaymentAttemptModelDao> incompleteAttempts = paymentDao.getPaymentAttemptsByState(retrySMHelper.getInitialState().getName(), getCreatedDateBefore(), fakeCallContext);
- log.info("AttemptCompletionTask start run : found " + incompleteAttempts.size() + " incomplete attempts");
+ /**
+ * Task to complete 'partially' incomplete attempts
+ * <p/>
+ * If the state of the transaction associated with the attempt completed, but the attempt state machine did not,
+ * we rerun the retry state machine to complete the call and transition the attempt into a terminal state.
+ */
+ private final class AttemptCompletionTask extends CompletionTaskBase<PaymentAttemptModelDao> {
- for (PaymentAttemptModelDao cur : incompleteAttempts) {
- if (isStopped) {
- shutdownLatch.countDown();
- return;
- }
- complete(cur);
- }
+ private AttemptCompletionTask() {
+ super();
}
- private DateTime getCreatedDateBefore() {
- final long delayBeforeNowMs = paymentConfig.getJanitorAttemptCompletionTime().getMillis();
- return clock.getUTCNow().minusMillis((int) delayBeforeNowMs);
+ @Override
+ public List<PaymentAttemptModelDao> getItemsForIteration() {
+ final List<PaymentAttemptModelDao> incompleteAttempts = paymentDao.getPaymentAttemptsByState(retrySMHelper.getInitialState().getName(), getCreatedDateBefore(), fakeCallContext);
+ log.info("Janitor AttemptCompletionTask start run : found " + incompleteAttempts.size() + " incomplete attempts");
+ return incompleteAttempts;
}
- private void complete(final PaymentAttemptModelDao attempt) {
-
+ @Override
+ public void doIteration(final PaymentAttemptModelDao attempt) {
// STEPH seems a bit insane??
final InternalTenantContext tenantContext = internalCallContextFactory.createInternalTenantContext(attempt.getAccountId(), attempt.getId(), ObjectType.PAYMENT_ATTEMPT);
final UUID tenantId = nonEntityDao.retrieveIdFromObject(tenantContext.getTenantRecordId(), ObjectType.TENANT);
@@ -215,30 +235,29 @@ public class Janitor {
}).orNull();
if (transaction == null) {
- log.info("AttemptCompletionTask moving attempt " + attempt.getId() + " -> ABORTED");
+ log.info("Janitor AttemptCompletionTask moving attempt " + attempt.getId() + " -> ABORTED");
paymentDao.updatePaymentAttempt(attempt.getId(), attempt.getTransactionId(), "ABORTED", internalCallContext);
return;
}
try {
-
- log.info("AttemptCompletionTask completing attempt " + attempt.getId() + " -> SUCCESS");
+ log.info("Janitor AttemptCompletionTask completing attempt " + attempt.getId() + " -> SUCCESS");
final Account account = accountInternalApi.getAccountById(attempt.getAccountId(), tenantContext);
final boolean isApiPayment = true; // unclear
final RetryablePaymentStateContext paymentStateContext = new RetryablePaymentStateContext(attempt.getPluginName(),
- isApiPayment,
- transaction.getPaymentId(),
- attempt.getPaymentExternalKey(),
- transaction.getTransactionExternalKey(),
- transaction.getTransactionType(),
- account,
- attempt.getPaymentMethodId(),
- transaction.getAmount(),
- transaction.getCurrency(),
- PluginPropertySerializer.deserialize(attempt.getPluginProperties()),
- internalCallContext,
- callContext);
+ isApiPayment,
+ transaction.getPaymentId(),
+ attempt.getPaymentExternalKey(),
+ transaction.getTransactionExternalKey(),
+ transaction.getTransactionType(),
+ account,
+ attempt.getPaymentMethodId(),
+ transaction.getAmount(),
+ transaction.getCurrency(),
+ PluginPropertySerializer.deserialize(attempt.getPluginProperties()),
+ internalCallContext,
+ callContext);
paymentStateContext.setAttemptId(attempt.getId()); // Normally set by leavingState Callback
paymentStateContext.setPaymentTransactionModelDao(transaction); // Normally set by raw state machine
@@ -247,13 +266,12 @@ public class Janitor {
// to the PaymentControlPluginApi plugin and transition the state.
//
pluginControlledPaymentAutomatonRunner.completeRun(paymentStateContext);
-
} catch (AccountApiException e) {
- log.warn("Failed to complete payment attempt " + attempt.getId(), e);
+ log.warn("Janitor AttemptCompletionTask failed to complete payment attempt " + attempt.getId(), e);
} catch (PluginPropertySerializerException e) {
- log.warn("Failed to complete payment attempt " + attempt.getId(), e);
+ log.warn("Janitor AttemptCompletionTask failed to complete payment attempt " + attempt.getId(), e);
} catch (PaymentApiException e) {
- log.warn("Failed to complete payment attempt " + attempt.getId(), e);
+ log.warn("Janitor AttemptCompletionTask failed to complete payment attempt " + attempt.getId(), e);
}
}
}