/*
* Copyright 2014 Groupon, Inc
* Copyright 2014 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.killbill.billing.payment.core;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
import org.joda.time.DateTime;
import org.killbill.billing.ObjectType;
import org.killbill.billing.account.api.Account;
import org.killbill.billing.account.api.AccountApiException;
import org.killbill.billing.account.api.AccountInternalApi;
import org.killbill.billing.callcontext.DefaultCallContext;
import org.killbill.billing.callcontext.InternalCallContext;
import org.killbill.billing.callcontext.InternalTenantContext;
import org.killbill.billing.payment.api.PaymentApiException;
import org.killbill.billing.payment.api.TransactionStatus;
import org.killbill.billing.payment.core.sm.PluginControlledPaymentAutomatonRunner;
import org.killbill.billing.payment.core.sm.RetryStateMachineHelper;
import org.killbill.billing.payment.core.sm.RetryablePaymentStateContext;
import org.killbill.billing.payment.dao.PaymentAttemptModelDao;
import org.killbill.billing.payment.dao.PaymentDao;
import org.killbill.billing.payment.dao.PaymentTransactionModelDao;
import org.killbill.billing.payment.dao.PluginPropertySerializer;
import org.killbill.billing.payment.dao.PluginPropertySerializer.PluginPropertySerializerException;
import org.killbill.billing.payment.glue.PaymentModule;
import org.killbill.billing.util.callcontext.CallContext;
import org.killbill.billing.util.callcontext.CallOrigin;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.callcontext.UserType;
import org.killbill.billing.util.config.PaymentConfig;
import org.killbill.billing.util.dao.NonEntityDao;
import org.killbill.clock.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
/**
* Takes care of incomplete payment/transactions.
*/
public class Janitor {
private static final Logger log = LoggerFactory.getLogger(Janitor.class);

// Maximum time, in seconds, that stop() waits for the executor to terminate.
private static final int TERMINATION_TIMEOUT_SEC = 5;

// Executor on which the periodic janitor tasks are scheduled by start() and shut down by stop().
private final ScheduledExecutorService janitorExecutor;
private final AccountInternalApi accountInternalApi;
private final PaymentDao paymentDao;
private final Clock clock;
private final PaymentConfig paymentConfig;
private final InternalCallContextFactory internalCallContextFactory;
private final NonEntityDao nonEntityDao;
private final PluginControlledPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner;
private final RetryStateMachineHelper retrySMHelper;

// Reset to false by start() and set to true by stop(); polled by the tasks before each unit of work.
// Declared volatile because it is written by the caller of start()/stop() and read from executor threads.
private volatile boolean isStopped;
/**
 * Creates the janitor; all collaborators are injected and simply stored, nothing is scheduled
 * until {@link #start()} is called.
 *
 * @param accountInternalApi                     used to load the account of an attempt being completed
 * @param paymentDao                             access to payment transactions and payment attempts
 * @param paymentConfig                          source of the janitor running rate and pending cleanup time
 * @param clock                                  clock used to compute the cut-off date and to build call contexts
 * @param nonEntityDao                           used to resolve a tenant id from a tenant record id
 * @param internalCallContextFactory             factory for the internal call/tenant contexts
 * @param pluginControlledPaymentAutomatonRunner runner used to complete an attempt
 * @param janitorExecutor                        executor on which the janitor tasks are scheduled
 * @param retrySMHelper                          helper giving access to the retry state machine states
 */
@Inject
public Janitor(final AccountInternalApi accountInternalApi,
               final PaymentDao paymentDao,
               final PaymentConfig paymentConfig,
               final Clock clock,
               final NonEntityDao nonEntityDao,
               final InternalCallContextFactory internalCallContextFactory,
               final PluginControlledPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner,
               @Named(PaymentModule.JANITOR_EXECUTOR_NAMED) final ScheduledExecutorService janitorExecutor,
               final RetryStateMachineHelper retrySMHelper) {
    // Scheduling infrastructure
    this.janitorExecutor = janitorExecutor;
    this.paymentConfig = paymentConfig;
    this.clock = clock;

    // Data access
    this.accountInternalApi = accountInternalApi;
    this.paymentDao = paymentDao;
    this.nonEntityDao = nonEntityDao;
    this.internalCallContextFactory = internalCallContextFactory;

    // State machine collaborators
    this.retrySMHelper = retrySMHelper;
    this.pluginControlledPaymentAutomatonRunner = pluginControlledPaymentAutomatonRunner;
}
/**
 * Clears the stop flag and schedules the two periodic janitor tasks on the janitor executor.
 * <p/>
 * Both tasks use the configured janitor running rate as their initial delay and as their period.
 */
public void start() {
    isStopped = false;

    // Periodic task failing the transactions that stayed PENDING for too long.
    final TimeUnit pendingUnit = paymentConfig.getJanitorRunningRate().getUnit();
    final long pendingDelay = paymentConfig.getJanitorRunningRate().getPeriod();
    final Runnable pendingTransactionTask = new PendingTransactionTask();
    janitorExecutor.scheduleAtFixedRate(pendingTransactionTask, pendingDelay, pendingDelay, pendingUnit);

    // Periodic task completing the payment attempts that were left incomplete.
    final TimeUnit completionUnit = paymentConfig.getJanitorRunningRate().getUnit();
    final long completionDelay = paymentConfig.getJanitorRunningRate().getPeriod();
    final Runnable attemptCompletionTask = new AttemptCompletionTask();
    janitorExecutor.scheduleAtFixedRate(attemptCompletionTask, completionDelay, completionDelay, completionUnit);
}
/**
 * Raises the stop flag, shuts the janitor executor down and waits up to
 * {@code TERMINATION_TIMEOUT_SEC} seconds for it to terminate.
 * <p/>
 * A timeout or an interruption is only logged; if the calling thread is interrupted while waiting,
 * its interrupt status is restored.
 */
public void stop() {
    // Raised first so that a task that is currently running, or that still gets to run,
    // sees the flag at its next check and returns immediately.
    isStopped = true;

    // shutdown() only initiates an orderly shutdown; it does not wait, hence the timed wait below.
    janitorExecutor.shutdown();
    try {
        if (!janitorExecutor.awaitTermination(TERMINATION_TIMEOUT_SEC, TimeUnit.SECONDS)) {
            log.warn("Janitor failed to complete termination within " + TERMINATION_TIMEOUT_SEC + "sec");
        }
    } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
        log.warn("Janitor stop sequence got interrupted");
    }
}
/**
 * Base class of the periodic janitor tasks.
 * <p/>
 * Each run fetches a list of items and processes them one at a time, checking the janitor stop flag
 * before fetching and before each item so that a stop request takes effect quickly.
 *
 * @param <T> type of the items handed to {@code doIteration}
 */
