killbill-aplcache
Changes
payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentPluginErrorEvent.java 24(+24 -0)
payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java 8(+8 -0)
payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java 80(+64 -16)
payment/src/main/java/org/killbill/billing/payment/core/janitor/JanitorNotificationKey.java 48(+48 -0)
payment/src/main/java/org/killbill/billing/payment/core/sm/payments/PaymentEnteringStateCallback.java 2(+2 -0)
Details
diff --git a/api/src/main/java/org/killbill/billing/events/PaymentErrorInternalEvent.java b/api/src/main/java/org/killbill/billing/events/PaymentErrorInternalEvent.java
index 3b5889f..9ef4716 100644
--- a/api/src/main/java/org/killbill/billing/events/PaymentErrorInternalEvent.java
+++ b/api/src/main/java/org/killbill/billing/events/PaymentErrorInternalEvent.java
@@ -17,9 +17,10 @@ package org.killbill.billing.events;
import java.util.UUID;
+import org.killbill.billing.payment.api.TransactionStatus;
import org.killbill.billing.payment.api.TransactionType;
-public interface PaymentErrorInternalEvent extends BusInternalEvent {
+public interface PaymentErrorInternalEvent extends PaymentInternalEvent {
public String getMessage();
@@ -27,5 +28,9 @@ public interface PaymentErrorInternalEvent extends BusInternalEvent {
public UUID getPaymentId();
+ public UUID getPaymentTransactionId();
+
+ public TransactionStatus getStatus();
+
public TransactionType getTransactionType();
}
diff --git a/api/src/main/java/org/killbill/billing/events/PaymentInfoInternalEvent.java b/api/src/main/java/org/killbill/billing/events/PaymentInfoInternalEvent.java
index f1265b9..4669d1d 100644
--- a/api/src/main/java/org/killbill/billing/events/PaymentInfoInternalEvent.java
+++ b/api/src/main/java/org/killbill/billing/events/PaymentInfoInternalEvent.java
@@ -24,11 +24,7 @@ import org.killbill.billing.catalog.api.Currency;
import org.killbill.billing.payment.api.TransactionStatus;
import org.killbill.billing.payment.api.TransactionType;
-public interface PaymentInfoInternalEvent extends BusInternalEvent {
-
- public UUID getPaymentId();
-
- public UUID getAccountId();
+public interface PaymentInfoInternalEvent extends PaymentInternalEvent {
public BigDecimal getAmount();
@@ -36,7 +32,4 @@ public interface PaymentInfoInternalEvent extends BusInternalEvent {
public DateTime getEffectiveDate();
- public TransactionStatus getStatus();
-
- public TransactionType getTransactionType();
}
diff --git a/api/src/main/java/org/killbill/billing/events/PaymentInternalEvent.java b/api/src/main/java/org/killbill/billing/events/PaymentInternalEvent.java
new file mode 100644
index 0000000..6a442de
--- /dev/null
+++ b/api/src/main/java/org/killbill/billing/events/PaymentInternalEvent.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.events;
+
+import java.util.UUID;
+
+import org.killbill.billing.payment.api.TransactionStatus;
+import org.killbill.billing.payment.api.TransactionType;
+
+public interface PaymentInternalEvent extends BusInternalEvent {
+
+ public UUID getAccountId();
+
+ public UUID getPaymentId();
+
+ public UUID getPaymentTransactionId();
+
+ public TransactionType getTransactionType();
+
+ public TransactionStatus getStatus();
+
+}
diff --git a/api/src/main/java/org/killbill/billing/events/PaymentPluginErrorInternalEvent.java b/api/src/main/java/org/killbill/billing/events/PaymentPluginErrorInternalEvent.java
index a0198cc..be1a80b 100644
--- a/api/src/main/java/org/killbill/billing/events/PaymentPluginErrorInternalEvent.java
+++ b/api/src/main/java/org/killbill/billing/events/PaymentPluginErrorInternalEvent.java
@@ -13,19 +13,9 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
-package org.killbill.billing.events;
-
-import java.util.UUID;
-import org.killbill.billing.payment.api.TransactionType;
-
-public interface PaymentPluginErrorInternalEvent extends BusInternalEvent {
+package org.killbill.billing.events;
+public interface PaymentPluginErrorInternalEvent extends PaymentInternalEvent {
public String getMessage();
-
- public UUID getAccountId();
-
- public UUID getPaymentId();
-
- public TransactionType getTransactionType();
}
diff --git a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentErrorEvent.java b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentErrorEvent.java
index d81ebad..030eb96 100644
--- a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentErrorEvent.java
+++ b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentErrorEvent.java
@@ -30,11 +30,15 @@ public class DefaultPaymentErrorEvent extends BusEventBase implements PaymentErr
private final String message;
private final UUID accountId;
private final UUID paymentId;
+ private final UUID paymentTransactionId;
+ private final TransactionStatus status;
private final TransactionType transactionType;
@JsonCreator
public DefaultPaymentErrorEvent(@JsonProperty("accountId") final UUID accountId,
@JsonProperty("paymentId") final UUID paymentId,
+ @JsonProperty("paymentTransactionId") final UUID paymentTransactionId,
+ @JsonProperty("status") final TransactionStatus status,
@JsonProperty("transactionType") final TransactionType transactionType,
@JsonProperty("message") final String message,
@JsonProperty("searchKey1") final Long searchKey1,
@@ -44,6 +48,8 @@ public class DefaultPaymentErrorEvent extends BusEventBase implements PaymentErr
this.message = message;
this.accountId = accountId;
this.paymentId = paymentId;
+ this.paymentTransactionId = paymentTransactionId;
+ this.status = status;
this.transactionType = transactionType;
}
@@ -67,6 +73,16 @@ public class DefaultPaymentErrorEvent extends BusEventBase implements PaymentErr
return transactionType;
}
+ @Override
+ public UUID getPaymentTransactionId() {
+ return paymentTransactionId;
+ }
+
+ @Override
+ public TransactionStatus getStatus() {
+ return status;
+ }
+
@JsonIgnore
@Override
public BusInternalEventType getBusEventType() {
@@ -79,6 +95,8 @@ public class DefaultPaymentErrorEvent extends BusEventBase implements PaymentErr
sb.append("message='").append(message).append('\'');
sb.append(", accountId=").append(accountId);
sb.append(", paymentId=").append(paymentId);
+ sb.append(", paymentTransactionId=").append(paymentTransactionId);
+ sb.append(", status=").append(status);
sb.append(", transactionType=").append(transactionType);
sb.append('}');
return sb.toString();
@@ -107,7 +125,12 @@ public class DefaultPaymentErrorEvent extends BusEventBase implements PaymentErr
if (paymentId != null ? !paymentId.equals(that.paymentId) : that.paymentId != null) {
return false;
}
-
+ if (paymentTransactionId != null ? !paymentTransactionId.equals(that.paymentTransactionId) : that.paymentTransactionId != null) {
+ return false;
+ }
+ if (status != null ? !status.equals(that.status) : that.status != null) {
+ return false;
+ }
return true;
}
@@ -117,6 +140,8 @@ public class DefaultPaymentErrorEvent extends BusEventBase implements PaymentErr
result = 31 * result + (accountId != null ? accountId.hashCode() : 0);
result = 31 * result + (transactionType != null ? transactionType.hashCode() : 0);
result = 31 * result + (paymentId != null ? paymentId.hashCode() : 0);
+ result = 31 * result + (paymentTransactionId != null ? paymentTransactionId.hashCode() : 0);
+ result = 31 * result + (status != null ? status.hashCode() : 0);
return result;
}
}
diff --git a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentInfoEvent.java b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentInfoEvent.java
index d3d73b2..9dd0eb5 100644
--- a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentInfoEvent.java
+++ b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentInfoEvent.java
@@ -32,6 +32,7 @@ public class DefaultPaymentInfoEvent extends BusEventBase implements PaymentInfo
private final UUID accountId;
private final UUID paymentId;
+ private final UUID paymentTransactionId;
private final BigDecimal amount;
private final Currency currency;
private final TransactionStatus status;
@@ -41,6 +42,7 @@ public class DefaultPaymentInfoEvent extends BusEventBase implements PaymentInfo
@JsonCreator
public DefaultPaymentInfoEvent(@JsonProperty("accountId") final UUID accountId,
@JsonProperty("paymentId") final UUID paymentId,
+ @JsonProperty("paymentTransactionId") final UUID paymentTransactionId,
@JsonProperty("amount") final BigDecimal amount,
@JsonProperty("currency") final Currency currency,
@JsonProperty("status") final TransactionStatus status,
@@ -54,6 +56,7 @@ public class DefaultPaymentInfoEvent extends BusEventBase implements PaymentInfo
super(searchKey1, searchKey2, userToken);
this.accountId = accountId;
this.paymentId = paymentId;
+ this.paymentTransactionId = paymentTransactionId;
this.amount = amount;
this.currency = currency;
this.status = status;
@@ -63,6 +66,7 @@ public class DefaultPaymentInfoEvent extends BusEventBase implements PaymentInfo
public DefaultPaymentInfoEvent(final UUID accountId,
final UUID paymentId,
+ final UUID paymentTransactionId,
final BigDecimal amount,
final Currency currency,
final TransactionStatus status,
@@ -71,7 +75,7 @@ public class DefaultPaymentInfoEvent extends BusEventBase implements PaymentInfo
final Long searchKey1,
final Long searchKey2,
final UUID userToken) {
- this(accountId, paymentId, amount, currency, status, transactionType, null, null,
+ this(accountId, paymentId, paymentTransactionId, amount, currency, status, transactionType, null, null,
effectiveDate, searchKey1, searchKey2, userToken);
}
@@ -108,6 +112,11 @@ public class DefaultPaymentInfoEvent extends BusEventBase implements PaymentInfo
}
@Override
+ public UUID getPaymentTransactionId() {
+ return paymentTransactionId;
+ }
+
+ @Override
public TransactionType getTransactionType() {
return transactionType;
}
@@ -123,6 +132,7 @@ public class DefaultPaymentInfoEvent extends BusEventBase implements PaymentInfo
sb.append("DefaultPaymentInfoEvent");
sb.append("{accountId=").append(accountId);
sb.append(", paymentId=").append(paymentId);
+ sb.append(", paymentTransactionId=").append(paymentTransactionId);
sb.append(", amount=").append(amount);
sb.append(", currency=").append(currency);
sb.append(", status=").append(status);
@@ -144,6 +154,8 @@ public class DefaultPaymentInfoEvent extends BusEventBase implements PaymentInfo
result = prime * result
+ ((paymentId == null) ? 0 : paymentId.hashCode());
result = prime * result
+ + ((paymentTransactionId == null) ? 0 : paymentTransactionId.hashCode());
+ result = prime * result
+ ((currency == null) ? 0 : currency.hashCode());
result = prime * result + ((status == null) ? 0 : status.hashCode());
return result;
@@ -196,6 +208,13 @@ public class DefaultPaymentInfoEvent extends BusEventBase implements PaymentInfo
} else if (!paymentId.equals(other.paymentId)) {
return false;
}
+ if (paymentTransactionId == null) {
+ if (other.paymentTransactionId != null) {
+ return false;
+ }
+ } else if (!paymentTransactionId.equals(other.paymentTransactionId)) {
+ return false;
+ }
if (currency == null) {
if (other.currency != null) {
return false;
diff --git a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentPluginErrorEvent.java b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentPluginErrorEvent.java
index 5ef1ac2..10c1fc3 100644
--- a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentPluginErrorEvent.java
+++ b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentPluginErrorEvent.java
@@ -29,11 +29,15 @@ public class DefaultPaymentPluginErrorEvent extends BusEventBase implements Paym
private final String message;
private final UUID accountId;
private final UUID paymentId;
+ private final UUID paymentTransactionId;
+ private final TransactionStatus status;
private final TransactionType transactionType;
@JsonCreator
public DefaultPaymentPluginErrorEvent(@JsonProperty("accountId") final UUID accountId,
@JsonProperty("paymentId") final UUID paymentId,
+ @JsonProperty("paymentTransactionId") final UUID paymentTransactionId,
+ @JsonProperty("status") final TransactionStatus status,
@JsonProperty("transactionType") final TransactionType transactionType,
@JsonProperty("message") final String message,
@JsonProperty("searchKey1") final Long searchKey1,
@@ -43,6 +47,8 @@ public class DefaultPaymentPluginErrorEvent extends BusEventBase implements Paym
this.message = message;
this.accountId = accountId;
this.paymentId = paymentId;
+ this.paymentTransactionId = paymentTransactionId;
+ this.status = status;
this.transactionType = transactionType;
}
@@ -66,6 +72,16 @@ public class DefaultPaymentPluginErrorEvent extends BusEventBase implements Paym
return transactionType;
}
+ @Override
+ public UUID getPaymentTransactionId() {
+ return paymentTransactionId;
+ }
+
+ @Override
+ public TransactionStatus getStatus() {
+ return status;
+ }
+
@JsonIgnore
@Override
public BusInternalEventType getBusEventType() {
@@ -95,6 +111,12 @@ public class DefaultPaymentPluginErrorEvent extends BusEventBase implements Paym
if (paymentId != null ? !paymentId.equals(that.paymentId) : that.paymentId != null) {
return false;
}
+ if (paymentTransactionId != null ? !paymentTransactionId.equals(that.paymentTransactionId) : that.paymentTransactionId != null) {
+ return false;
+ }
+ if (status != null ? !status.equals(that.status) : that.status != null) {
+ return false;
+ }
return true;
}
@@ -105,6 +127,8 @@ public class DefaultPaymentPluginErrorEvent extends BusEventBase implements Paym
result = 31 * result + (accountId != null ? accountId.hashCode() : 0);
result = 31 * result + (transactionType != null ? transactionType.hashCode() : 0);
result = 31 * result + (paymentId != null ? paymentId.hashCode() : 0);
+ result = 31 * result + (paymentTransactionId != null ? paymentTransactionId.hashCode() : 0);
+ result = 31 * result + (status != null ? status.hashCode() : 0);
return result;
}
}
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
index 7df99f1..60281b4 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
@@ -17,6 +17,7 @@
package org.killbill.billing.payment.core.janitor;
+import java.io.IOException;
import java.util.List;
import org.killbill.billing.account.api.Account;
@@ -24,6 +25,7 @@ import org.killbill.billing.account.api.AccountApiException;
import org.killbill.billing.account.api.AccountInternalApi;
import org.killbill.billing.callcontext.DefaultCallContext;
import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.events.PaymentInternalEvent;
import org.killbill.billing.osgi.api.OSGIServiceRegistration;
import org.killbill.billing.payment.core.ProcessorBase;
import org.killbill.billing.payment.core.sm.PaymentControlStateMachineHelper;
@@ -42,6 +44,7 @@ import org.killbill.clock.Clock;
import org.killbill.commons.locker.GlobalLock;
import org.killbill.commons.locker.GlobalLocker;
import org.killbill.commons.locker.LockFailedException;
+import org.killbill.notificationq.api.NotificationQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,6 +62,8 @@ abstract class CompletionTaskBase<T> implements Runnable {
protected final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry;
protected final GlobalLocker locker;
+ protected NotificationQueue janitorQueue;
+
private volatile boolean isStopped;
public CompletionTaskBase(final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
@@ -105,8 +110,14 @@ abstract class CompletionTaskBase<T> implements Runnable {
public abstract void doIteration(final T item);
+ public abstract void processPaymentEvent(final PaymentInternalEvent event, final NotificationQueue janitorQueue) throws IOException;
+
+ public void attachJanitorQueue(final NotificationQueue janitorQueue) {
+ this.janitorQueue = janitorQueue;
+ }
+
public interface JanitorIterationCallback {
- public <T> T doIteration();
+ public <T> T doIteration();
}
protected <T> T doJanitorOperationWithAccountLock(final JanitorIterationCallback callback, final InternalTenantContext internalTenantContext) {
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
index a02fb1a..d39f831 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
@@ -17,6 +17,7 @@
package org.killbill.billing.payment.core.janitor;
+import java.io.IOException;
import java.util.List;
import javax.inject.Inject;
@@ -27,6 +28,7 @@ import org.killbill.billing.account.api.AccountApiException;
import org.killbill.billing.account.api.AccountInternalApi;
import org.killbill.billing.callcontext.InternalCallContext;
import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.events.PaymentInternalEvent;
import org.killbill.billing.osgi.api.OSGIServiceRegistration;
import org.killbill.billing.payment.api.PaymentApiException;
import org.killbill.billing.payment.api.TransactionStatus;
@@ -46,6 +48,7 @@ import org.killbill.billing.util.config.PaymentConfig;
import org.killbill.billing.util.entity.Pagination;
import org.killbill.clock.Clock;
import org.killbill.commons.locker.GlobalLocker;
+import org.killbill.notificationq.api.NotificationQueue;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
@@ -142,6 +145,11 @@ public class IncompletePaymentAttemptTask extends CompletionTaskBase<PaymentAtte
}
}
+ @Override
+ public void processPaymentEvent(final PaymentInternalEvent event, final NotificationQueue janitorQueue) throws IOException {
+ // Nothing
+ }
+
private DateTime getCreatedDateBefore() {
final long delayBeforeNowMs = paymentConfig.getIncompleteAttemptsTimeSpanDelay().getMillis();
return clock.getUTCNow().minusMillis((int) delayBeforeNowMs);
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
index 2779e21..99d4725 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
@@ -17,9 +17,12 @@
package org.killbill.billing.payment.core.janitor;
+import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
+import java.util.UUID;
+import javax.annotation.Nullable;
import javax.inject.Inject;
import org.joda.time.DateTime;
@@ -27,6 +30,7 @@ import org.killbill.billing.account.api.AccountInternalApi;
import org.killbill.billing.callcontext.InternalCallContext;
import org.killbill.billing.callcontext.InternalTenantContext;
import org.killbill.billing.catalog.api.Currency;
+import org.killbill.billing.events.PaymentInternalEvent;
import org.killbill.billing.osgi.api.OSGIServiceRegistration;
import org.killbill.billing.payment.api.PluginProperty;
import org.killbill.billing.payment.api.TransactionStatus;
@@ -45,9 +49,10 @@ import org.killbill.billing.util.callcontext.CallContext;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.callcontext.TenantContext;
import org.killbill.billing.util.config.PaymentConfig;
-import org.killbill.billing.util.entity.Pagination;
import org.killbill.clock.Clock;
import org.killbill.commons.locker.GlobalLocker;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationQueue;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
@@ -57,11 +62,26 @@ import com.google.common.collect.Iterables;
public class IncompletePaymentTransactionTask extends CompletionTaskBase<PaymentTransactionModelDao> {
+ private final static long MILLIS_TO_SEC = 1000L;
+ private final static long SEC_TO_HOUR = 3600L;
+ private final static long HOURS_TO_DAY = 24L;
+
+ private final static List<Long> RETRY_ATTEMPTS = ImmutableList.<Long>of((3L * MILLIS_TO_SEC), // 3 min
+ (10L * MILLIS_TO_SEC), // 10 min
+ (1L * SEC_TO_HOUR * MILLIS_TO_SEC), // 1 hour
+ (HOURS_TO_DAY * SEC_TO_HOUR * MILLIS_TO_SEC), // 7 times every day
+ (HOURS_TO_DAY * SEC_TO_HOUR * MILLIS_TO_SEC),
+ (HOURS_TO_DAY * SEC_TO_HOUR * MILLIS_TO_SEC),
+ (HOURS_TO_DAY * SEC_TO_HOUR * MILLIS_TO_SEC),
+ (HOURS_TO_DAY * SEC_TO_HOUR * MILLIS_TO_SEC),
+ (HOURS_TO_DAY * SEC_TO_HOUR * MILLIS_TO_SEC),
+ (HOURS_TO_DAY * SEC_TO_HOUR * MILLIS_TO_SEC));
private static final ImmutableList<TransactionStatus> TRANSACTION_STATUSES_TO_CONSIDER = ImmutableList.<TransactionStatus>builder()
.add(TransactionStatus.PENDING)
.add(TransactionStatus.UNKNOWN)
.build();
+
@Inject
public IncompletePaymentTransactionTask(final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
final PaymentDao paymentDao, final Clock clock,
@@ -72,24 +92,24 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
@Override
public Iterable<PaymentTransactionModelDao> getItemsForIteration() {
- // Code will go away in the next commit -- allow to fix h2...
- final Pagination<PaymentTransactionModelDao> result = paymentDao.getByTransactionStatusAcrossTenants(TRANSACTION_STATUSES_TO_CONSIDER, getCreatedDateBefore(), getCreatedDateAfter(), 0L, 1000L);
- if (result.getTotalNbRecords() > 0) {
- log.info("Janitor IncompletePaymentTransactionTask start run: found {} pending/unknown payments", result.getTotalNbRecords());
- }
- return result;
+ // This is not triggered by Janitor proper but instead relies on bus event + notificationQ
+ return ImmutableList.of();
}
@Override
public void doIteration(final PaymentTransactionModelDao paymentTransaction) {
+ // Nothing
+ }
+
+ public void processNotification(final JanitorNotificationKey notificationKey, final UUID userToken, final Long accountRecordId, final long tenantRecordId) {
- final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(paymentTransaction.getTenantRecordId(), paymentTransaction.getAccountRecordId());
+ final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(tenantRecordId, accountRecordId);
doJanitorOperationWithAccountLock(new JanitorIterationCallback() {
@Override
public Void doIteration() {
// State may have changed since we originally retrieved with no lock
- final PaymentTransactionModelDao rehydratedPaymentTransaction = paymentDao.getPaymentTransaction(paymentTransaction.getId(), internalTenantContext);
+ final PaymentTransactionModelDao rehydratedPaymentTransaction = paymentDao.getPaymentTransaction(notificationKey.getUuidKey(), internalTenantContext);
final TenantContext tenantContext = internalCallContextFactory.createTenantContext(internalTenantContext);
final PaymentModelDao payment = paymentDao.getPayment(rehydratedPaymentTransaction.getPaymentId(), internalTenantContext);
@@ -123,37 +143,45 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
} catch (final Exception e) {
paymentTransactionInfoPlugin = undefinedPaymentTransaction;
}
- updatePaymentAndTransactionIfNeeded(payment, rehydratedPaymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
+ updatePaymentAndTransactionIfNeeded(payment, notificationKey.getAttemptNumber(), userToken, rehydratedPaymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
return null;
}
}, internalTenantContext);
}
+ @Override
+ public void processPaymentEvent(final PaymentInternalEvent event, final NotificationQueue janitorQueue) throws IOException {
+ if (!TRANSACTION_STATUSES_TO_CONSIDER.contains(event.getStatus())) {
+ return;
+ }
+ insertNewNotificationForUnresolvedTransactionIfNeeded(event.getPaymentTransactionId(), 1, event.getUserToken(), event.getSearchKey1(), event.getSearchKey2());
+ }
+
public boolean updatePaymentAndTransactionIfNeededWithAccountLock(final PaymentModelDao payment, final PaymentTransactionModelDao paymentTransaction, final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin, final InternalTenantContext internalTenantContext) {
// Can happen in the GET case, see PaymentProcessor#toPayment
if (!TRANSACTION_STATUSES_TO_CONSIDER.contains(paymentTransaction.getTransactionStatus())) {
// Nothing to do
return false;
}
- final Boolean result = doJanitorOperationWithAccountLock(new JanitorIterationCallback() {
+ final Boolean result = doJanitorOperationWithAccountLock(new JanitorIterationCallback() {
@Override
public Boolean doIteration() {
- return updatePaymentAndTransactionInternal(payment, paymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
+ return updatePaymentAndTransactionInternal(payment, null, null, paymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
}
}, internalTenantContext);
return result != null && result;
}
- private boolean updatePaymentAndTransactionIfNeeded(final PaymentModelDao payment, final PaymentTransactionModelDao paymentTransaction, final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin, final InternalTenantContext internalTenantContext) {
+ private boolean updatePaymentAndTransactionIfNeeded(final PaymentModelDao payment, final int attemptNumber, final UUID userToken, final PaymentTransactionModelDao paymentTransaction, final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin, final InternalTenantContext internalTenantContext) {
if (!TRANSACTION_STATUSES_TO_CONSIDER.contains(paymentTransaction.getTransactionStatus())) {
// Nothing to do
return false;
}
- return updatePaymentAndTransactionInternal(payment, paymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
+ return updatePaymentAndTransactionInternal(payment, attemptNumber, userToken, paymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
}
- private boolean updatePaymentAndTransactionInternal(final PaymentModelDao payment, final PaymentTransactionModelDao paymentTransaction, final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin, final InternalTenantContext internalTenantContext) {
+ private boolean updatePaymentAndTransactionInternal(final PaymentModelDao payment, @Nullable final Integer attemptNumber, @Nullable final UUID userToken, final PaymentTransactionModelDao paymentTransaction, final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin, final InternalTenantContext internalTenantContext) {
final CallContext callContext = createCallContext("IncompletePaymentTransactionTask", internalTenantContext);
// First obtain the new transactionStatus,
@@ -176,6 +204,7 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
log.info("Janitor IncompletePaymentTransactionTask unable to repair payment {}, transaction {}: {} -> {}",
payment.getId(), paymentTransaction.getId(), paymentTransaction.getTransactionStatus(), transactionStatus);
// We can't get anything interesting from the plugin...
+ insertNewNotificationForUnresolvedTransactionIfNeeded(paymentTransaction.getId(), attemptNumber, userToken, internalTenantContext.getAccountRecordId(), internalTenantContext.getTenantRecordId());
return false;
}
@@ -203,7 +232,6 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
}
-
// Keep the existing currentTransactionStatus if we can't obtain a better answer from the plugin; if not, return the newTransactionStatus
private TransactionStatus computeNewTransactionStatusFromPaymentTransactionInfoPlugin(final PaymentTransactionInfoPlugin input, final TransactionStatus currentTransactionStatus) {
final TransactionStatus newTransactionStatus = PaymentTransactionInfoPluginConverter.toTransactionStatus(input);
@@ -222,6 +250,26 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
return pluginApi;
}
+ private DateTime getNextNotificationTime(@Nullable final Integer attemptNumber) {
+ if (attemptNumber == null || attemptNumber > RETRY_ATTEMPTS.size()) {
+ return null;
+ }
+ final long nextDelay = RETRY_ATTEMPTS.get(attemptNumber - 1);
+ return clock.getUTCNow().plusMillis((int) nextDelay);
+ }
+
+ private void insertNewNotificationForUnresolvedTransactionIfNeeded(final UUID paymentTransactionId, @Nullable final Integer attemptNumber, @Nullable final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+ final NotificationEvent key = new JanitorNotificationKey(paymentTransactionId, IncompletePaymentTransactionTask.class.toString(), attemptNumber);
+ final DateTime notificationTime = getNextNotificationTime(attemptNumber);
+ if (notificationTime != null) {
+ try {
+ janitorQueue.recordFutureNotification(notificationTime, key, userToken, accountRecordId, tenantRecordId);
+ } catch (IOException e) {
+ log.warn("Janitor IncompletePaymentTransactionTask : Failed to insert future notification for paymentTransactionId = {}: {}", paymentTransactionId, e.getMessage());
+ }
+ }
+ }
+
private DateTime getCreatedDateBefore() {
final long delayBeforeNowMs = paymentConfig.getIncompleteTransactionsTimeSpanDelay().getMillis();
return clock.getUTCNow().minusMillis((int) delayBeforeNowMs);
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
index 032b1a0..8ca2037 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
@@ -17,14 +17,28 @@
package org.killbill.billing.payment.core.janitor;
+import java.io.IOException;
+import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
+import org.joda.time.DateTime;
+import org.killbill.billing.events.PaymentErrorInternalEvent;
+import org.killbill.billing.events.PaymentInfoInternalEvent;
+import org.killbill.billing.events.PaymentInternalEvent;
+import org.killbill.billing.payment.glue.DefaultPaymentService;
import org.killbill.billing.payment.glue.PaymentModule;
+import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.config.PaymentConfig;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueAlreadyExists;
+import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,32 +50,65 @@ public class Janitor {
private static final Logger log = LoggerFactory.getLogger(Janitor.class);
private static final int TERMINATION_TIMEOUT_SEC = 5;
+ public static final String QUEUE_NAME = "janitor";
+ private final NotificationQueueService notificationQueueService;
private final ScheduledExecutorService janitorExecutor;
private final PaymentConfig paymentConfig;
private final IncompletePaymentAttemptTask incompletePaymentAttemptTask;
private final IncompletePaymentTransactionTask incompletePaymentTransactionTask;
+ private final InternalCallContextFactory internalCallContextFactory;
+
+ private NotificationQueue janitorQueue;
private volatile boolean isStopped;
@Inject
public Janitor(final PaymentConfig paymentConfig,
+ final NotificationQueueService notificationQueueService,
@Named(PaymentModule.JANITOR_EXECUTOR_NAMED) final ScheduledExecutorService janitorExecutor,
final IncompletePaymentAttemptTask incompletePaymentAttemptTask,
- final IncompletePaymentTransactionTask incompletePaymentTransactionTask) {
+ final IncompletePaymentTransactionTask incompletePaymentTransactionTask,
+ final InternalCallContextFactory internalCallContextFactory) {
+ this.notificationQueueService = notificationQueueService;
this.janitorExecutor = janitorExecutor;
this.paymentConfig = paymentConfig;
this.incompletePaymentAttemptTask = incompletePaymentAttemptTask;
this.incompletePaymentTransactionTask = incompletePaymentTransactionTask;
+ this.internalCallContextFactory = internalCallContextFactory;
this.isStopped = false;
}
+ public void initialize() throws NotificationQueueAlreadyExists {
+ janitorQueue = notificationQueueService.createNotificationQueue(DefaultPaymentService.SERVICE_NAME,
+ QUEUE_NAME,
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(final NotificationEvent notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+ if (! (notificationKey instanceof JanitorNotificationKey)) {
+ log.error("Janitor service received an unexpected event type {}" + notificationKey.getClass().getName());
+ return;
+
+ }
+ final JanitorNotificationKey janitorKey = (JanitorNotificationKey) notificationKey;
+ if (janitorKey.getTaskName().equals(incompletePaymentTransactionTask.getClass().toString())) {
+ incompletePaymentTransactionTask.processNotification(janitorKey, userToken, accountRecordId, tenantRecordId);
+ }
+ }
+ }
+ );
+ incompletePaymentTransactionTask.attachJanitorQueue(janitorQueue);
+ incompletePaymentAttemptTask.attachJanitorQueue(janitorQueue);
+ }
+
public void start() {
if (isStopped) {
log.warn("Janitor is not a restartable service, and was already started, aborting");
return;
}
+ janitorQueue.startQueue();
+
// Start task for completing incomplete payment attempts
final TimeUnit attemptCompletionRateUnit = paymentConfig.getJanitorRunningRate().getUnit();
final long attemptCompletionPeriod = paymentConfig.getJanitorRunningRate().getPeriod();
@@ -73,7 +120,7 @@ public class Janitor {
janitorExecutor.scheduleAtFixedRate(incompletePaymentTransactionTask, erroredCompletionPeriod, erroredCompletionPeriod, erroredCompletionRateUnit);
}
- public void stop() {
+ public void stop() throws NoSuchNotificationQueue {
if (isStopped) {
log.warn("Janitor is already in a stopped state");
return;
@@ -93,6 +140,11 @@ public class Janitor {
if (!success) {
log.warn("Janitor failed to complete termination within " + TERMINATION_TIMEOUT_SEC + "sec");
}
+
+ if (janitorQueue != null) {
+ janitorQueue.stopQueue();
+ notificationQueueService.deleteNotificationQueue(DefaultPaymentService.SERVICE_NAME, QUEUE_NAME);
+ }
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Janitor stop sequence got interrupted");
@@ -100,4 +152,9 @@ public class Janitor {
isStopped = true;
}
}
+
+ public void processPaymentEvent(final PaymentInternalEvent event) throws IOException {
+ incompletePaymentAttemptTask.processPaymentEvent(event, janitorQueue);
+ incompletePaymentTransactionTask.processPaymentEvent(event, janitorQueue);
+ }
}
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/JanitorNotificationKey.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/JanitorNotificationKey.java
new file mode 100644
index 0000000..b0f2c1d
--- /dev/null
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/JanitorNotificationKey.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.payment.core.janitor;
+
+import java.util.UUID;
+
+import org.killbill.notificationq.DefaultUUIDNotificationKey;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class JanitorNotificationKey extends DefaultUUIDNotificationKey {
+
+ private final Integer attemptNumber;
+ private final String taskName;
+
+ @JsonCreator
+ public JanitorNotificationKey(@JsonProperty("uuidKey") final UUID uuidKey,
+ @JsonProperty("taskName") final String taskName,
+ @JsonProperty("attemptNumber") final Integer attemptNumber) {
+ super(uuidKey);
+ this.attemptNumber = attemptNumber;
+ this.taskName = taskName;
+ }
+
+ public Integer getAttemptNumber() {
+ return attemptNumber;
+ }
+
+ public String getTaskName() {
+ return taskName;
+ }
+}
\ No newline at end of file
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/payments/PaymentEnteringStateCallback.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/payments/PaymentEnteringStateCallback.java
index 5b340aa..3dc675a 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/payments/PaymentEnteringStateCallback.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/payments/PaymentEnteringStateCallback.java
@@ -65,6 +65,8 @@ public abstract class PaymentEnteringStateCallback implements EnteringStateCallb
//
final BusInternalEvent event = new DefaultPaymentErrorEvent(paymentStateContext.getAccount().getId(),
null,
+ null,
+ null,
paymentStateContext.getTransactionType(),
"Early abortion of payment transaction",
paymentStateContext.getInternalCallContext().getAccountRecordId(),
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
index 0921bea..33ab485 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
@@ -301,7 +301,7 @@ public class DefaultPaymentDao implements PaymentDao {
} else {
entitySqlDaoWrapperFactory.become(PaymentSqlDao.class).updatePaymentStateName(paymentId.toString(), currentPaymentStateName, context);
}
- postPaymentEventFromTransaction(accountId, transactionStatus, transactionType, paymentId, processedAmount, processedCurrency, clock.getUTCNow(), gatewayErrorCode, entitySqlDaoWrapperFactory, context);
+ postPaymentEventFromTransaction(accountId, transactionStatus, transactionType, paymentId, transactionId, processedAmount, processedCurrency, clock.getUTCNow(), gatewayErrorCode, entitySqlDaoWrapperFactory, context);
return null;
}
});
@@ -561,6 +561,7 @@ public class DefaultPaymentDao implements PaymentDao {
final TransactionStatus transactionStatus,
final TransactionType transactionType,
final UUID paymentId,
+ final UUID transactionId,
final BigDecimal processedAmount,
final Currency processedCurrency,
final DateTime effectiveDate,
@@ -574,6 +575,7 @@ public class DefaultPaymentDao implements PaymentDao {
case PENDING:
event = new DefaultPaymentInfoEvent(accountId,
paymentId,
+ transactionId,
processedAmount,
processedCurrency,
transactionStatus,
@@ -587,6 +589,8 @@ public class DefaultPaymentDao implements PaymentDao {
case PAYMENT_FAILURE:
event = new DefaultPaymentErrorEvent(accountId,
paymentId,
+ transactionId,
+ transactionStatus,
transactionType,
gatewayErrorCode,
context.getAccountRecordId(),
@@ -598,6 +602,8 @@ public class DefaultPaymentDao implements PaymentDao {
default:
event = new DefaultPaymentPluginErrorEvent(accountId,
paymentId,
+ transactionId,
+ transactionStatus,
transactionType,
gatewayErrorCode,
context.getAccountRecordId(),
diff --git a/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java b/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java
index 5cf77f0..d003f74 100644
--- a/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java
+++ b/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java
@@ -20,7 +20,7 @@ package org.killbill.billing.payment.glue;
import org.killbill.billing.payment.api.PaymentApi;
import org.killbill.billing.payment.api.PaymentService;
-import org.killbill.billing.payment.bus.InvoiceHandler;
+import org.killbill.billing.payment.bus.PaymentBusEventHandler;
import org.killbill.billing.payment.invoice.PaymentTagHandler;
import org.killbill.billing.payment.core.janitor.Janitor;
import org.killbill.billing.payment.retry.DefaultRetryService;
@@ -40,7 +40,7 @@ public class DefaultPaymentService implements PaymentService {
public static final String SERVICE_NAME = "payment-service";
- private final InvoiceHandler invoiceHandler;
+ private final PaymentBusEventHandler paymentBusEventHandler;
private final PaymentTagHandler tagHandler;
private final PersistentBus eventBus;
private final PaymentApi api;
@@ -48,13 +48,13 @@ public class DefaultPaymentService implements PaymentService {
private final Janitor janitor;
@Inject
- public DefaultPaymentService(final InvoiceHandler invoiceHandler,
+ public DefaultPaymentService(final PaymentBusEventHandler paymentBusEventHandler,
final PaymentTagHandler tagHandler,
final PaymentApi api,
final DefaultRetryService retryService,
final PersistentBus eventBus,
final Janitor janitor) {
- this.invoiceHandler = invoiceHandler;
+ this.paymentBusEventHandler = paymentBusEventHandler;
this.tagHandler = tagHandler;
this.eventBus = eventBus;
this.api = api;
@@ -70,12 +70,13 @@ public class DefaultPaymentService implements PaymentService {
@LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
public void initialize() throws NotificationQueueAlreadyExists {
try {
- eventBus.register(invoiceHandler);
+ eventBus.register(paymentBusEventHandler);
eventBus.register(tagHandler);
} catch (final PersistentBus.EventBusException e) {
log.error("Unable to register with the EventBus!", e);
}
- retryService.initialize(SERVICE_NAME);
+ retryService.initialize();
+ janitor.initialize();
}
@LifecycleHandlerType(LifecycleLevel.START_SERVICE)
@@ -87,7 +88,7 @@ public class DefaultPaymentService implements PaymentService {
@LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
public void stop() throws NoSuchNotificationQueue {
try {
- eventBus.unregister(invoiceHandler);
+ eventBus.unregister(paymentBusEventHandler);
eventBus.unregister(tagHandler);
} catch (final PersistentBus.EventBusException e) {
throw new RuntimeException("Unable to unregister to the EventBus!", e);
diff --git a/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java b/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
index 8053d33..a602405 100644
--- a/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
+++ b/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
@@ -36,7 +36,7 @@ import org.killbill.billing.payment.api.DefaultPaymentGatewayApi;
import org.killbill.billing.payment.api.PaymentApi;
import org.killbill.billing.payment.api.PaymentGatewayApi;
import org.killbill.billing.payment.api.PaymentService;
-import org.killbill.billing.payment.bus.InvoiceHandler;
+import org.killbill.billing.payment.bus.PaymentBusEventHandler;
import org.killbill.billing.payment.core.PaymentGatewayProcessor;
import org.killbill.billing.payment.core.PaymentMethodProcessor;
import org.killbill.billing.payment.core.PaymentProcessor;
@@ -182,7 +182,7 @@ public class PaymentModule extends KillBillModule {
bind(PaymentApi.class).to(DefaultPaymentApi.class).asEagerSingleton();
bind(PaymentGatewayApi.class).to(DefaultPaymentGatewayApi.class).asEagerSingleton();
bind(AdminPaymentApi.class).to(DefaultAdminPaymentApi.class).asEagerSingleton();
- bind(InvoiceHandler.class).asEagerSingleton();
+ bind(PaymentBusEventHandler.class).asEagerSingleton();
bind(PaymentTagHandler.class).asEagerSingleton();
bind(PaymentService.class).to(DefaultPaymentService.class).asEagerSingleton();
installPaymentProviderPlugins(paymentConfig);
diff --git a/payment/src/main/java/org/killbill/billing/payment/retry/BaseRetryService.java b/payment/src/main/java/org/killbill/billing/payment/retry/BaseRetryService.java
index 7370f39..a5ec4ce 100644
--- a/payment/src/main/java/org/killbill/billing/payment/retry/BaseRetryService.java
+++ b/payment/src/main/java/org/killbill/billing/payment/retry/BaseRetryService.java
@@ -44,10 +44,10 @@ import com.google.inject.Inject;
public abstract class BaseRetryService implements RetryService {
private static final Logger log = LoggerFactory.getLogger(BaseRetryService.class);
- private static final String PAYMENT_RETRY_SERVICE = "PaymentRetryService";
private final NotificationQueueService notificationQueueService;
private final InternalCallContextFactory internalCallContextFactory;
+ private final String paymentRetryService;
private NotificationQueue retryQueue;
@@ -55,11 +55,12 @@ public abstract class BaseRetryService implements RetryService {
final InternalCallContextFactory internalCallContextFactory) {
this.notificationQueueService = notificationQueueService;
this.internalCallContextFactory = internalCallContextFactory;
+ this.paymentRetryService = DefaultPaymentService.SERVICE_NAME + "-" + getQueueName();
}
@Override
- public void initialize(final String svcName) throws NotificationQueueAlreadyExists {
- retryQueue = notificationQueueService.createNotificationQueue(svcName,
+ public void initialize() throws NotificationQueueAlreadyExists {
+ retryQueue = notificationQueueService.createNotificationQueue(DefaultPaymentService.SERVICE_NAME,
getQueueName(),
new NotificationQueueHandler() {
@Override
@@ -69,7 +70,7 @@ public abstract class BaseRetryService implements RetryService {
return;
}
final PaymentRetryNotificationKey key = (PaymentRetryNotificationKey) notificationKey;
- final InternalCallContext callContext = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, PAYMENT_RETRY_SERVICE, CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
+ final InternalCallContext callContext = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, paymentRetryService, CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
retryPaymentTransaction(key.getAttemptId(), key.getPaymentControlPluginNames(), callContext);
}
}
@@ -132,7 +133,8 @@ public abstract class BaseRetryService implements RetryService {
}
protected InternalCallContext createCallContextFromPaymentId(final ObjectType objectType, final UUID objectId, final Long tenantRecordId) {
- return internalCallContextFactory.createInternalCallContext(objectId, objectType, PAYMENT_RETRY_SERVICE, CallOrigin.INTERNAL, UserType.SYSTEM, null, tenantRecordId);
+ final String paymentRetryService = DefaultPaymentService.SERVICE_NAME + "-" + getQueueName();
+ return internalCallContextFactory.createInternalCallContext(objectId, objectType, paymentRetryService, CallOrigin.INTERNAL, UserType.SYSTEM, null, tenantRecordId);
}
public abstract String getQueueName();
diff --git a/payment/src/main/java/org/killbill/billing/payment/retry/RetryService.java b/payment/src/main/java/org/killbill/billing/payment/retry/RetryService.java
index ea320f1..03bdeee 100644
--- a/payment/src/main/java/org/killbill/billing/payment/retry/RetryService.java
+++ b/payment/src/main/java/org/killbill/billing/payment/retry/RetryService.java
@@ -28,7 +28,7 @@ import org.killbill.notificationq.api.NotificationQueueService.NotificationQueue
public interface RetryService {
- public void initialize(final String svcName) throws NotificationQueueAlreadyExists;
+ public void initialize() throws NotificationQueueAlreadyExists;
public void start();
diff --git a/payment/src/test/java/org/killbill/billing/payment/api/TestEventJson.java b/payment/src/test/java/org/killbill/billing/payment/api/TestEventJson.java
index 809199f..8e2338a 100644
--- a/payment/src/test/java/org/killbill/billing/payment/api/TestEventJson.java
+++ b/payment/src/test/java/org/killbill/billing/payment/api/TestEventJson.java
@@ -34,7 +34,7 @@ public class TestEventJson extends PaymentTestSuiteNoDB {
@Test(groups = "fast")
public void testPaymentErrorEvent() throws Exception {
- final PaymentErrorInternalEvent e = new DefaultPaymentErrorEvent(UUID.randomUUID(), UUID.randomUUID(), TransactionType.PURCHASE, "no message", 1L, 2L, UUID.randomUUID());
+ final PaymentErrorInternalEvent e = new DefaultPaymentErrorEvent(UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), TransactionStatus.SUCCESS, TransactionType.PURCHASE, "no message", 1L, 2L, UUID.randomUUID());
final String json = mapper.writeValueAsString(e);
final Class<?> claz = Class.forName(DefaultPaymentErrorEvent.class.getName());
@@ -44,7 +44,7 @@ public class TestEventJson extends PaymentTestSuiteNoDB {
@Test(groups = "fast")
public void testPaymentInfoEvent() throws Exception {
- final PaymentInfoInternalEvent e = new DefaultPaymentInfoEvent(UUID.randomUUID(), UUID.randomUUID(), new BigDecimal(12.9), Currency.EUR, TransactionStatus.SUCCESS,
+ final PaymentInfoInternalEvent e = new DefaultPaymentInfoEvent(UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), new BigDecimal(12.9), Currency.EUR, TransactionStatus.SUCCESS,
TransactionType.PURCHASE, new DateTime(), 1L, 2L, UUID.randomUUID());
final String json = mapper.writeValueAsString(e);
diff --git a/payment/src/test/java/org/killbill/billing/payment/glue/TestPaymentModuleWithEmbeddedDB.java b/payment/src/test/java/org/killbill/billing/payment/glue/TestPaymentModuleWithEmbeddedDB.java
index 7ad8a57..b7ba850 100644
--- a/payment/src/test/java/org/killbill/billing/payment/glue/TestPaymentModuleWithEmbeddedDB.java
+++ b/payment/src/test/java/org/killbill/billing/payment/glue/TestPaymentModuleWithEmbeddedDB.java
@@ -20,6 +20,7 @@ package org.killbill.billing.payment.glue;
import org.killbill.billing.GuicyKillbillTestWithEmbeddedDBModule;
import org.killbill.billing.account.glue.DefaultAccountModule;
+import org.killbill.billing.api.TestApiListener;
import org.killbill.billing.platform.api.KillbillConfigSource;
import org.killbill.billing.util.glue.NonEntityDaoModule;
import org.killbill.clock.Clock;
@@ -35,7 +36,7 @@ public class TestPaymentModuleWithEmbeddedDB extends TestPaymentModule {
install(new GuicyKillbillTestWithEmbeddedDBModule(configSource));
install(new NonEntityDaoModule(configSource));
install(new DefaultAccountModule(configSource));
-
+ bind(TestApiListener.class).asEagerSingleton();
super.configure();
}
}
diff --git a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
index bc68a36..cca3b59 100644
--- a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
+++ b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
@@ -23,13 +23,19 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeoutException;
import org.joda.time.LocalDate;
import org.killbill.billing.account.api.Account;
+import org.killbill.billing.api.TestApiListener;
+import org.killbill.billing.api.TestApiListener.NextEvent;
+import org.killbill.billing.callcontext.InternalCallContext;
import org.killbill.billing.catalog.api.Currency;
import org.killbill.billing.invoice.api.Invoice;
import org.killbill.billing.invoice.api.InvoiceApiException;
import org.killbill.billing.invoice.api.InvoiceItem;
+import org.killbill.billing.payment.api.DefaultPayment;
import org.killbill.billing.payment.api.Payment;
import org.killbill.billing.payment.api.PaymentApiException;
import org.killbill.billing.payment.api.PaymentOptions;
@@ -37,13 +43,21 @@ import org.killbill.billing.payment.api.PaymentTransaction;
import org.killbill.billing.payment.api.PluginProperty;
import org.killbill.billing.payment.api.TransactionStatus;
import org.killbill.billing.payment.api.TransactionType;
+import org.killbill.billing.payment.bus.PaymentBusEventHandler;
import org.killbill.billing.payment.core.janitor.Janitor;
import org.killbill.billing.payment.dao.PaymentAttemptModelDao;
import org.killbill.billing.payment.dao.PaymentTransactionModelDao;
+import org.killbill.billing.payment.glue.DefaultPaymentService;
import org.killbill.billing.payment.invoice.InvoicePaymentRoutingPluginApi;
import org.killbill.billing.payment.provider.MockPaymentProviderPlugin;
import org.killbill.billing.platform.api.KillbillConfigSource;
+import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.bus.api.PersistentBus.EventBusException;
+import org.killbill.commons.profiling.Profiling;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationEventWithMetadata;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.testng.Assert;
@@ -53,11 +67,16 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
import com.google.inject.Inject;
+import static com.jayway.awaitility.Awaitility.await;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
@@ -75,6 +94,14 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
@Inject
private Janitor janitor;
+ @Inject
+ private PaymentBusEventHandler handler;
+ @Inject
+ protected TestApiListener testListener;
+ @Inject
+ protected InternalCallContextFactory internalCallContextFactory;
+ @Inject
+ protected NotificationQueueService notificationQueueService;
private MockPaymentProviderPlugin mockPaymentProviderPlugin;
@@ -93,6 +120,7 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
protected void beforeClass() throws Exception {
super.beforeClass();
mockPaymentProviderPlugin = (MockPaymentProviderPlugin) registry.getServiceForName(MockPaymentProviderPlugin.PLUGIN_NAME);
+ janitor.initialize();
janitor.start();
}
@@ -104,12 +132,17 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
@BeforeMethod(groups = "slow")
public void beforeMethod() throws Exception {
super.beforeMethod();
+ eventBus.register(handler);
+ testListener.reset();
+ eventBus.register(testListener);
mockPaymentProviderPlugin.clear();
account = testHelper.createTestAccount("bobo@gmail.com", true);
}
@AfterMethod(groups = "slow")
public void afterMethod() throws Exception {
+ eventBus.unregister(handler);
+ eventBus.unregister(testListener);
super.afterMethod();
}
@@ -233,20 +266,23 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
final String paymentExternalKey = "qwru";
final String transactionExternalKey = "lkjdsf";
+ testListener.pushExpectedEvent(NextEvent.PAYMENT);
final Payment payment = paymentApi.createAuthorization(account, account.getPaymentMethodId(), null, requestedAmount, account.getCurrency(), paymentExternalKey,
transactionExternalKey, ImmutableList.<PluginProperty>of(), callContext);
+ testListener.assertListenerStatus();
// Artificially move the transaction status to UNKNOWN
+ final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
final String paymentStateName = paymentSMHelper.getErroredStateForTransaction(TransactionType.AUTHORIZE).toString();
+ testListener.pushExpectedEvent(NextEvent.PAYMENT_PLUGIN_ERROR);
paymentDao.updatePaymentAndTransactionOnCompletion(account.getId(), payment.getId(), TransactionType.AUTHORIZE, paymentStateName, paymentStateName,
payment.getTransactions().get(0).getId(), TransactionStatus.UNKNOWN, requestedAmount, account.getCurrency(),
"foo", "bar", internalCallContext);
- // The UnknownPaymentTransactionTask will look for UNKNOWN payment that *just happened* , and that are not too old (less than 7 days)
- clock.addDays(1);
- try {
- Thread.sleep(1500);
- } catch (InterruptedException e) {
- }
+ testListener.assertListenerStatus();
+
+ // Move clock for notification to be processed
+ clock.addDeltaFromReality(5 * 60 * 1000);
+ assertNotificationsCompleted(internalCallContext, 5);
final Payment updatedPayment = paymentApi.getPayment(payment.getId(), false, ImmutableList.<PluginProperty>of(), callContext);
assertEquals(updatedPayment.getTransactions().get(0).getTransactionStatus(), TransactionStatus.SUCCESS);
@@ -261,23 +297,26 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
// Make sure the state as seen by the plugin will be in PaymentPluginStatus.ERROR, which will be returned later to Janitor
mockPaymentProviderPlugin.makeNextPaymentFailWithError();
+ testListener.pushExpectedEvent(NextEvent.PAYMENT_ERROR);
final Payment payment = paymentApi.createAuthorization(account, account.getPaymentMethodId(), null, requestedAmount, account.getCurrency(), paymentExternalKey,
transactionExternalKey, ImmutableList.<PluginProperty>of(), callContext);
+ testListener.assertListenerStatus();
// Artificially move the transaction status to UNKNOWN
+ final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
final String paymentStateName = paymentSMHelper.getErroredStateForTransaction(TransactionType.AUTHORIZE).toString();
+ testListener.pushExpectedEvent(NextEvent.PAYMENT_PLUGIN_ERROR);
paymentDao.updatePaymentAndTransactionOnCompletion(account.getId(), payment.getId(), TransactionType.AUTHORIZE, paymentStateName, paymentStateName,
payment.getTransactions().get(0).getId(), TransactionStatus.UNKNOWN, requestedAmount, account.getCurrency(),
"foo", "bar", internalCallContext);
+ testListener.assertListenerStatus();
final List<PaymentTransactionModelDao> paymentTransactionHistoryBeforeJanitor = getPaymentTransactionHistory(transactionExternalKey);
Assert.assertEquals(paymentTransactionHistoryBeforeJanitor.size(), 3);
- clock.addDays(1);
- try {
- Thread.sleep(1500);
- } catch (InterruptedException e) {
- }
+ // Move clock for notification to be processed
+ clock.addDeltaFromReality(5 * 60 * 1000);
+ assertNotificationsCompleted(internalCallContext, 5);
// Proves the Janitor ran (and updated the transaction)
final List<PaymentTransactionModelDao> paymentTransactionHistoryAfterJanitor = getPaymentTransactionHistory(transactionExternalKey);
@@ -299,26 +338,33 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
// Make sure the state as seen by the plugin will be in PaymentPluginStatus.ERROR, which will be returned later to Janitor
mockPaymentProviderPlugin.makeNextPaymentFailWithException();
try {
+ testListener.pushExpectedEvent(NextEvent.PAYMENT_PLUGIN_ERROR);
paymentApi.createAuthorization(account, account.getPaymentMethodId(), null, requestedAmount, account.getCurrency(), paymentExternalKey,
transactionExternalKey, ImmutableList.<PluginProperty>of(), callContext);
} catch (PaymentApiException ignore) {
+ testListener.assertListenerStatus();
}
final Payment payment = paymentApi.getPaymentByExternalKey(paymentExternalKey, false, ImmutableList.<PluginProperty>of(), callContext);
// Artificially move the transaction status to UNKNOWN
+ final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
+
final String paymentStateName = paymentSMHelper.getErroredStateForTransaction(TransactionType.AUTHORIZE).toString();
+ testListener.pushExpectedEvent(NextEvent.PAYMENT_PLUGIN_ERROR);
paymentDao.updatePaymentAndTransactionOnCompletion(account.getId(), payment.getId(), TransactionType.AUTHORIZE, paymentStateName, paymentStateName,
payment.getTransactions().get(0).getId(), TransactionStatus.UNKNOWN, requestedAmount, account.getCurrency(),
"foo", "bar", internalCallContext);
+ testListener.assertListenerStatus();
+
+ // Move clock for notification to be processed
+ clock.addDeltaFromReality(5 * 60 * 1000);
+ // NO because we will keep retrying as we can't fix it...
+ //assertNotificationsCompleted(internalCallContext, 5);
final List<PaymentTransactionModelDao> paymentTransactionHistoryBeforeJanitor = getPaymentTransactionHistory(transactionExternalKey);
Assert.assertEquals(paymentTransactionHistoryBeforeJanitor.size(), 3);
- clock.addDays(1);
- try {
- Thread.sleep(1500);
- } catch (InterruptedException e) {
- }
+
// Nothing new happened
final List<PaymentTransactionModelDao> paymentTransactionHistoryAfterJanitor = getPaymentTransactionHistory(transactionExternalKey);
@@ -326,31 +372,36 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
}
@Test(groups = "slow")
- public void testPendingEntries() throws PaymentApiException, EventBusException {
+ public void testPendingEntries() throws PaymentApiException, EventBusException, NoSuchNotificationQueue {
final BigDecimal requestedAmount = BigDecimal.TEN;
final String paymentExternalKey = "jhj44";
final String transactionExternalKey = "4jhjj2";
+ testListener.pushExpectedEvent(NextEvent.PAYMENT);
final Payment payment = paymentApi.createAuthorization(account, account.getPaymentMethodId(), null, requestedAmount, account.getCurrency(), paymentExternalKey,
transactionExternalKey, ImmutableList.<PluginProperty>of(), callContext);
+ testListener.assertListenerStatus();
+
+ final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
// Artificially move the transaction status to PENDING
final String paymentStateName = paymentSMHelper.getPendingStateForTransaction(TransactionType.AUTHORIZE).toString();
+ testListener.pushExpectedEvent(NextEvent.PAYMENT);
paymentDao.updatePaymentAndTransactionOnCompletion(account.getId(), payment.getId(), TransactionType.AUTHORIZE, paymentStateName, paymentStateName,
payment.getTransactions().get(0).getId(), TransactionStatus.PENDING, requestedAmount, account.getCurrency(),
"loup", "chat", internalCallContext);
- clock.addDays(1);
- try {
- Thread.sleep(1500);
- } catch (InterruptedException e) {
- }
+ testListener.assertListenerStatus();
+
+ // Move clock for notification to be processed
+ clock.addDeltaFromReality(5 * 60 * 1000);
+ assertNotificationsCompleted(internalCallContext, 5);
final Payment updatedPayment = paymentApi.getPayment(payment.getId(), false, ImmutableList.<PluginProperty>of(), callContext);
Assert.assertEquals(updatedPayment.getTransactions().get(0).getTransactionStatus(), TransactionStatus.SUCCESS);
-
}
+
private List<PluginProperty> createPropertiesForInvoice(final Invoice invoice) {
final List<PluginProperty> result = new ArrayList<PluginProperty>();
result.add(new PluginProperty(InvoicePaymentRoutingPluginApi.PROP_IPCD_INVOICE_ID, invoice.getId().toString(), false));
@@ -385,5 +436,19 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
}
});
}
+
+ private void assertNotificationsCompleted(final InternalCallContext internalCallContext, final long timeoutSec) {
+ try {
+ await().atMost(timeoutSec, SECONDS).until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ final List<NotificationEventWithMetadata<NotificationEvent>> notifications = notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, Janitor.QUEUE_NAME).getFutureOrInProcessingNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
+ return notifications.isEmpty();
+ }
+ });
+ } catch (final Exception e) {
+ fail("Test failed ", e);
+ }
+ }
}
diff --git a/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java b/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java
index 2a1ae78..dcf9a9a 100644
--- a/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java
+++ b/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java
@@ -70,7 +70,7 @@ public class TestRetryService extends PaymentTestSuiteNoDB {
((MockPaymentDao) paymentDao).reset();
mockPaymentProviderPlugin = (MockPaymentProviderPlugin) registry.getServiceForName(MockPaymentProviderPlugin.PLUGIN_NAME);
mockPaymentProviderPlugin.clear();
- retryService.initialize(DefaultPaymentService.SERVICE_NAME);
+ retryService.initialize();
retryService.start();
}