protected abstract class CompletionTaskBase<T> implements Runnable {

    // Class name of the concrete task, used in log messages and as the user name of the call context.
    private final String taskName;

    // Call context built without account or tenant record id, for operations issued by the task itself.
    protected final InternalCallContext fakeCallContext;

    protected CompletionTaskBase() {
        this.taskName = this.getClass().getName();
        this.fakeCallContext = internalCallContextFactory.createInternalCallContext((Long) null, (Long) null, taskName, CallOrigin.INTERNAL, UserType.SYSTEM, UUID.randomUUID());
    }

    /**
     * Executes one run of the task.
     * <p/>
     * Unexpected runtime exceptions are caught and logged rather than propagated: the tasks are scheduled
     * with {@code scheduleAtFixedRate}, which suppresses all subsequent executions of a task as soon as
     * one execution throws. A failure on one item does not prevent the remaining items from being processed.
     */
    @Override
    public void run() {
        if (isStopped) {
            log.info("Janitor Task " + taskName + " was requested to stop");
            return;
        }

        final List<T> items;
        try {
            items = getItemsForIteration();
        } catch (final RuntimeException e) {
            log.warn("Janitor Task " + taskName + " failed to retrieve the items to process", e);
            return;
        }

        for (final T item : items) {
            if (isStopped) {
                log.info("Janitor Task " + taskName + " was requested to stop");
                return;
            }
            try {
                doIteration(item);
            } catch (final RuntimeException e) {
                log.warn("Janitor Task " + taskName + " failed to process an item", e);
            }
        }
    }

    /**
     * @return the items to process during the current run
     */
    public abstract List<T> getItemsForIteration();

    /**
     * Processes a single item returned by {@link #getItemsForIteration()}.
     *
     * @param item the item to process
     */
    public abstract void doIteration(final T item);

    /**
     * @return the current UTC time minus the configured janitor pending cleanup time
     */
    protected DateTime getCreatedDateBefore() {
        final long delayBeforeNowMs = paymentConfig.getJanitorPendingCleanupTime().getMillis();
        // Subtract the delay as a long: narrowing it to int would overflow for delays above
        // Integer.MAX_VALUE milliseconds (about 24.8 days) and yield a wrong cut-off date.
        return clock.getUTCNow().minus(delayBeforeNowMs);
    }
}
/**
 * Task to find old PENDING transactions and move them into the PLUGIN_FAILURE state.
 * <p/>
 * The whole update is one bulk DAO call, so the task exposes a single placeholder item
 * to get exactly one {@code doIteration} call per run.
 */
private final class PendingTransactionTask extends CompletionTaskBase<Integer> {

    // One-element placeholder list; the value itself is never used.
    private final List<Integer> itemsForIterations;

    private PendingTransactionTask() {
        super();
        this.itemsForIterations = ImmutableList.of(Integer.valueOf(1));
    }

    @Override
    public List<Integer> getItemsForIteration() {
        return itemsForIterations;
    }

    /**
     * Fails the transactions created before the cut-off date returned by {@code getCreatedDateBefore()}
     * and logs how many were updated, if any.
     *
     * @param item placeholder, ignored
     */
    @Override
    public void doIteration(final Integer item) {
        final int result = paymentDao.failOldPendingTransactions(TransactionStatus.PLUGIN_FAILURE, getCreatedDateBefore(), fakeCallContext);
        if (result > 0) {
            log.info("Janitor PendingTransactionTask moved " + result + " PENDING payments -> PLUGIN_FAILURE");
        }
    }
}
/**
 * Task to complete 'partially' incomplete attempts.
 * <p/>
 * If the state of the transaction associated with the attempt completed, but the attempt state machine did not,
 * we rerun the retry state machine to complete the call and transition the attempt into a terminal state.
 * When no SUCCESS transaction is found for the attempt, the attempt is moved to ABORTED instead.
 */
private final class AttemptCompletionTask extends CompletionTaskBase<PaymentAttemptModelDao> {

    private AttemptCompletionTask() {
        super();
    }

    /**
     * @return the attempts still in the initial state of the retry state machine that were created
     *         before the cut-off date returned by {@code getCreatedDateBefore()}
     */
    @Override
    public List<PaymentAttemptModelDao> getItemsForIteration() {
        final List<PaymentAttemptModelDao> incompleteAttempts = paymentDao.getPaymentAttemptsByState(retrySMHelper.getInitialState().getName(), getCreatedDateBefore(), fakeCallContext);
        log.info("Janitor AttemptCompletionTask start run : found " + incompleteAttempts.size() + " incomplete attempts");
        return incompleteAttempts;
    }

    /**
     * Completes or aborts a single incomplete attempt.
     * <p/>
     * Checked failures while completing the attempt are logged and the attempt is left untouched.
     *
     * @param attempt the incomplete attempt to process
     */
    @Override
    public void doIteration(final PaymentAttemptModelDao attempt) {
        // NOTE(review): the original author flagged this context reconstruction as overly convoluted
        // (tenant context -> tenant id -> call context -> internal call context); kept as is.
        final InternalTenantContext tenantContext = internalCallContextFactory.createInternalTenantContext(attempt.getAccountId(), attempt.getId(), ObjectType.PAYMENT_ATTEMPT);
        final UUID tenantId = nonEntityDao.retrieveIdFromObject(tenantContext.getTenantRecordId(), ObjectType.TENANT);
        final CallContext callContext = new DefaultCallContext(tenantId, "AttemptCompletionJanitorTask", CallOrigin.INTERNAL, UserType.SYSTEM, UUID.randomUUID(), clock);
        final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(attempt.getAccountId(), callContext);

        final List<PaymentTransactionModelDao> transactions = paymentDao.getPaymentTransactionsByExternalKey(attempt.getTransactionExternalKey(), tenantContext);
        final PaymentTransactionModelDao transaction = findSuccessfulTransaction(attempt, transactions);
        if (transaction == null) {
            log.info("Janitor AttemptCompletionTask moving attempt " + attempt.getId() + " -> ABORTED");
            paymentDao.updatePaymentAttempt(attempt.getId(), attempt.getTransactionId(), "ABORTED", internalCallContext);
            return;
        }

        try {
            log.info("Janitor AttemptCompletionTask completing attempt " + attempt.getId() + " -> SUCCESS");

            final Account account = accountInternalApi.getAccountById(attempt.getAccountId(), tenantContext);
            // NOTE(review): the original author marked this value as unclear; the attempt is always
            // treated as an API payment here -- confirm this is the intended behavior.
            final boolean isApiPayment = true;
            final RetryablePaymentStateContext paymentStateContext = new RetryablePaymentStateContext(attempt.getPluginName(),
                                                                                                      isApiPayment,
                                                                                                      transaction.getPaymentId(),
                                                                                                      attempt.getPaymentExternalKey(),
                                                                                                      transaction.getTransactionExternalKey(),
                                                                                                      transaction.getTransactionType(),
                                                                                                      account,
                                                                                                      attempt.getPaymentMethodId(),
                                                                                                      transaction.getAmount(),
                                                                                                      transaction.getCurrency(),
                                                                                                      PluginPropertySerializer.deserialize(attempt.getPluginProperties()),
                                                                                                      internalCallContext,
                                                                                                      callContext);

            paymentStateContext.setAttemptId(attempt.getId()); // Normally set by leavingState Callback
            paymentStateContext.setPaymentTransactionModelDao(transaction); // Normally set by raw state machine
            //
            // Will rerun the state machine with special callbacks to only make the onCompletion call
            // to the PaymentControlPluginApi plugin and transition the state.
            //
            pluginControlledPaymentAutomatonRunner.completeRun(paymentStateContext);
        } catch (final AccountApiException e) {
            log.warn("Janitor AttemptCompletionTask failed to complete payment attempt " + attempt.getId(), e);
        } catch (final PluginPropertySerializerException e) {
            log.warn("Janitor AttemptCompletionTask failed to complete payment attempt " + attempt.getId(), e);
        } catch (final PaymentApiException e) {
            log.warn("Janitor AttemptCompletionTask failed to complete payment attempt " + attempt.getId(), e);
        }
    }

    /**
     * Looks for the SUCCESS transaction that belongs to the given attempt.
     *
     * @param attempt      the attempt being processed
     * @param transactions the candidate transactions
     * @return the first matching transaction, or {@code null} if there is none
     */
    private PaymentTransactionModelDao findSuccessfulTransaction(final PaymentAttemptModelDao attempt,
                                                                 final List<PaymentTransactionModelDao> transactions) {
        return Iterables.tryFind(transactions, new Predicate<PaymentTransactionModelDao>() {
            @Override
            public boolean apply(final PaymentTransactionModelDao input) {
                // equals() is invoked on the attempt's id so that a transaction without an attempt id
                // (not verifiable from this file whether that can happen) simply does not match
                // instead of throwing a NullPointerException.
                return attempt.getId().equals(input.getAttemptId()) &&
                       input.getTransactionStatus() == TransactionStatus.SUCCESS;
            }
        }).orNull();
    }
}
}