killbill-memoizeit
Changes
payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentInternalEvent.java 209(+209 -0)
payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentPluginErrorEvent.java 82(+19 -63)
payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java 30(+24 -6)
payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java 161(+111 -50)
payment/src/main/java/org/killbill/billing/payment/core/janitor/JanitorNotificationKey.java 48(+48 -0)
payment/src/main/java/org/killbill/billing/payment/core/sm/payments/PaymentEnteringStateCallback.java 5(+5 -0)
payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTask.java 68(+68 -0)
payment/src/test/java/org/killbill/billing/payment/glue/TestPaymentModuleWithEmbeddedDB.java 3(+2 -1)
Details
diff --git a/api/src/main/java/org/killbill/billing/events/PaymentErrorInternalEvent.java b/api/src/main/java/org/killbill/billing/events/PaymentErrorInternalEvent.java
index 3b5889f..c261a13 100644
--- a/api/src/main/java/org/killbill/billing/events/PaymentErrorInternalEvent.java
+++ b/api/src/main/java/org/killbill/billing/events/PaymentErrorInternalEvent.java
@@ -13,19 +13,9 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
-package org.killbill.billing.events;
-
-import java.util.UUID;
-import org.killbill.billing.payment.api.TransactionType;
-
-public interface PaymentErrorInternalEvent extends BusInternalEvent {
+package org.killbill.billing.events;
+public interface PaymentErrorInternalEvent extends PaymentInternalEvent {
public String getMessage();
-
- public UUID getAccountId();
-
- public UUID getPaymentId();
-
- public TransactionType getTransactionType();
}
diff --git a/api/src/main/java/org/killbill/billing/events/PaymentInfoInternalEvent.java b/api/src/main/java/org/killbill/billing/events/PaymentInfoInternalEvent.java
index f1265b9..2e30ab6 100644
--- a/api/src/main/java/org/killbill/billing/events/PaymentInfoInternalEvent.java
+++ b/api/src/main/java/org/killbill/billing/events/PaymentInfoInternalEvent.java
@@ -16,27 +16,5 @@
package org.killbill.billing.events;
-import java.math.BigDecimal;
-import java.util.UUID;
-
-import org.joda.time.DateTime;
-import org.killbill.billing.catalog.api.Currency;
-import org.killbill.billing.payment.api.TransactionStatus;
-import org.killbill.billing.payment.api.TransactionType;
-
-public interface PaymentInfoInternalEvent extends BusInternalEvent {
-
- public UUID getPaymentId();
-
- public UUID getAccountId();
-
- public BigDecimal getAmount();
-
- public Currency getCurrency();
-
- public DateTime getEffectiveDate();
-
- public TransactionStatus getStatus();
-
- public TransactionType getTransactionType();
+public interface PaymentInfoInternalEvent extends PaymentInternalEvent {
}
diff --git a/api/src/main/java/org/killbill/billing/events/PaymentInternalEvent.java b/api/src/main/java/org/killbill/billing/events/PaymentInternalEvent.java
new file mode 100644
index 0000000..63d38e4
--- /dev/null
+++ b/api/src/main/java/org/killbill/billing/events/PaymentInternalEvent.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.events;
+
+import java.math.BigDecimal;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.killbill.billing.catalog.api.Currency;
+import org.killbill.billing.payment.api.TransactionStatus;
+import org.killbill.billing.payment.api.TransactionType;
+
+public interface PaymentInternalEvent extends BusInternalEvent {
+
+ public UUID getAccountId();
+
+ public UUID getPaymentId();
+
+ public UUID getPaymentTransactionId();
+
+ public BigDecimal getAmount();
+
+ public Currency getCurrency();
+
+ public TransactionStatus getStatus();
+
+ public TransactionType getTransactionType();
+
+ public DateTime getEffectiveDate();
+}
diff --git a/api/src/main/java/org/killbill/billing/events/PaymentPluginErrorInternalEvent.java b/api/src/main/java/org/killbill/billing/events/PaymentPluginErrorInternalEvent.java
index a0198cc..be1a80b 100644
--- a/api/src/main/java/org/killbill/billing/events/PaymentPluginErrorInternalEvent.java
+++ b/api/src/main/java/org/killbill/billing/events/PaymentPluginErrorInternalEvent.java
@@ -13,19 +13,9 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
-package org.killbill.billing.events;
-
-import java.util.UUID;
-import org.killbill.billing.payment.api.TransactionType;
-
-public interface PaymentPluginErrorInternalEvent extends BusInternalEvent {
+package org.killbill.billing.events;
+public interface PaymentPluginErrorInternalEvent extends PaymentInternalEvent {
public String getMessage();
-
- public UUID getAccountId();
-
- public UUID getPaymentId();
-
- public TransactionType getTransactionType();
}
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
index 86d77d9..ca63e2b 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
@@ -37,6 +37,7 @@ import org.killbill.billing.events.EffectiveSubscriptionInternalEvent;
import org.killbill.billing.events.RepairSubscriptionInternalEvent;
import org.killbill.billing.util.config.InvoiceConfig;
+import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
@@ -60,6 +61,7 @@ public class InvoiceListener {
this.clock = clock;
}
+ @AllowConcurrentEvents
@Subscribe
public void handleRepairSubscriptionEvent(final RepairSubscriptionInternalEvent event) {
@@ -71,6 +73,7 @@ public class InvoiceListener {
}
}
+ @AllowConcurrentEvents
@Subscribe
public void handleSubscriptionTransition(final EffectiveSubscriptionInternalEvent event) {
@@ -89,6 +92,7 @@ public class InvoiceListener {
}
}
+ @AllowConcurrentEvents
@Subscribe
public void handleEntitlementTransition(final EffectiveEntitlementInternalEvent event) {
@@ -100,6 +104,7 @@ public class InvoiceListener {
}
}
+ @AllowConcurrentEvents
@Subscribe
public void handleBlockingStateTransition(final BlockingTransitionInternalEvent event) {
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java
index e293356..cb6ed8c 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java
@@ -31,6 +31,7 @@ import org.killbill.clock.Clock;
import org.killbill.billing.events.ControlTagDeletionInternalEvent;
import org.killbill.billing.util.tag.ControlTagType;
+import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
@@ -51,6 +52,7 @@ public class InvoiceTagHandler {
this.internalCallContextFactory = internalCallContextFactory;
}
+ @AllowConcurrentEvents
@Subscribe
public void process_AUTO_INVOICING_OFF_removal(final ControlTagDeletionInternalEvent event) {
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/util/KillbillEventHandler.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/util/KillbillEventHandler.java
index cae76d5..1b4a1d4 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/util/KillbillEventHandler.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/util/KillbillEventHandler.java
@@ -24,6 +24,7 @@ import org.killbill.billing.events.BusInternalEvent;
import org.killbill.billing.util.userrequest.CompletionUserRequest;
import org.killbill.billing.util.userrequest.CompletionUserRequestNotifier;
+import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
public class KillbillEventHandler {
@@ -55,6 +56,7 @@ public class KillbillEventHandler {
/*
* Killbill server event handler
*/
+ @AllowConcurrentEvents
@Subscribe
public void handleSubscriptionevents(final BusInternalEvent event) {
final List<CompletionUserRequestNotifier> runningWaiters = new ArrayList<CompletionUserRequestNotifier>();
diff --git a/overdue/src/main/java/org/killbill/billing/overdue/listener/OverdueListener.java b/overdue/src/main/java/org/killbill/billing/overdue/listener/OverdueListener.java
index a8d11b9..fa2e581 100644
--- a/overdue/src/main/java/org/killbill/billing/overdue/listener/OverdueListener.java
+++ b/overdue/src/main/java/org/killbill/billing/overdue/listener/OverdueListener.java
@@ -50,6 +50,7 @@ import org.killbill.clock.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
@@ -73,6 +74,7 @@ public class OverdueListener {
this.internalCallContextFactory = internalCallContextFactory;
}
+ @AllowConcurrentEvents
@Subscribe
public void handle_OVERDUE_ENFORCEMENT_OFF_Insert(final ControlTagCreationInternalEvent event) {
if (event.getTagDefinition().getName().equals(ControlTagType.OVERDUE_ENFORCEMENT_OFF.toString()) && event.getObjectType() == ObjectType.ACCOUNT) {
@@ -80,6 +82,7 @@ public class OverdueListener {
}
}
+ @AllowConcurrentEvents
@Subscribe
public void handle_OVERDUE_ENFORCEMENT_OFF_Removal(final ControlTagDeletionInternalEvent event) {
if (event.getTagDefinition().getName().equals(ControlTagType.OVERDUE_ENFORCEMENT_OFF.toString()) && event.getObjectType() == ObjectType.ACCOUNT) {
@@ -87,18 +90,21 @@ public class OverdueListener {
}
}
+ @AllowConcurrentEvents
@Subscribe
public void handlePaymentInfoEvent(final PaymentInfoInternalEvent event) {
log.debug("Received PaymentInfo event {}", event);
insertBusEventIntoNotificationQueue(event.getAccountId(), event, OverdueAsyncBusNotificationAction.REFRESH, event.getSearchKey2());
}
+ @AllowConcurrentEvents
@Subscribe
public void handlePaymentErrorEvent(final PaymentErrorInternalEvent event) {
log.debug("Received PaymentError event {}", event);
insertBusEventIntoNotificationQueue(event.getAccountId(), event, OverdueAsyncBusNotificationAction.REFRESH, event.getSearchKey2());
}
+ @AllowConcurrentEvents
@Subscribe
public void handleInvoiceAdjustmentEvent(final InvoiceAdjustmentInternalEvent event) {
log.debug("Received InvoiceAdjustment event {}", event);
diff --git a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentErrorEvent.java b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentErrorEvent.java
index d81ebad..48d93b7 100644
--- a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentErrorEvent.java
+++ b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentErrorEvent.java
@@ -16,55 +16,36 @@
package org.killbill.billing.payment.api;
+import java.math.BigDecimal;
import java.util.UUID;
-import org.killbill.billing.events.BusEventBase;
+import org.joda.time.DateTime;
+import org.killbill.billing.catalog.api.Currency;
import org.killbill.billing.events.PaymentErrorInternalEvent;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
-public class DefaultPaymentErrorEvent extends BusEventBase implements PaymentErrorInternalEvent {
+public class DefaultPaymentErrorEvent extends DefaultPaymentInternalEvent implements PaymentErrorInternalEvent {
private final String message;
- private final UUID accountId;
- private final UUID paymentId;
- private final TransactionType transactionType;
@JsonCreator
public DefaultPaymentErrorEvent(@JsonProperty("accountId") final UUID accountId,
@JsonProperty("paymentId") final UUID paymentId,
- @JsonProperty("transactionType") final TransactionType transactionType,
+ @JsonProperty("paymentTransactionId") final UUID paymentTransactionId,
+ @JsonProperty("amount") final BigDecimal amount,
+ @JsonProperty("currency") final Currency currency,
+ @JsonProperty("status") final TransactionStatus status,
+ @JsonProperty("transactionType") final TransactionType transactionType,
+ @JsonProperty("effectiveDate") final DateTime effectiveDate,
@JsonProperty("message") final String message,
@JsonProperty("searchKey1") final Long searchKey1,
@JsonProperty("searchKey2") final Long searchKey2,
@JsonProperty("userToken") final UUID userToken) {
- super(searchKey1, searchKey2, userToken);
+ super(accountId, paymentId, paymentTransactionId, amount, currency, status, transactionType, effectiveDate, searchKey1, searchKey2, userToken);
this.message = message;
- this.accountId = accountId;
- this.paymentId = paymentId;
- this.transactionType = transactionType;
- }
-
- @Override
- public String getMessage() {
- return message;
- }
-
- @Override
- public UUID getAccountId() {
- return accountId;
- }
-
- @Override
- public UUID getPaymentId() {
- return paymentId;
- }
-
- @Override
- public TransactionType getTransactionType() {
- return transactionType;
}
@JsonIgnore
@@ -74,49 +55,12 @@ public class DefaultPaymentErrorEvent extends BusEventBase implements PaymentErr
}
@Override
- public String toString() {
- final StringBuilder sb = new StringBuilder("DefaultPaymentErrorEvent{");
- sb.append("message='").append(message).append('\'');
- sb.append(", accountId=").append(accountId);
- sb.append(", paymentId=").append(paymentId);
- sb.append(", transactionType=").append(transactionType);
- sb.append('}');
- return sb.toString();
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof DefaultPaymentErrorEvent)) {
- return false;
- }
-
- final DefaultPaymentErrorEvent that = (DefaultPaymentErrorEvent) o;
-
- if (accountId != null ? !accountId.equals(that.accountId) : that.accountId != null) {
- return false;
- }
- if (transactionType != null ? !transactionType.equals(that.transactionType) : that.transactionType != null) {
- return false;
- }
- if (message != null ? !message.equals(that.message) : that.message != null) {
- return false;
- }
- if (paymentId != null ? !paymentId.equals(that.paymentId) : that.paymentId != null) {
- return false;
- }
-
- return true;
+ public String getMessage() {
+ return message;
}
@Override
- public int hashCode() {
- int result = message != null ? message.hashCode() : 0;
- result = 31 * result + (accountId != null ? accountId.hashCode() : 0);
- result = 31 * result + (transactionType != null ? transactionType.hashCode() : 0);
- result = 31 * result + (paymentId != null ? paymentId.hashCode() : 0);
- return result;
+ protected Class getPaymentInternalEventClass() {
+ return DefaultPaymentErrorEvent.class;
}
}
diff --git a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentInfoEvent.java b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentInfoEvent.java
index d3d73b2..3caea55 100644
--- a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentInfoEvent.java
+++ b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentInfoEvent.java
@@ -21,58 +21,25 @@ import java.util.UUID;
import org.joda.time.DateTime;
import org.killbill.billing.catalog.api.Currency;
-import org.killbill.billing.events.BusEventBase;
import org.killbill.billing.events.PaymentInfoInternalEvent;
-import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
-public class DefaultPaymentInfoEvent extends BusEventBase implements PaymentInfoInternalEvent {
+public class DefaultPaymentInfoEvent extends DefaultPaymentInternalEvent implements PaymentInfoInternalEvent {
- private final UUID accountId;
- private final UUID paymentId;
- private final BigDecimal amount;
- private final Currency currency;
- private final TransactionStatus status;
- private final TransactionType transactionType;
- private final DateTime effectiveDate;
-
- @JsonCreator
public DefaultPaymentInfoEvent(@JsonProperty("accountId") final UUID accountId,
@JsonProperty("paymentId") final UUID paymentId,
+ @JsonProperty("paymentTransactionId") final UUID paymentTransactionId,
@JsonProperty("amount") final BigDecimal amount,
@JsonProperty("currency") final Currency currency,
@JsonProperty("status") final TransactionStatus status,
- @JsonProperty("transactionType") final TransactionType transactionType,
- @JsonProperty("extFirstPaymentRefId") final String extFirstPaymentRefId /* TODO for backward compatibility only */,
- @JsonProperty("extSecondPaymentRefId") final String extSecondPaymentRefId /* TODO for backward compatibility only */,
+ @JsonProperty("transactionType") final TransactionType transactionType,
@JsonProperty("effectiveDate") final DateTime effectiveDate,
@JsonProperty("searchKey1") final Long searchKey1,
@JsonProperty("searchKey2") final Long searchKey2,
@JsonProperty("userToken") final UUID userToken) {
- super(searchKey1, searchKey2, userToken);
- this.accountId = accountId;
- this.paymentId = paymentId;
- this.amount = amount;
- this.currency = currency;
- this.status = status;
- this.transactionType = transactionType;
- this.effectiveDate = effectiveDate;
- }
-
- public DefaultPaymentInfoEvent(final UUID accountId,
- final UUID paymentId,
- final BigDecimal amount,
- final Currency currency,
- final TransactionStatus status,
- final TransactionType transactionType,
- final DateTime effectiveDate,
- final Long searchKey1,
- final Long searchKey2,
- final UUID userToken) {
- this(accountId, paymentId, amount, currency, status, transactionType, null, null,
- effectiveDate, searchKey1, searchKey2, userToken);
+ super(accountId, paymentId, paymentTransactionId, amount, currency, status, transactionType, effectiveDate, searchKey1, searchKey2, userToken);
}
@JsonIgnore
@@ -81,131 +48,9 @@ public class DefaultPaymentInfoEvent extends BusEventBase implements PaymentInfo
return BusInternalEventType.PAYMENT_INFO;
}
- @Override
- public UUID getAccountId() {
- return accountId;
- }
-
-
- @Override
- public BigDecimal getAmount() {
- return amount;
- }
-
- @Override
- public Currency getCurrency() {
- return currency;
- }
-
- @Override
- public DateTime getEffectiveDate() {
- return effectiveDate;
- }
-
- @Override
- public UUID getPaymentId() {
- return paymentId;
- }
-
- @Override
- public TransactionType getTransactionType() {
- return transactionType;
- }
-
- @Override
- public TransactionStatus getStatus() {
- return status;
- }
-
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("DefaultPaymentInfoEvent");
- sb.append("{accountId=").append(accountId);
- sb.append(", paymentId=").append(paymentId);
- sb.append(", amount=").append(amount);
- sb.append(", currency=").append(currency);
- sb.append(", status=").append(status);
- sb.append(", transactionType=").append(transactionType);
- sb.append(", effectiveDate=").append(effectiveDate);
- sb.append('}');
- return sb.toString();
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result
- + ((accountId == null) ? 0 : accountId.hashCode());
- result = prime * result + ((amount == null) ? 0 : amount.hashCode());
- result = prime * result
- + ((effectiveDate == null) ? 0 : effectiveDate.hashCode());
- result = prime * result
- + ((paymentId == null) ? 0 : paymentId.hashCode());
- result = prime * result
- + ((currency == null) ? 0 : currency.hashCode());
- result = prime * result + ((status == null) ? 0 : status.hashCode());
- return result;
- }
@Override
- public boolean equals(final Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- final DefaultPaymentInfoEvent other = (DefaultPaymentInfoEvent) obj;
- if (accountId == null) {
- if (other.accountId != null) {
- return false;
- }
- } else if (!accountId.equals(other.accountId)) {
- return false;
- }
- if (transactionType == null) {
- if (other.transactionType != null) {
- return false;
- }
- } else if (!transactionType.equals(other.transactionType)) {
- return false;
- }
- if (amount == null) {
- if (other.amount != null) {
- return false;
- }
- } else if (amount.compareTo(other.amount) != 0) {
- return false;
- }
- if (effectiveDate == null) {
- if (other.effectiveDate != null) {
- return false;
- }
- } else if (effectiveDate.compareTo(other.effectiveDate) != 0) {
- return false;
- }
- if (paymentId == null) {
- if (other.paymentId != null) {
- return false;
- }
- } else if (!paymentId.equals(other.paymentId)) {
- return false;
- }
- if (currency == null) {
- if (other.currency != null) {
- return false;
- }
- } else if (!currency.equals(other.currency)) {
- return false;
- }
- if (status != other.status) {
- return false;
- }
- return true;
+ protected Class getPaymentInternalEventClass() {
+ return DefaultPaymentInfoEvent.class;
}
}
diff --git a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentInternalEvent.java b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentInternalEvent.java
new file mode 100644
index 0000000..84b2e36
--- /dev/null
+++ b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentInternalEvent.java
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.payment.api;
+
+import java.math.BigDecimal;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.killbill.billing.catalog.api.Currency;
+import org.killbill.billing.events.BusEventBase;
+import org.killbill.billing.events.PaymentInternalEvent;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public abstract class DefaultPaymentInternalEvent extends BusEventBase implements PaymentInternalEvent {
+
+ private final UUID accountId;
+ private final UUID paymentId;
+ private final UUID paymentTransactionId;
+ private final BigDecimal amount;
+ private final Currency currency;
+ private final TransactionStatus status;
+ private final TransactionType transactionType;
+ private final DateTime effectiveDate;
+
+ @JsonCreator
+ public DefaultPaymentInternalEvent(@JsonProperty("accountId") final UUID accountId,
+ @JsonProperty("paymentId") final UUID paymentId,
+ @JsonProperty("paymentTransactionId") final UUID paymentTransactionId,
+ @JsonProperty("amount") final BigDecimal amount,
+ @JsonProperty("currency") final Currency currency,
+ @JsonProperty("status") final TransactionStatus status,
+ @JsonProperty("transactionType") final TransactionType transactionType,
+ @JsonProperty("effectiveDate") final DateTime effectiveDate,
+ @JsonProperty("searchKey1") final Long searchKey1,
+ @JsonProperty("searchKey2") final Long searchKey2,
+ @JsonProperty("userToken") final UUID userToken) {
+ super(searchKey1, searchKey2, userToken);
+ this.accountId = accountId;
+ this.paymentId = paymentId;
+ this.paymentTransactionId = paymentTransactionId;
+ this.amount = amount;
+ this.currency = currency;
+ this.status = status;
+ this.transactionType = transactionType;
+ this.effectiveDate = effectiveDate;
+ }
+
+ @Override
+ public UUID getAccountId() {
+ return accountId;
+ }
+
+ @Override
+ public BigDecimal getAmount() {
+ return amount;
+ }
+
+ @Override
+ public Currency getCurrency() {
+ return currency;
+ }
+
+ @Override
+ public DateTime getEffectiveDate() {
+ return effectiveDate;
+ }
+
+ @Override
+ public UUID getPaymentId() {
+ return paymentId;
+ }
+
+ @Override
+ public UUID getPaymentTransactionId() {
+ return paymentTransactionId;
+ }
+
+ @Override
+ public TransactionType getTransactionType() {
+ return transactionType;
+ }
+
+ @Override
+ public TransactionStatus getStatus() {
+ return status;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append(getPaymentInternalEventClass().toString());
+ sb.append(" {accountId=").append(accountId);
+ sb.append(", paymentId=").append(paymentId);
+ sb.append(", paymentTransactionId=").append(paymentTransactionId);
+ sb.append(", amount=").append(amount);
+ sb.append(", currency=").append(currency);
+ sb.append(", status=").append(status);
+ sb.append(", transactionType=").append(transactionType);
+ sb.append(", effectiveDate=").append(effectiveDate);
+ sb.append('}');
+ return sb.toString();
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result
+ + ((accountId == null) ? 0 : accountId.hashCode());
+ result = prime * result + ((amount == null) ? 0 : amount.hashCode());
+ result = prime * result
+ + ((effectiveDate == null) ? 0 : effectiveDate.hashCode());
+ result = prime * result
+ + ((paymentId == null) ? 0 : paymentId.hashCode());
+ result = prime * result
+ + ((paymentTransactionId == null) ? 0 : paymentTransactionId.hashCode());
+ result = prime * result
+ + ((currency == null) ? 0 : currency.hashCode());
+ result = prime * result + ((status == null) ? 0 : status.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final DefaultPaymentInternalEvent other = (DefaultPaymentInternalEvent) obj;
+ if (accountId == null) {
+ if (other.accountId != null) {
+ return false;
+ }
+ } else if (!accountId.equals(other.accountId)) {
+ return false;
+ }
+ if (transactionType == null) {
+ if (other.transactionType != null) {
+ return false;
+ }
+ } else if (!transactionType.equals(other.transactionType)) {
+ return false;
+ }
+ if (amount == null) {
+ if (other.amount != null) {
+ return false;
+ }
+ } else if (amount.compareTo(other.amount) != 0) {
+ return false;
+ }
+ if (effectiveDate == null) {
+ if (other.effectiveDate != null) {
+ return false;
+ }
+ } else if (effectiveDate.compareTo(other.effectiveDate) != 0) {
+ return false;
+ }
+ if (paymentId == null) {
+ if (other.paymentId != null) {
+ return false;
+ }
+ } else if (!paymentId.equals(other.paymentId)) {
+ return false;
+ }
+ if (paymentTransactionId == null) {
+ if (other.paymentTransactionId != null) {
+ return false;
+ }
+ } else if (!paymentTransactionId.equals(other.paymentTransactionId)) {
+ return false;
+ }
+ if (currency == null) {
+ if (other.currency != null) {
+ return false;
+ }
+ } else if (!currency.equals(other.currency)) {
+ return false;
+ }
+ if (status != other.status) {
+ return false;
+ }
+ return true;
+ }
+
+ protected abstract Class getPaymentInternalEventClass();
+}
diff --git a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentPluginErrorEvent.java b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentPluginErrorEvent.java
index 5ef1ac2..dee9ada 100644
--- a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentPluginErrorEvent.java
+++ b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentPluginErrorEvent.java
@@ -16,54 +16,36 @@
package org.killbill.billing.payment.api;
+import java.math.BigDecimal;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.killbill.billing.catalog.api.Currency;
+import org.killbill.billing.events.PaymentPluginErrorInternalEvent;
+
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.killbill.billing.events.BusEventBase;
-import org.killbill.billing.events.PaymentPluginErrorInternalEvent;
-
-import java.util.UUID;
-public class DefaultPaymentPluginErrorEvent extends BusEventBase implements PaymentPluginErrorInternalEvent {
+public class DefaultPaymentPluginErrorEvent extends DefaultPaymentInternalEvent implements PaymentPluginErrorInternalEvent {
private final String message;
- private final UUID accountId;
- private final UUID paymentId;
- private final TransactionType transactionType;
@JsonCreator
public DefaultPaymentPluginErrorEvent(@JsonProperty("accountId") final UUID accountId,
@JsonProperty("paymentId") final UUID paymentId,
- @JsonProperty("transactionType") final TransactionType transactionType,
+ @JsonProperty("paymentTransactionId") final UUID paymentTransactionId,
+ @JsonProperty("amount") final BigDecimal amount,
+ @JsonProperty("currency") final Currency currency,
+ @JsonProperty("status") final TransactionStatus status,
+ @JsonProperty("transactionType") final TransactionType transactionType,
+ @JsonProperty("effectiveDate") final DateTime effectiveDate,
@JsonProperty("message") final String message,
@JsonProperty("searchKey1") final Long searchKey1,
@JsonProperty("searchKey2") final Long searchKey2,
@JsonProperty("userToken") final UUID userToken) {
- super(searchKey1, searchKey2, userToken);
+ super(accountId, paymentId, paymentTransactionId, amount, currency, status, transactionType, effectiveDate, searchKey1, searchKey2, userToken);
this.message = message;
- this.accountId = accountId;
- this.paymentId = paymentId;
- this.transactionType = transactionType;
- }
-
- @Override
- public String getMessage() {
- return message;
- }
-
- @Override
- public UUID getAccountId() {
- return accountId;
- }
-
- @Override
- public UUID getPaymentId() {
- return paymentId;
- }
-
- @Override
- public TransactionType getTransactionType() {
- return transactionType;
}
@JsonIgnore
@@ -73,38 +55,12 @@ public class DefaultPaymentPluginErrorEvent extends BusEventBase implements Paym
}
@Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof DefaultPaymentPluginErrorEvent)) {
- return false;
- }
-
- final DefaultPaymentPluginErrorEvent that = (DefaultPaymentPluginErrorEvent) o;
-
- if (accountId != null ? !accountId.equals(that.accountId) : that.accountId != null) {
- return false;
- }
- if (transactionType != null ? !transactionType.equals(that.transactionType) : that.transactionType != null) {
- return false;
- }
- if (message != null ? !message.equals(that.message) : that.message != null) {
- return false;
- }
- if (paymentId != null ? !paymentId.equals(that.paymentId) : that.paymentId != null) {
- return false;
- }
-
- return true;
+ public String getMessage() {
+ return message;
}
@Override
- public int hashCode() {
- int result = message != null ? message.hashCode() : 0;
- result = 31 * result + (accountId != null ? accountId.hashCode() : 0);
- result = 31 * result + (transactionType != null ? transactionType.hashCode() : 0);
- result = 31 * result + (paymentId != null ? paymentId.hashCode() : 0);
- return result;
+ protected Class getPaymentInternalEventClass() {
+ return DefaultPaymentPluginErrorEvent.class;
}
}
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
index ac1a218..60281b4 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
@@ -17,12 +17,17 @@
package org.killbill.billing.payment.core.janitor;
+import java.io.IOException;
import java.util.List;
+import org.killbill.billing.account.api.Account;
+import org.killbill.billing.account.api.AccountApiException;
import org.killbill.billing.account.api.AccountInternalApi;
import org.killbill.billing.callcontext.DefaultCallContext;
import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.events.PaymentInternalEvent;
import org.killbill.billing.osgi.api.OSGIServiceRegistration;
+import org.killbill.billing.payment.core.ProcessorBase;
import org.killbill.billing.payment.core.sm.PaymentControlStateMachineHelper;
import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
import org.killbill.billing.payment.dao.PaymentDao;
@@ -34,7 +39,12 @@ import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.callcontext.TenantContext;
import org.killbill.billing.util.callcontext.UserType;
import org.killbill.billing.util.config.PaymentConfig;
+import org.killbill.billing.util.globallocker.LockerType;
import org.killbill.clock.Clock;
+import org.killbill.commons.locker.GlobalLock;
+import org.killbill.commons.locker.GlobalLocker;
+import org.killbill.commons.locker.LockFailedException;
+import org.killbill.notificationq.api.NotificationQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,13 +60,16 @@ abstract class CompletionTaskBase<T> implements Runnable {
protected final PaymentControlStateMachineHelper retrySMHelper;
protected final AccountInternalApi accountInternalApi;
protected final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry;
+ protected final GlobalLocker locker;
+
+ protected NotificationQueue janitorQueue;
private volatile boolean isStopped;
public CompletionTaskBase(final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
final PaymentDao paymentDao, final Clock clock, final PaymentStateMachineHelper paymentStateMachineHelper,
final PaymentControlStateMachineHelper retrySMHelper, final AccountInternalApi accountInternalApi,
- final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
+ final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry, final GlobalLocker locker) {
this.internalCallContextFactory = internalCallContextFactory;
this.paymentConfig = paymentConfig;
this.paymentDao = paymentDao;
@@ -65,6 +78,7 @@ abstract class CompletionTaskBase<T> implements Runnable {
this.retrySMHelper = retrySMHelper;
this.accountInternalApi = accountInternalApi;
this.pluginRegistry = pluginRegistry;
+ this.locker = locker;
this.isStopped = false;
}
@@ -74,7 +88,7 @@ abstract class CompletionTaskBase<T> implements Runnable {
log.info("Janitor was requested to stop");
return;
}
- final List<T> items = getItemsForIteration();
+ final Iterable<T> items = getItemsForIteration();
for (final T item : items) {
if (isStopped) {
log.info("Janitor was requested to stop");
@@ -92,10 +106,38 @@ abstract class CompletionTaskBase<T> implements Runnable {
this.isStopped = true;
}
- public abstract List<T> getItemsForIteration();
+ public abstract Iterable<T> getItemsForIteration();
public abstract void doIteration(final T item);
+ public abstract void processPaymentEvent(final PaymentInternalEvent event, final NotificationQueue janitorQueue) throws IOException;
+
+ public void attachJanitorQueue(final NotificationQueue janitorQueue) {
+ this.janitorQueue = janitorQueue;
+ }
+
+ public interface JanitorIterationCallback {
+ public <T> T doIteration();
+ }
+
+ protected <T> T doJanitorOperationWithAccountLock(final JanitorIterationCallback callback, final InternalTenantContext internalTenantContext) {
+ GlobalLock lock = null;
+ try {
+ final Account account = accountInternalApi.getAccountByRecordId(internalTenantContext.getAccountRecordId(), internalTenantContext);
+ lock = locker.lockWithNumberOfTries(LockerType.ACCNT_INV_PAY.toString(), account.getExternalKey(), ProcessorBase.NB_LOCK_TRY);
+ return callback.doIteration();
+ } catch (AccountApiException e) {
+ log.warn(String.format("Janitor failed to retrieve account with recordId %s", internalTenantContext.getAccountRecordId()), e);
+ } catch (LockFailedException e) {
+ log.warn(String.format("Janitor failed to lock account with recordId %s", internalTenantContext.getAccountRecordId()), e);
+ } finally {
+ if (lock != null) {
+ lock.release();
+ }
+ }
+ return null;
+ }
+
protected CallContext createCallContext(final String taskName, final InternalTenantContext internalTenantContext) {
final TenantContext tenantContext = internalCallContextFactory.createTenantContext(internalTenantContext);
return new DefaultCallContext(tenantContext.getTenantId(), taskName, CallOrigin.INTERNAL, UserType.SYSTEM, UUIDs.randomUUID(), clock);
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
index ddd73dc..01a9101 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
@@ -17,6 +17,7 @@
package org.killbill.billing.payment.core.janitor;
+import java.io.IOException;
import java.util.List;
import javax.inject.Inject;
@@ -27,6 +28,7 @@ import org.killbill.billing.account.api.AccountApiException;
import org.killbill.billing.account.api.AccountInternalApi;
import org.killbill.billing.callcontext.InternalCallContext;
import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.events.PaymentInternalEvent;
import org.killbill.billing.osgi.api.OSGIServiceRegistration;
import org.killbill.billing.payment.api.PaymentApiException;
import org.killbill.billing.payment.api.TransactionStatus;
@@ -43,7 +45,10 @@ import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
import org.killbill.billing.util.callcontext.CallContext;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.config.PaymentConfig;
+import org.killbill.billing.util.entity.Pagination;
import org.killbill.clock.Clock;
+import org.killbill.commons.locker.GlobalLocker;
+import org.killbill.notificationq.api.NotificationQueue;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
@@ -56,28 +61,36 @@ import com.google.common.collect.Iterables;
*/
public class IncompletePaymentAttemptTask extends CompletionTaskBase<PaymentAttemptModelDao> {
+ //
+ // Each paymentAttempt *should* transition to a new state, so fetching a limited size will still allow us to progress (as opposed to fetching the same entries over and over)
+ // We also don't expect to see too many entries in the INIT state.
+ //
+ private final static long MAX_ATTEMPTS_PER_ITERATIONS = 1000L;
+
private final PluginRoutingPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner;
@Inject
public IncompletePaymentAttemptTask(final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
final PaymentDao paymentDao, final Clock clock, final PaymentStateMachineHelper paymentStateMachineHelper,
final PaymentControlStateMachineHelper retrySMHelper, final AccountInternalApi accountInternalApi,
- final PluginRoutingPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner, final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
- super(internalCallContextFactory, paymentConfig, paymentDao, clock, paymentStateMachineHelper, retrySMHelper, accountInternalApi, pluginRegistry);
+ final PluginRoutingPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner,
+ final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry, final GlobalLocker locker) {
+ super(internalCallContextFactory, paymentConfig, paymentDao, clock, paymentStateMachineHelper, retrySMHelper, accountInternalApi, pluginRegistry, locker);
this.pluginControlledPaymentAutomatonRunner = pluginControlledPaymentAutomatonRunner;
}
@Override
- public List<PaymentAttemptModelDao> getItemsForIteration() {
- final List<PaymentAttemptModelDao> incompleteAttempts = paymentDao.getPaymentAttemptsByStateAcrossTenants(retrySMHelper.getInitialState().getName(), getCreatedDateBefore());
- if (!incompleteAttempts.isEmpty()) {
- log.info("Janitor AttemptCompletionTask start run: found {} incomplete attempts", incompleteAttempts.size());
+ public Iterable<PaymentAttemptModelDao> getItemsForIteration() {
+ final Pagination<PaymentAttemptModelDao> incompleteAttempts = paymentDao.getPaymentAttemptsByStateAcrossTenants(retrySMHelper.getInitialState().getName(), getCreatedDateBefore(), 0L, MAX_ATTEMPTS_PER_ITERATIONS);
+ if (incompleteAttempts.getTotalNbRecords() > 0) {
+ log.info("Janitor AttemptCompletionTask start run: found {} incomplete attempts", incompleteAttempts.getTotalNbRecords());
}
return incompleteAttempts;
}
@Override
public void doIteration(final PaymentAttemptModelDao attempt) {
+ // We don't grab account lock here as the lock will be taken when calling the completeRun API.
final InternalTenantContext tenantContext = internalCallContextFactory.createInternalTenantContext(attempt.getTenantRecordId(), attempt.getAccountRecordId());
final CallContext callContext = createCallContext("AttemptCompletionJanitorTask", tenantContext);
final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(attempt.getAccountId(), callContext);
@@ -132,6 +145,11 @@ public class IncompletePaymentAttemptTask extends CompletionTaskBase<PaymentAtte
}
}
+ @Override
+ public void processPaymentEvent(final PaymentInternalEvent event, final NotificationQueue janitorQueue) {
+ // Nothing
+ }
+
private DateTime getCreatedDateBefore() {
final long delayBeforeNowMs = paymentConfig.getIncompleteAttemptsTimeSpanDelay().getMillis();
return clock.getUTCNow().minusMillis((int) delayBeforeNowMs);
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
index f7d87a1..1547008 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
@@ -17,9 +17,12 @@
package org.killbill.billing.payment.core.janitor;
+import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
+import java.util.UUID;
+import javax.annotation.Nullable;
import javax.inject.Inject;
import org.joda.time.DateTime;
@@ -27,6 +30,7 @@ import org.killbill.billing.account.api.AccountInternalApi;
import org.killbill.billing.callcontext.InternalCallContext;
import org.killbill.billing.callcontext.InternalTenantContext;
import org.killbill.billing.catalog.api.Currency;
+import org.killbill.billing.events.PaymentInternalEvent;
import org.killbill.billing.osgi.api.OSGIServiceRegistration;
import org.killbill.billing.payment.api.PluginProperty;
import org.killbill.billing.payment.api.TransactionStatus;
@@ -46,7 +50,12 @@ import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.callcontext.TenantContext;
import org.killbill.billing.util.config.PaymentConfig;
import org.killbill.clock.Clock;
+import org.killbill.commons.locker.GlobalLocker;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.skife.config.TimeSpan;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
@@ -59,72 +68,108 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
.add(TransactionStatus.PENDING)
.add(TransactionStatus.UNKNOWN)
.build();
- private static final int MAX_ITEMS_PER_LOOP = 100;
@Inject
public IncompletePaymentTransactionTask(final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
final PaymentDao paymentDao, final Clock clock,
final PaymentStateMachineHelper paymentStateMachineHelper, final PaymentControlStateMachineHelper retrySMHelper, final AccountInternalApi accountInternalApi,
- final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
- super(internalCallContextFactory, paymentConfig, paymentDao, clock, paymentStateMachineHelper, retrySMHelper, accountInternalApi, pluginRegistry);
+ final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry, final GlobalLocker locker) {
+ super(internalCallContextFactory, paymentConfig, paymentDao, clock, paymentStateMachineHelper, retrySMHelper, accountInternalApi, pluginRegistry, locker);
}
@Override
- public List<PaymentTransactionModelDao> getItemsForIteration() {
- final List<PaymentTransactionModelDao> result = paymentDao.getByTransactionStatusAcrossTenants(TRANSACTION_STATUSES_TO_CONSIDER, getCreatedDateBefore(), getCreatedDateAfter(), MAX_ITEMS_PER_LOOP);
- if (!result.isEmpty()) {
- log.info("Janitor IncompletePaymentTransactionTask start run: found {} pending/unknown payments", result.size());
- }
- return result;
+ public Iterable<PaymentTransactionModelDao> getItemsForIteration() {
+ // This is not triggered by Janitor proper but instead relies on bus event + notificationQ
+ return ImmutableList.of();
}
@Override
public void doIteration(final PaymentTransactionModelDao paymentTransaction) {
- final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(paymentTransaction.getTenantRecordId(), paymentTransaction.getAccountRecordId());
- final TenantContext tenantContext = internalCallContextFactory.createTenantContext(internalTenantContext);
- final PaymentModelDao payment = paymentDao.getPayment(paymentTransaction.getPaymentId(), internalTenantContext);
-
- final PaymentMethodModelDao paymentMethod = paymentDao.getPaymentMethod(payment.getPaymentMethodId(), internalTenantContext);
- final PaymentPluginApi paymentPluginApi = getPaymentPluginApi(payment, paymentMethod.getPluginName());
-
- final PaymentTransactionInfoPlugin undefinedPaymentTransaction = new DefaultNoOpPaymentInfoPlugin(payment.getId(),
- paymentTransaction.getId(),
- paymentTransaction.getTransactionType(),
- paymentTransaction.getAmount(),
- paymentTransaction.getCurrency(),
- paymentTransaction.getCreatedDate(),
- paymentTransaction.getCreatedDate(),
- PaymentPluginStatus.UNDEFINED,
- null);
- PaymentTransactionInfoPlugin paymentTransactionInfoPlugin;
- try {
- final List<PaymentTransactionInfoPlugin> result = paymentPluginApi.getPaymentInfo(payment.getAccountId(), payment.getId(), ImmutableList.<PluginProperty>of(), tenantContext);
- paymentTransactionInfoPlugin = Iterables.tryFind(result, new Predicate<PaymentTransactionInfoPlugin>() {
- @Override
- public boolean apply(final PaymentTransactionInfoPlugin input) {
- return input.getKbTransactionPaymentId().equals(paymentTransaction.getId());
- }
- }).or(new Supplier<PaymentTransactionInfoPlugin>() {
- @Override
- public PaymentTransactionInfoPlugin get() {
- return undefinedPaymentTransaction;
+ // Nothing
+ }
+
+ public void processNotification(final JanitorNotificationKey notificationKey, final UUID userToken, final Long accountRecordId, final long tenantRecordId) {
+
+ final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(tenantRecordId, accountRecordId);
+ doJanitorOperationWithAccountLock(new JanitorIterationCallback() {
+ @Override
+ public Void doIteration() {
+
+ // State may have changed since we originally retrieved with no lock
+ final PaymentTransactionModelDao rehydratedPaymentTransaction = paymentDao.getPaymentTransaction(notificationKey.getUuidKey(), internalTenantContext);
+
+ final TenantContext tenantContext = internalCallContextFactory.createTenantContext(internalTenantContext);
+ final PaymentModelDao payment = paymentDao.getPayment(rehydratedPaymentTransaction.getPaymentId(), internalTenantContext);
+
+ final PaymentMethodModelDao paymentMethod = paymentDao.getPaymentMethod(payment.getPaymentMethodId(), internalTenantContext);
+ final PaymentPluginApi paymentPluginApi = getPaymentPluginApi(payment, paymentMethod.getPluginName());
+
+ final PaymentTransactionInfoPlugin undefinedPaymentTransaction = new DefaultNoOpPaymentInfoPlugin(payment.getId(),
+ rehydratedPaymentTransaction.getId(),
+ rehydratedPaymentTransaction.getTransactionType(),
+ rehydratedPaymentTransaction.getAmount(),
+ rehydratedPaymentTransaction.getCurrency(),
+ rehydratedPaymentTransaction.getCreatedDate(),
+ rehydratedPaymentTransaction.getCreatedDate(),
+ PaymentPluginStatus.UNDEFINED,
+ null);
+ PaymentTransactionInfoPlugin paymentTransactionInfoPlugin;
+ try {
+ final List<PaymentTransactionInfoPlugin> result = paymentPluginApi.getPaymentInfo(payment.getAccountId(), payment.getId(), ImmutableList.<PluginProperty>of(), tenantContext);
+ paymentTransactionInfoPlugin = Iterables.tryFind(result, new Predicate<PaymentTransactionInfoPlugin>() {
+ @Override
+ public boolean apply(final PaymentTransactionInfoPlugin input) {
+ return input.getKbTransactionPaymentId().equals(rehydratedPaymentTransaction.getId());
+ }
+ }).or(new Supplier<PaymentTransactionInfoPlugin>() {
+ @Override
+ public PaymentTransactionInfoPlugin get() {
+ return undefinedPaymentTransaction;
+ }
+ });
+ } catch (final Exception e) {
+ paymentTransactionInfoPlugin = undefinedPaymentTransaction;
}
- });
- } catch (final Exception e) {
- paymentTransactionInfoPlugin = undefinedPaymentTransaction;
- }
+ updatePaymentAndTransactionIfNeeded(payment, notificationKey.getAttemptNumber(), userToken, rehydratedPaymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
+ return null;
+ }
+ }, internalTenantContext);
- updatePaymentAndTransactionIfNeeded(payment, paymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
}
- public boolean updatePaymentAndTransactionIfNeeded(final PaymentModelDao payment, final PaymentTransactionModelDao paymentTransaction, final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin, final InternalTenantContext internalTenantContext) {
- final CallContext callContext = createCallContext("IncompletePaymentTransactionTask", internalTenantContext);
+ @Override
+ public void processPaymentEvent(final PaymentInternalEvent event, final NotificationQueue janitorQueue) {
+ if (!TRANSACTION_STATUSES_TO_CONSIDER.contains(event.getStatus())) {
+ return;
+ }
+ insertNewNotificationForUnresolvedTransactionIfNeeded(event.getPaymentTransactionId(), 1, event.getUserToken(), event.getSearchKey1(), event.getSearchKey2());
+ }
+ public boolean updatePaymentAndTransactionIfNeededWithAccountLock(final PaymentModelDao payment, final PaymentTransactionModelDao paymentTransaction, final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin, final InternalTenantContext internalTenantContext) {
// Can happen in the GET case, see PaymentProcessor#toPayment
if (!TRANSACTION_STATUSES_TO_CONSIDER.contains(paymentTransaction.getTransactionStatus())) {
// Nothing to do
return false;
}
+ final Boolean result = doJanitorOperationWithAccountLock(new JanitorIterationCallback() {
+ @Override
+ public Boolean doIteration() {
+ return updatePaymentAndTransactionInternal(payment, null, null, paymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
+ }
+ }, internalTenantContext);
+ return result != null && result;
+ }
+
+ private boolean updatePaymentAndTransactionIfNeeded(final PaymentModelDao payment, final int attemptNumber, final UUID userToken, final PaymentTransactionModelDao paymentTransaction, final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin, final InternalTenantContext internalTenantContext) {
+ if (!TRANSACTION_STATUSES_TO_CONSIDER.contains(paymentTransaction.getTransactionStatus())) {
+ // Nothing to do
+ return false;
+ }
+ return updatePaymentAndTransactionInternal(payment, attemptNumber, userToken, paymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
+ }
+
+ private boolean updatePaymentAndTransactionInternal(final PaymentModelDao payment, @Nullable final Integer attemptNumber, @Nullable final UUID userToken, final PaymentTransactionModelDao paymentTransaction, final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin, final InternalTenantContext internalTenantContext) {
+ final CallContext callContext = createCallContext("IncompletePaymentTransactionTask", internalTenantContext);
// First obtain the new transactionStatus,
// Then compute the new paymentState; this one is mostly interesting in case of success (to compute the lastSuccessPaymentState below)
@@ -146,6 +191,7 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
log.info("Janitor IncompletePaymentTransactionTask unable to repair payment {}, transaction {}: {} -> {}",
payment.getId(), paymentTransaction.getId(), paymentTransaction.getTransactionStatus(), transactionStatus);
// We can't get anything interesting from the plugin...
+ insertNewNotificationForUnresolvedTransactionIfNeeded(paymentTransaction.getId(), attemptNumber, userToken, internalTenantContext.getAccountRecordId(), internalTenantContext.getTenantRecordId());
return false;
}
@@ -170,6 +216,7 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
paymentTransaction.getId(), transactionStatus, processedAmount, processedCurrency, gatewayErrorCode, gatewayError, internalCallContext);
return true;
+
}
// Keep the existing currentTransactionStatus if we can't obtain a better answer from the plugin; if not, return the newTransactionStatus
@@ -190,13 +237,27 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
return pluginApi;
}
- private DateTime getCreatedDateBefore() {
- final long delayBeforeNowMs = paymentConfig.getIncompleteTransactionsTimeSpanDelay().getMillis();
- return clock.getUTCNow().minusMillis((int) delayBeforeNowMs);
+ @VisibleForTesting
+ DateTime getNextNotificationTime(@Nullable final Integer attemptNumber) {
+
+ final List<TimeSpan> retries = paymentConfig.getIncompleteTransactionsRetries();
+ if (attemptNumber == null || attemptNumber > retries.size()) {
+ return null;
+ }
+ final TimeSpan nextDelay = retries.get(attemptNumber - 1);
+ return clock.getUTCNow().plusMillis((int) nextDelay.getMillis());
}
- private DateTime getCreatedDateAfter() {
- final long delayBeforeNowMs = paymentConfig.getIncompleteTransactionsTimeSpanGiveup().getMillis();
- return clock.getUTCNow().minusMillis((int) delayBeforeNowMs);
+ private void insertNewNotificationForUnresolvedTransactionIfNeeded(final UUID paymentTransactionId, @Nullable final Integer attemptNumber, @Nullable final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+ final NotificationEvent key = new JanitorNotificationKey(paymentTransactionId, IncompletePaymentTransactionTask.class.toString(), attemptNumber);
+ final DateTime notificationTime = getNextNotificationTime(attemptNumber);
+ // Will be null in the GET path or when we run out opf attempts..
+ if (notificationTime != null) {
+ try {
+ janitorQueue.recordFutureNotification(notificationTime, key, userToken, accountRecordId, tenantRecordId);
+ } catch (IOException e) {
+ log.warn("Janitor IncompletePaymentTransactionTask : Failed to insert future notification for paymentTransactionId = {}: {}", paymentTransactionId, e.getMessage());
+ }
+ }
}
}
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
index 032b1a0..3b06de1 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
@@ -17,14 +17,24 @@
package org.killbill.billing.payment.core.janitor;
+import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
+import org.joda.time.DateTime;
+import org.killbill.billing.events.PaymentInternalEvent;
+import org.killbill.billing.payment.glue.DefaultPaymentService;
import org.killbill.billing.payment.glue.PaymentModule;
import org.killbill.billing.util.config.PaymentConfig;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueAlreadyExists;
+import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,19 +46,25 @@ public class Janitor {
private static final Logger log = LoggerFactory.getLogger(Janitor.class);
private static final int TERMINATION_TIMEOUT_SEC = 5;
+ public static final String QUEUE_NAME = "janitor";
+ private final NotificationQueueService notificationQueueService;
private final ScheduledExecutorService janitorExecutor;
private final PaymentConfig paymentConfig;
private final IncompletePaymentAttemptTask incompletePaymentAttemptTask;
private final IncompletePaymentTransactionTask incompletePaymentTransactionTask;
+ private NotificationQueue janitorQueue;
+
private volatile boolean isStopped;
@Inject
public Janitor(final PaymentConfig paymentConfig,
+ final NotificationQueueService notificationQueueService,
@Named(PaymentModule.JANITOR_EXECUTOR_NAMED) final ScheduledExecutorService janitorExecutor,
final IncompletePaymentAttemptTask incompletePaymentAttemptTask,
final IncompletePaymentTransactionTask incompletePaymentTransactionTask) {
+ this.notificationQueueService = notificationQueueService;
this.janitorExecutor = janitorExecutor;
this.paymentConfig = paymentConfig;
this.incompletePaymentAttemptTask = incompletePaymentAttemptTask;
@@ -56,12 +72,36 @@ public class Janitor {
this.isStopped = false;
}
+ public void initialize() throws NotificationQueueAlreadyExists {
+ janitorQueue = notificationQueueService.createNotificationQueue(DefaultPaymentService.SERVICE_NAME,
+ QUEUE_NAME,
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(final NotificationEvent notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+ if (!(notificationKey instanceof JanitorNotificationKey)) {
+ log.error("Janitor service received an unexpected event type {}" + notificationKey.getClass().getName());
+ return;
+
+ }
+ final JanitorNotificationKey janitorKey = (JanitorNotificationKey) notificationKey;
+ if (janitorKey.getTaskName().equals(incompletePaymentTransactionTask.getClass().toString())) {
+ incompletePaymentTransactionTask.processNotification(janitorKey, userToken, accountRecordId, tenantRecordId);
+ }
+ }
+ }
+ );
+ incompletePaymentTransactionTask.attachJanitorQueue(janitorQueue);
+ incompletePaymentAttemptTask.attachJanitorQueue(janitorQueue);
+ }
+
public void start() {
if (isStopped) {
log.warn("Janitor is not a restartable service, and was already started, aborting");
return;
}
+ janitorQueue.startQueue();
+
// Start task for completing incomplete payment attempts
final TimeUnit attemptCompletionRateUnit = paymentConfig.getJanitorRunningRate().getUnit();
final long attemptCompletionPeriod = paymentConfig.getJanitorRunningRate().getPeriod();
@@ -73,7 +113,7 @@ public class Janitor {
janitorExecutor.scheduleAtFixedRate(incompletePaymentTransactionTask, erroredCompletionPeriod, erroredCompletionPeriod, erroredCompletionRateUnit);
}
- public void stop() {
+ public void stop() throws NoSuchNotificationQueue {
if (isStopped) {
log.warn("Janitor is already in a stopped state");
return;
@@ -93,6 +133,11 @@ public class Janitor {
if (!success) {
log.warn("Janitor failed to complete termination within " + TERMINATION_TIMEOUT_SEC + "sec");
}
+
+ if (janitorQueue != null) {
+ janitorQueue.stopQueue();
+ notificationQueueService.deleteNotificationQueue(DefaultPaymentService.SERVICE_NAME, QUEUE_NAME);
+ }
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Janitor stop sequence got interrupted");
@@ -100,4 +145,9 @@ public class Janitor {
isStopped = true;
}
}
+
+ public void processPaymentEvent(final PaymentInternalEvent event) {
+ incompletePaymentAttemptTask.processPaymentEvent(event, janitorQueue);
+ incompletePaymentTransactionTask.processPaymentEvent(event, janitorQueue);
+ }
}
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/JanitorNotificationKey.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/JanitorNotificationKey.java
new file mode 100644
index 0000000..b0f2c1d
--- /dev/null
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/JanitorNotificationKey.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.payment.core.janitor;
+
+import java.util.UUID;
+
+import org.killbill.notificationq.DefaultUUIDNotificationKey;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class JanitorNotificationKey extends DefaultUUIDNotificationKey {
+
+ private final Integer attemptNumber;
+ private final String taskName;
+
+ @JsonCreator
+ public JanitorNotificationKey(@JsonProperty("uuidKey") final UUID uuidKey,
+ @JsonProperty("taskName") final String taskName,
+ @JsonProperty("attemptNumber") final Integer attemptNumber) {
+ super(uuidKey);
+ this.attemptNumber = attemptNumber;
+ this.taskName = taskName;
+ }
+
+ public Integer getAttemptNumber() {
+ return attemptNumber;
+ }
+
+ public String getTaskName() {
+ return taskName;
+ }
+}
\ No newline at end of file
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java b/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
index cf6afd1..0af54d5 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
@@ -416,7 +416,7 @@ public class PaymentProcessor extends ProcessorBase {
if (paymentTransactionInfoPlugin != null) {
// Make sure to invoke the Janitor task in case the plugin fixes its state on the fly
// See https://github.com/killbill/killbill/issues/341
- final boolean hasChanged = incompletePaymentTransactionTask.updatePaymentAndTransactionIfNeeded(newPaymentModelDao, newPaymentTransactionModelDao, paymentTransactionInfoPlugin, internalTenantContext);
+ final boolean hasChanged = incompletePaymentTransactionTask.updatePaymentAndTransactionIfNeededWithAccountLock(newPaymentModelDao, newPaymentTransactionModelDao, paymentTransactionInfoPlugin, internalTenantContext);
if (hasChanged) {
newPaymentModelDao = paymentDao.getPayment(newPaymentModelDao.getId(), internalTenantContext);
newPaymentTransactionModelDao = paymentDao.getPaymentTransaction(newPaymentTransactionModelDao.getId(), internalTenantContext);
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/ProcessorBase.java b/payment/src/main/java/org/killbill/billing/payment/core/ProcessorBase.java
index 77a1265..ae4e3a2 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/ProcessorBase.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/ProcessorBase.java
@@ -67,7 +67,7 @@ import com.google.common.collect.Iterables;
public abstract class ProcessorBase {
- private static final int NB_LOCK_TRY = 5;
+ public static final int NB_LOCK_TRY = 5;
protected final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry;
protected final AccountInternalApi accountInternalApi;
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/payments/PaymentEnteringStateCallback.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/payments/PaymentEnteringStateCallback.java
index 5b340aa..8cdc6eb 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/payments/PaymentEnteringStateCallback.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/payments/PaymentEnteringStateCallback.java
@@ -65,7 +65,12 @@ public abstract class PaymentEnteringStateCallback implements EnteringStateCallb
//
final BusInternalEvent event = new DefaultPaymentErrorEvent(paymentStateContext.getAccount().getId(),
null,
+ null,
+ paymentStateContext.getAmount(),
+ paymentStateContext.getCurrency(),
+ null,
paymentStateContext.getTransactionType(),
+ null,
"Early abortion of payment transaction",
paymentStateContext.getInternalCallContext().getAccountRecordId(),
paymentStateContext.getInternalCallContext().getTenantRecordId(),
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
index 3b41115..ff64501 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
@@ -20,11 +20,9 @@ package org.killbill.billing.payment.dao;
import java.math.BigDecimal;
import java.util.Collection;
-import java.util.HashMap;
+import java.util.Date;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import javax.annotation.Nullable;
@@ -41,10 +39,12 @@ import org.killbill.billing.payment.api.DefaultPaymentInfoEvent;
import org.killbill.billing.payment.api.DefaultPaymentPluginErrorEvent;
import org.killbill.billing.payment.api.Payment;
import org.killbill.billing.payment.api.PaymentMethod;
+import org.killbill.billing.payment.api.PaymentTransaction;
import org.killbill.billing.payment.api.TransactionStatus;
import org.killbill.billing.payment.api.TransactionType;
import org.killbill.billing.util.cache.CacheControllerDispatcher;
import org.killbill.billing.util.dao.NonEntityDao;
+import org.killbill.billing.util.entity.Entity;
import org.killbill.billing.util.entity.Pagination;
import org.killbill.billing.util.entity.dao.DefaultPaginationSqlDaoHelper;
import org.killbill.billing.util.entity.dao.DefaultPaginationSqlDaoHelper.PaginationIteratorBuilder;
@@ -58,7 +58,6 @@ import org.skife.jdbi.v2.IDBI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
@@ -122,14 +121,24 @@ public class DefaultPaymentDao implements PaymentDao {
}
@Override
- public List<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(final String stateName, final DateTime createdBeforeDate) {
- return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<PaymentAttemptModelDao>>() {
- @Override
- public List<PaymentAttemptModelDao> inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
- final PaymentAttemptSqlDao transactional = entitySqlDaoWrapperFactory.become(PaymentAttemptSqlDao.class);
- return transactional.getByStateNameAcrossTenants(stateName, createdBeforeDate.toDate());
- }
- });
+ public Pagination<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(final String stateName, final DateTime createdBeforeDate, final Long offset, final Long limit) {
+
+ final Date createdBefore = createdBeforeDate.toDate();
+ return paginationHelper.getPagination(PaymentAttemptSqlDao.class, new PaginationIteratorBuilder<PaymentAttemptModelDao, Entity, PaymentAttemptSqlDao>() {
+ @Override
+ public Long getCount(final PaymentAttemptSqlDao sqlDao, final InternalTenantContext context) {
+ return sqlDao.getCountByStateNameAcrossTenants(stateName, createdBefore);
+ }
+ @Override
+ public Iterator<PaymentAttemptModelDao> build(final PaymentAttemptSqlDao sqlDao, final Long limit, final InternalTenantContext context) {
+ return sqlDao.getByStateNameAcrossTenants(stateName, createdBefore, offset, limit);
+ }
+ },
+ offset,
+ limit,
+ null
+ );
+
}
@Override
@@ -157,18 +166,29 @@ public class DefaultPaymentDao implements PaymentDao {
}
@Override
- public List<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, final DateTime createdBeforeDate, final DateTime createdAfterDate, final int limit) {
- return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<PaymentTransactionModelDao>>() {
- @Override
- public List<PaymentTransactionModelDao> inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
- final TransactionSqlDao transactional = entitySqlDaoWrapperFactory.become(TransactionSqlDao.class);
+ public Pagination<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, final DateTime createdBeforeDate, final DateTime createdAfterDate, final Long offset, final Long limit) {
- final Collection<String> allTransactionStatus = ImmutableList.copyOf(Iterables.transform(transactionStatuses, Functions.toStringFunction()));
- return transactional.getByTransactionStatusPriorDateAcrossTenants(allTransactionStatus, createdBeforeDate.toDate(), createdAfterDate.toDate(), limit);
- }
- });
- }
+ final Collection<String> allTransactionStatus = ImmutableList.copyOf(Iterables.transform(transactionStatuses, Functions.toStringFunction()));
+ final Date createdBefore = createdBeforeDate.toDate();
+ final Date createdAfter = createdAfterDate.toDate();
+
+ return paginationHelper.getPagination(TransactionSqlDao.class,
+ new PaginationIteratorBuilder<PaymentTransactionModelDao, PaymentTransaction, TransactionSqlDao>() {
+ @Override
+ public Long getCount(final TransactionSqlDao sqlDao, final InternalTenantContext context) {
+ return sqlDao.getCountByTransactionStatusPriorDateAcrossTenants(allTransactionStatus, createdBefore, createdAfter);
+ }
+ @Override
+ public Iterator<PaymentTransactionModelDao> build(final TransactionSqlDao sqlDao, final Long limit, final InternalTenantContext context) {
+ return sqlDao.getByTransactionStatusPriorDateAcrossTenants(allTransactionStatus, createdBefore, createdAfter, offset, limit);
+ }
+ },
+ offset,
+ limit,
+ null
+ );
+ }
@Override
public List<PaymentTransactionModelDao> getPaymentTransactionsByExternalKey(final String transactionExternalKey, final InternalTenantContext context) {
@@ -281,7 +301,7 @@ public class DefaultPaymentDao implements PaymentDao {
} else {
entitySqlDaoWrapperFactory.become(PaymentSqlDao.class).updatePaymentStateName(paymentId.toString(), currentPaymentStateName, context);
}
- postPaymentEventFromTransaction(accountId, transactionStatus, transactionType, paymentId, processedAmount, processedCurrency, clock.getUTCNow(), gatewayErrorCode, entitySqlDaoWrapperFactory, context);
+ postPaymentEventFromTransaction(accountId, transactionStatus, transactionType, paymentId, transactionId, processedAmount, processedCurrency, clock.getUTCNow(), gatewayErrorCode, entitySqlDaoWrapperFactory, context);
return null;
}
});
@@ -541,6 +561,7 @@ public class DefaultPaymentDao implements PaymentDao {
final TransactionStatus transactionStatus,
final TransactionType transactionType,
final UUID paymentId,
+ final UUID transactionId,
final BigDecimal processedAmount,
final Currency processedCurrency,
final DateTime effectiveDate,
@@ -554,6 +575,7 @@ public class DefaultPaymentDao implements PaymentDao {
case PENDING:
event = new DefaultPaymentInfoEvent(accountId,
paymentId,
+ transactionId,
processedAmount,
processedCurrency,
transactionStatus,
@@ -567,7 +589,12 @@ public class DefaultPaymentDao implements PaymentDao {
case PAYMENT_FAILURE:
event = new DefaultPaymentErrorEvent(accountId,
paymentId,
+ transactionId,
+ processedAmount,
+ processedCurrency,
+ transactionStatus,
transactionType,
+ effectiveDate,
gatewayErrorCode,
context.getAccountRecordId(),
context.getTenantRecordId(),
@@ -578,7 +605,12 @@ public class DefaultPaymentDao implements PaymentDao {
default:
event = new DefaultPaymentPluginErrorEvent(accountId,
paymentId,
+ transactionId,
+ processedAmount,
+ processedCurrency,
+ transactionStatus,
transactionType,
+ effectiveDate,
gatewayErrorCode,
context.getAccountRecordId(),
context.getTenantRecordId(),
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.java
index 544e02e..cc2959e 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.java
@@ -17,6 +17,7 @@
package org.killbill.billing.payment.dao;
import java.util.Date;
+import java.util.Iterator;
import java.util.List;
import org.killbill.billing.callcontext.InternalCallContext;
@@ -50,7 +51,13 @@ public interface PaymentAttemptSqlDao extends EntitySqlDao<PaymentAttemptModelDa
@BindBean final InternalTenantContext context);
@SqlQuery
- List<PaymentAttemptModelDao> getByStateNameAcrossTenants(@Bind("stateName") final String stateName,
- @Bind("createdBeforeDate") final Date createdBeforeDate);
+ Long getCountByStateNameAcrossTenants(@Bind("stateName") final String stateName,
+ @Bind("createdBeforeDate") final Date createdBeforeDate);
+
+ @SqlQuery
+ Iterator<PaymentAttemptModelDao> getByStateNameAcrossTenants(@Bind("stateName") final String stateName,
+ @Bind("createdBeforeDate") final Date createdBeforeDate,
+ @Bind("offset") final Long offset,
+ @Bind("rowCount") final Long rowCount);
}
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
index 2ac7ea1..535a623 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
@@ -30,13 +30,13 @@ import org.killbill.billing.util.entity.Pagination;
public interface PaymentDao {
- public List<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, DateTime createdBeforeDate, DateTime createdAfterDate, int limit);
+ public Pagination<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, DateTime createdBeforeDate, DateTime createdAfterDate, final Long offset, final Long limit);
public PaymentAttemptModelDao insertPaymentAttemptWithProperties(PaymentAttemptModelDao attempt, InternalCallContext context);
public void updatePaymentAttempt(UUID paymentAttemptId, UUID transactionId, String state, InternalCallContext context);
- public List<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(String stateName, DateTime createdBeforeDate);
+ public Pagination<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(String stateName, DateTime createdBeforeDate, final Long offset, final Long limit);
public List<PaymentAttemptModelDao> getPaymentAttempts(String paymentExternalKey, InternalTenantContext context);
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentSqlDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentSqlDao.java
index 9e9ed12..0f5475b 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentSqlDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentSqlDao.java
@@ -70,8 +70,8 @@ public interface PaymentSqlDao extends EntitySqlDao<PaymentModelDao, Payment> {
@SqlQuery
@SmartFetchSize(shouldStream = true)
public Iterator<PaymentModelDao> getByPluginName(@Bind("pluginName") final String pluginName,
- @Bind("offset") final Long offset,
- @Bind("rowCount") final Long rowCount,
+ @Bind("offset") final Long offset,
+ @Bind("rowCount") final Long rowCount,
@BindBean final InternalTenantContext context);
@SqlQuery
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
index d291ca0..d31d589 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
@@ -19,6 +19,7 @@ package org.killbill.billing.payment.dao;
import java.math.BigDecimal;
import java.util.Collection;
import java.util.Date;
+import java.util.Iterator;
import java.util.List;
import java.util.UUID;
@@ -52,10 +53,16 @@ public interface TransactionSqlDao extends EntitySqlDao<PaymentTransactionModelD
@BindBean final InternalTenantContext context);
@SqlQuery
- List<PaymentTransactionModelDao> getByTransactionStatusPriorDateAcrossTenants(@TransactionStatusCollectionBinder final Collection<String> statuses,
+ Long getCountByTransactionStatusPriorDateAcrossTenants(@TransactionStatusCollectionBinder final Collection<String> statuses,
+ @Bind("createdBeforeDate") final Date createdBeforeDate,
+ @Bind("createdAfterDate") final Date createdAfterDate);
+
+ @SqlQuery
+ Iterator<PaymentTransactionModelDao> getByTransactionStatusPriorDateAcrossTenants(@TransactionStatusCollectionBinder final Collection<String> statuses,
@Bind("createdBeforeDate") final Date createdBeforeDate,
@Bind("createdAfterDate") final Date createdAfterDate,
- @Bind("limit") final int limit);
+ @Bind("offset") final Long offset,
+ @Bind("rowCount") final Long rowCount);
@SqlQuery
public List<PaymentTransactionModelDao> getByPaymentId(@Bind("paymentId") final UUID paymentId,
diff --git a/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java b/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java
index 5cf77f0..d003f74 100644
--- a/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java
+++ b/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java
@@ -20,7 +20,7 @@ package org.killbill.billing.payment.glue;
import org.killbill.billing.payment.api.PaymentApi;
import org.killbill.billing.payment.api.PaymentService;
-import org.killbill.billing.payment.bus.InvoiceHandler;
+import org.killbill.billing.payment.bus.PaymentBusEventHandler;
import org.killbill.billing.payment.invoice.PaymentTagHandler;
import org.killbill.billing.payment.core.janitor.Janitor;
import org.killbill.billing.payment.retry.DefaultRetryService;
@@ -40,7 +40,7 @@ public class DefaultPaymentService implements PaymentService {
public static final String SERVICE_NAME = "payment-service";
- private final InvoiceHandler invoiceHandler;
+ private final PaymentBusEventHandler paymentBusEventHandler;
private final PaymentTagHandler tagHandler;
private final PersistentBus eventBus;
private final PaymentApi api;
@@ -48,13 +48,13 @@ public class DefaultPaymentService implements PaymentService {
private final Janitor janitor;
@Inject
- public DefaultPaymentService(final InvoiceHandler invoiceHandler,
+ public DefaultPaymentService(final PaymentBusEventHandler paymentBusEventHandler,
final PaymentTagHandler tagHandler,
final PaymentApi api,
final DefaultRetryService retryService,
final PersistentBus eventBus,
final Janitor janitor) {
- this.invoiceHandler = invoiceHandler;
+ this.paymentBusEventHandler = paymentBusEventHandler;
this.tagHandler = tagHandler;
this.eventBus = eventBus;
this.api = api;
@@ -70,12 +70,13 @@ public class DefaultPaymentService implements PaymentService {
@LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
public void initialize() throws NotificationQueueAlreadyExists {
try {
- eventBus.register(invoiceHandler);
+ eventBus.register(paymentBusEventHandler);
eventBus.register(tagHandler);
} catch (final PersistentBus.EventBusException e) {
log.error("Unable to register with the EventBus!", e);
}
- retryService.initialize(SERVICE_NAME);
+ retryService.initialize();
+ janitor.initialize();
}
@LifecycleHandlerType(LifecycleLevel.START_SERVICE)
@@ -87,7 +88,7 @@ public class DefaultPaymentService implements PaymentService {
@LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
public void stop() throws NoSuchNotificationQueue {
try {
- eventBus.unregister(invoiceHandler);
+ eventBus.unregister(paymentBusEventHandler);
eventBus.unregister(tagHandler);
} catch (final PersistentBus.EventBusException e) {
throw new RuntimeException("Unable to unregister to the EventBus!", e);
diff --git a/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java b/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
index 8053d33..a602405 100644
--- a/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
+++ b/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
@@ -36,7 +36,7 @@ import org.killbill.billing.payment.api.DefaultPaymentGatewayApi;
import org.killbill.billing.payment.api.PaymentApi;
import org.killbill.billing.payment.api.PaymentGatewayApi;
import org.killbill.billing.payment.api.PaymentService;
-import org.killbill.billing.payment.bus.InvoiceHandler;
+import org.killbill.billing.payment.bus.PaymentBusEventHandler;
import org.killbill.billing.payment.core.PaymentGatewayProcessor;
import org.killbill.billing.payment.core.PaymentMethodProcessor;
import org.killbill.billing.payment.core.PaymentProcessor;
@@ -182,7 +182,7 @@ public class PaymentModule extends KillBillModule {
bind(PaymentApi.class).to(DefaultPaymentApi.class).asEagerSingleton();
bind(PaymentGatewayApi.class).to(DefaultPaymentGatewayApi.class).asEagerSingleton();
bind(AdminPaymentApi.class).to(DefaultAdminPaymentApi.class).asEagerSingleton();
- bind(InvoiceHandler.class).asEagerSingleton();
+ bind(PaymentBusEventHandler.class).asEagerSingleton();
bind(PaymentTagHandler.class).asEagerSingleton();
bind(PaymentService.class).to(DefaultPaymentService.class).asEagerSingleton();
installPaymentProviderPlugins(paymentConfig);
diff --git a/payment/src/main/java/org/killbill/billing/payment/invoice/PaymentTagHandler.java b/payment/src/main/java/org/killbill/billing/payment/invoice/PaymentTagHandler.java
index 792ae1c..3c1f13c 100644
--- a/payment/src/main/java/org/killbill/billing/payment/invoice/PaymentTagHandler.java
+++ b/payment/src/main/java/org/killbill/billing/payment/invoice/PaymentTagHandler.java
@@ -35,6 +35,7 @@ import org.killbill.clock.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
@@ -63,6 +64,8 @@ public class PaymentTagHandler {
this.internalCallContextFactory = internalCallContextFactory;
}
+
+ @AllowConcurrentEvents
@Subscribe
public void process_AUTO_PAY_OFF_removal(final ControlTagDeletionInternalEvent event) {
diff --git a/payment/src/main/java/org/killbill/billing/payment/retry/BaseRetryService.java b/payment/src/main/java/org/killbill/billing/payment/retry/BaseRetryService.java
index 7370f39..a5ec4ce 100644
--- a/payment/src/main/java/org/killbill/billing/payment/retry/BaseRetryService.java
+++ b/payment/src/main/java/org/killbill/billing/payment/retry/BaseRetryService.java
@@ -44,10 +44,10 @@ import com.google.inject.Inject;
public abstract class BaseRetryService implements RetryService {
private static final Logger log = LoggerFactory.getLogger(BaseRetryService.class);
- private static final String PAYMENT_RETRY_SERVICE = "PaymentRetryService";
private final NotificationQueueService notificationQueueService;
private final InternalCallContextFactory internalCallContextFactory;
+ private final String paymentRetryService;
private NotificationQueue retryQueue;
@@ -55,11 +55,12 @@ public abstract class BaseRetryService implements RetryService {
final InternalCallContextFactory internalCallContextFactory) {
this.notificationQueueService = notificationQueueService;
this.internalCallContextFactory = internalCallContextFactory;
+ this.paymentRetryService = DefaultPaymentService.SERVICE_NAME + "-" + getQueueName();
}
@Override
- public void initialize(final String svcName) throws NotificationQueueAlreadyExists {
- retryQueue = notificationQueueService.createNotificationQueue(svcName,
+ public void initialize() throws NotificationQueueAlreadyExists {
+ retryQueue = notificationQueueService.createNotificationQueue(DefaultPaymentService.SERVICE_NAME,
getQueueName(),
new NotificationQueueHandler() {
@Override
@@ -69,7 +70,7 @@ public abstract class BaseRetryService implements RetryService {
return;
}
final PaymentRetryNotificationKey key = (PaymentRetryNotificationKey) notificationKey;
- final InternalCallContext callContext = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, PAYMENT_RETRY_SERVICE, CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
+ final InternalCallContext callContext = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, paymentRetryService, CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
retryPaymentTransaction(key.getAttemptId(), key.getPaymentControlPluginNames(), callContext);
}
}
@@ -132,7 +133,8 @@ public abstract class BaseRetryService implements RetryService {
}
protected InternalCallContext createCallContextFromPaymentId(final ObjectType objectType, final UUID objectId, final Long tenantRecordId) {
- return internalCallContextFactory.createInternalCallContext(objectId, objectType, PAYMENT_RETRY_SERVICE, CallOrigin.INTERNAL, UserType.SYSTEM, null, tenantRecordId);
+ final String paymentRetryService = DefaultPaymentService.SERVICE_NAME + "-" + getQueueName();
+ return internalCallContextFactory.createInternalCallContext(objectId, objectType, paymentRetryService, CallOrigin.INTERNAL, UserType.SYSTEM, null, tenantRecordId);
}
public abstract String getQueueName();
diff --git a/payment/src/main/java/org/killbill/billing/payment/retry/RetryService.java b/payment/src/main/java/org/killbill/billing/payment/retry/RetryService.java
index ea320f1..03bdeee 100644
--- a/payment/src/main/java/org/killbill/billing/payment/retry/RetryService.java
+++ b/payment/src/main/java/org/killbill/billing/payment/retry/RetryService.java
@@ -28,7 +28,7 @@ import org.killbill.notificationq.api.NotificationQueueService.NotificationQueue
public interface RetryService {
- public void initialize(final String svcName) throws NotificationQueueAlreadyExists;
+ public void initialize() throws NotificationQueueAlreadyExists;
public void start();
diff --git a/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.sql.stg b/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.sql.stg
index c1f77a9..83860cc 100644
--- a/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.sql.stg
+++ b/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.sql.stg
@@ -73,6 +73,17 @@ where state_name = :stateName
and created_date \< :createdBeforeDate
<andCheckSoftDeletionWithComma("")>
<defaultOrderBy()>
+limit :offset, :rowCount
+;
+>>
+
+getCountByStateNameAcrossTenants() ::= <<
+select
+count(1) as count
+from <tableName()>
+where state_name = :stateName
+and created_date \< :createdBeforeDate
+<andCheckSoftDeletionWithComma("")>
;
>>
diff --git a/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg b/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
index 1d54780..7d36552 100644
--- a/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
+++ b/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
@@ -90,7 +90,18 @@ created_date >= :createdAfterDate
and created_date \< :createdBeforeDate
and transaction_status in (<statuses: {status | :status_<i0>}; separator="," >)
<defaultOrderBy()>
-limit :limit
+limit :offset, :rowCount
+;
+>>
+
+getCountByTransactionStatusPriorDateAcrossTenants(statuses) ::= <<
+select
+count(1) as count
+from <tableName()>
+where
+created_date >= :createdAfterDate
+and created_date \< :createdBeforeDate
+and transaction_status in (<statuses: {status | :status_<i0>}; separator="," >)
;
>>
diff --git a/payment/src/main/resources/org/killbill/billing/payment/PaymentStates.xml b/payment/src/main/resources/org/killbill/billing/payment/PaymentStates.xml
index db9d165..e6f68d2 100644
--- a/payment/src/main/resources/org/killbill/billing/payment/PaymentStates.xml
+++ b/payment/src/main/resources/org/killbill/billing/payment/PaymentStates.xml
@@ -184,6 +184,7 @@
<stateMachine name="REFUND">
<states>
<state name="REFUND_INIT"/>
+ <state name="REFUND_PENDING"/>
<state name="REFUND_SUCCESS"/>
<state name="REFUND_FAILED"/>
<state name="REFUND_ERRORED"/>
@@ -204,6 +205,30 @@
<transition>
<initialState>REFUND_INIT</initialState>
<operation>OP_REFUND</operation>
+ <operationResult>PENDING</operationResult>
+ <finalState>REFUND_PENDING</finalState>
+ </transition>
+ <transition>
+ <initialState>REFUND_PENDING</initialState>
+ <operation>OP_REFUND</operation>
+ <operationResult>SUCCESS</operationResult>
+ <finalState>REFUND_SUCCESS</finalState>
+ </transition>
+ <transition>
+ <initialState>REFUND_PENDING</initialState>
+ <operation>OP_REFUND</operation>
+ <operationResult>FAILURE</operationResult>
+ <finalState>REFUND_FAILED</finalState>
+ </transition>
+ <transition>
+ <initialState>REFUND_PENDING</initialState>
+ <operation>OP_REFUND</operation>
+ <operationResult>EXCEPTION</operationResult>
+ <finalState>REFUND_ERRORED</finalState>
+ </transition>
+ <transition>
+ <initialState>REFUND_INIT</initialState>
+ <operation>OP_REFUND</operation>
<operationResult>EXCEPTION</operationResult>
<finalState>REFUND_ERRORED</finalState>
</transition>
@@ -216,6 +241,7 @@
<stateMachine name="CREDIT">
<states>
<state name="CREDIT_INIT"/>
+ <state name="CREDIT_PENDING"/>
<state name="CREDIT_SUCCESS"/>
<state name="CREDIT_FAILED"/>
<state name="CREDIT_ERRORED"/>
@@ -236,6 +262,30 @@
<transition>
<initialState>CREDIT_INIT</initialState>
<operation>OP_CREDIT</operation>
+ <operationResult>PENDING</operationResult>
+ <finalState>CREDIT_PENDING</finalState>
+ </transition>
+ <transition>
+ <initialState>CREDIT_PENDING</initialState>
+ <operation>OP_CREDIT</operation>
+ <operationResult>SUCCESS</operationResult>
+ <finalState>CREDIT_SUCCESS</finalState>
+ </transition>
+ <transition>
+ <initialState>CREDIT_PENDING</initialState>
+ <operation>OP_CREDIT</operation>
+ <operationResult>FAILURE</operationResult>
+ <finalState>CREDIT_FAILED</finalState>
+ </transition>
+ <transition>
+ <initialState>CREDIT_PENDING</initialState>
+ <operation>OP_CREDIT</operation>
+ <operationResult>EXCEPTION</operationResult>
+ <finalState>CREDIT_ERRORED</finalState>
+ </transition>
+ <transition>
+ <initialState>CREDIT_INIT</initialState>
+ <operation>OP_CREDIT</operation>
<operationResult>EXCEPTION</operationResult>
<finalState>CREDIT_ERRORED</finalState>
</transition>
diff --git a/payment/src/test/java/org/killbill/billing/payment/api/TestEventJson.java b/payment/src/test/java/org/killbill/billing/payment/api/TestEventJson.java
index 809199f..257c157 100644
--- a/payment/src/test/java/org/killbill/billing/payment/api/TestEventJson.java
+++ b/payment/src/test/java/org/killbill/billing/payment/api/TestEventJson.java
@@ -34,7 +34,7 @@ public class TestEventJson extends PaymentTestSuiteNoDB {
@Test(groups = "fast")
public void testPaymentErrorEvent() throws Exception {
- final PaymentErrorInternalEvent e = new DefaultPaymentErrorEvent(UUID.randomUUID(), UUID.randomUUID(), TransactionType.PURCHASE, "no message", 1L, 2L, UUID.randomUUID());
+ final PaymentErrorInternalEvent e = new DefaultPaymentErrorEvent(UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), BigDecimal.ONE, Currency.USD, TransactionStatus.SUCCESS, TransactionType.PURCHASE, new DateTime(), "no message", 1L, 2L, UUID.randomUUID());
final String json = mapper.writeValueAsString(e);
final Class<?> claz = Class.forName(DefaultPaymentErrorEvent.class.getName());
@@ -44,7 +44,7 @@ public class TestEventJson extends PaymentTestSuiteNoDB {
@Test(groups = "fast")
public void testPaymentInfoEvent() throws Exception {
- final PaymentInfoInternalEvent e = new DefaultPaymentInfoEvent(UUID.randomUUID(), UUID.randomUUID(), new BigDecimal(12.9), Currency.EUR, TransactionStatus.SUCCESS,
+ final PaymentInfoInternalEvent e = new DefaultPaymentInfoEvent(UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), new BigDecimal(12.9), Currency.EUR, TransactionStatus.SUCCESS,
TransactionType.PURCHASE, new DateTime(), 1L, 2L, UUID.randomUUID());
final String json = mapper.writeValueAsString(e);
diff --git a/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTask.java b/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTask.java
new file mode 100644
index 0000000..ec7a4ee
--- /dev/null
+++ b/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTask.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.payment.core.janitor;
+
+import org.joda.time.DateTime;
+import org.killbill.billing.payment.PaymentTestSuiteNoDB;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.inject.Inject;
+
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+public class TestIncompletePaymentTransactionTask extends PaymentTestSuiteNoDB {
+
+ @Inject
+ protected IncompletePaymentTransactionTask incompletePaymentTransactionTask;
+
+ @Test(groups = "fast")
+ public void testGetNextNotificationTime() {
+ assertNull(incompletePaymentTransactionTask.getNextNotificationTime(null));
+ final DateTime initTime = clock.getUTCNow();
+
+ // Based on config "15s,1m,3m,1h,1d,1d,1d,1d,1d"
+ for (int i = 1; i < 10; i++) {
+ final DateTime nextTime = incompletePaymentTransactionTask.getNextNotificationTime(i);
+ assertNotNull(nextTime);
+ assertTrue(nextTime.compareTo(initTime) > 0);
+ if (i == 0) {
+ assertTrue(nextTime.compareTo(initTime.plusSeconds(3).plusSeconds(1)) < 0);
+ } else if (i == 1) {
+ assertTrue(nextTime.compareTo(initTime.plusMinutes(1).plusSeconds(1)) < 0);
+ } else if (i == 2) {
+ assertTrue(nextTime.compareTo(initTime.plusMinutes(3).plusSeconds(1)) < 0);
+ } else if (i == 3) {
+ assertTrue(nextTime.compareTo(initTime.plusHours(1).plusSeconds(1)) < 0);
+ } else if (i == 4) {
+ assertTrue(nextTime.compareTo(initTime.plusDays(1).plusSeconds(1)) < 0);
+ } else if (i == 5) {
+ assertTrue(nextTime.compareTo(initTime.plusDays(1).plusSeconds(1)) < 0);
+ } else if (i == 6) {
+ assertTrue(nextTime.compareTo(initTime.plusDays(1).plusSeconds(1)) < 0);
+ } else if (i == 7) {
+ assertTrue(nextTime.compareTo(initTime.plusDays(1).plusSeconds(1)) < 0);
+ } else if (i == 8) {
+ assertTrue(nextTime.compareTo(initTime.plusDays(1).plusSeconds(1)) < 0);
+ }
+ }
+ assertNull(incompletePaymentTransactionTask.getNextNotificationTime(10));
+ }
+}
\ No newline at end of file
diff --git a/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java b/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
index 21aee20..c761832 100644
--- a/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
+++ b/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
@@ -36,6 +36,7 @@ import org.killbill.billing.catalog.api.Currency;
import org.killbill.billing.dao.MockNonEntityDao;
import org.killbill.billing.payment.api.TransactionStatus;
import org.killbill.billing.payment.api.TransactionType;
+import org.killbill.billing.util.entity.DefaultPagination;
import org.killbill.billing.util.entity.Pagination;
import com.google.common.base.Predicate;
@@ -64,8 +65,8 @@ public class MockPaymentDao implements PaymentDao {
}
@Override
- public List<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, DateTime createdBeforeDate, DateTime createdAfterDate, int limit) {
- return ImmutableList.copyOf(Iterables.filter(transactions.values(), new Predicate<PaymentTransactionModelDao>() {
+ public Pagination<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, DateTime createdBeforeDate, DateTime createdAfterDate, Long offset, Long limit) {
+ final List<PaymentTransactionModelDao> result= ImmutableList.copyOf(Iterables.filter(transactions.values(), new Predicate<PaymentTransactionModelDao>() {
@Override
public boolean apply(final PaymentTransactionModelDao input) {
return Iterables.any(transactionStatuses, new Predicate<TransactionStatus>() {
@@ -76,6 +77,7 @@ public class MockPaymentDao implements PaymentDao {
});
}
}));
+ return new DefaultPagination<PaymentTransactionModelDao>(new Long(result.size()), result.iterator());
}
@Override
@@ -108,7 +110,7 @@ public class MockPaymentDao implements PaymentDao {
}
@Override
- public List<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(final String stateName, final DateTime createdBeforeDate) {
+ public Pagination<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(final String stateName, final DateTime createdBeforeDate, final Long offset, final Long limit) {
return null;
}
diff --git a/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java b/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
index 358cf2b..9afbb45 100644
--- a/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
+++ b/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
@@ -18,6 +18,7 @@ package org.killbill.billing.payment.dao;
import java.math.BigDecimal;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.UUID;
@@ -32,6 +33,7 @@ import org.killbill.billing.payment.dao.PluginPropertySerializer.PluginPropertyS
import org.killbill.billing.util.callcontext.CallOrigin;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.callcontext.UserType;
+import org.killbill.billing.util.entity.Pagination;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -298,7 +300,7 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
final List<PaymentTransactionModelDao> result = getPendingTransactions(paymentModelDao.getId());
Assert.assertEquals(result.size(), 3);
- final List<PaymentTransactionModelDao> transactions1 = paymentDao.getByTransactionStatusAcrossTenants(ImmutableList.of(TransactionStatus.PENDING), newTime, initialTime, 3);
+ final Iterable<PaymentTransactionModelDao> transactions1 = paymentDao.getByTransactionStatusAcrossTenants(ImmutableList.of(TransactionStatus.PENDING), newTime, initialTime, 0L, 3L);
for (PaymentTransactionModelDao paymentTransaction : transactions1) {
final String newPaymentState = "XXX_FAILED";
paymentDao.updatePaymentAndTransactionOnCompletion(payment.getAccountId(), payment.getId(), paymentTransaction.getTransactionType(), newPaymentState, payment.getLastSuccessStateName(),
@@ -316,7 +318,7 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
}
;
- final List<PaymentTransactionModelDao> transactions2 = paymentDao.getByTransactionStatusAcrossTenants(ImmutableList.of(TransactionStatus.PENDING), clock.getUTCNow(), initialTime, 1);
+ final Iterable<PaymentTransactionModelDao> transactions2 = paymentDao.getByTransactionStatusAcrossTenants(ImmutableList.of(TransactionStatus.PENDING), clock.getUTCNow(), initialTime, 0L, 1L);
for (PaymentTransactionModelDao paymentTransaction : transactions2) {
final String newPaymentState = "XXX_FAILED";
paymentDao.updatePaymentAndTransactionOnCompletion(payment.getAccountId(), payment.getId(), paymentTransaction.getTransactionType(), newPaymentState, payment.getLastSuccessStateName(),
@@ -467,7 +469,46 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
assertEquals(result.size(), 2);
}
+
@Test(groups = "slow")
+ public void testPaginationForPaymentByStatesAcrossTenants() {
+ // Right before createdAfterDate, so should not be returned
+ final DateTime createdDate1 = clock.getUTCNow().minusHours(1);
+
+ final int NB_ENTRIES = 30;
+ for (int i = 0; i < NB_ENTRIES; i++) {
+ final PaymentModelDao paymentModelDao1 = new PaymentModelDao(createdDate1, createdDate1, UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID().toString());
+ final PaymentTransactionModelDao transaction1 = new PaymentTransactionModelDao(createdDate1, createdDate1, null, UUID.randomUUID().toString(),
+ paymentModelDao1.getId(), TransactionType.AUTHORIZE, createdDate1,
+ TransactionStatus.UNKNOWN, BigDecimal.TEN, Currency.AED,
+ "unknown", "");
+
+ final InternalCallContext context1 = new InternalCallContext(1L,
+ 1L,
+ internalCallContext.getUserToken(),
+ internalCallContext.getCreatedBy(),
+ internalCallContext.getCallOrigin(),
+ internalCallContext.getContextUserType(),
+ internalCallContext.getReasonCode(),
+ internalCallContext.getComments(),
+ createdDate1,
+ createdDate1);
+ paymentDao.insertPaymentWithFirstTransaction(paymentModelDao1, transaction1, context1);
+ }
+
+ final Pagination<PaymentTransactionModelDao> result = paymentDao.getByTransactionStatusAcrossTenants(ImmutableList.of(TransactionStatus.UNKNOWN), clock.getUTCNow(), createdDate1, 0L, new Long(NB_ENTRIES));
+ Assert.assertEquals(result.getTotalNbRecords(), new Long(NB_ENTRIES));
+
+ final Iterator<PaymentTransactionModelDao> iterator = result.iterator();
+ for (int i = 0; i < NB_ENTRIES; i++) {
+ System.out.println("i = " + i);
+ Assert.assertTrue(iterator.hasNext());
+ final PaymentTransactionModelDao nextEntry = iterator.next();
+ Assert.assertEquals(nextEntry.getTransactionStatus(), TransactionStatus.UNKNOWN);
+ }
+ }
+
+ @Test(groups = "slow")
public void testPaymentAttemptsByStateAcrossTenants() {
final UUID paymentMethodId = UUID.randomUUID();
@@ -519,8 +560,8 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
paymentDao.insertPaymentAttemptWithProperties(attempt2, context2);
- final List<PaymentAttemptModelDao> result = paymentDao.getPaymentAttemptsByStateAcrossTenants(stateName, createdBeforeDate);
- Assert.assertEquals(result.size(), 2);
+ final Pagination<PaymentAttemptModelDao> result = paymentDao.getPaymentAttemptsByStateAcrossTenants(stateName, createdBeforeDate, 0L, 2L);
+ Assert.assertEquals(result.getTotalNbRecords().longValue(), 2L);
}
diff --git a/payment/src/test/java/org/killbill/billing/payment/glue/TestPaymentModuleWithEmbeddedDB.java b/payment/src/test/java/org/killbill/billing/payment/glue/TestPaymentModuleWithEmbeddedDB.java
index 7ad8a57..b7ba850 100644
--- a/payment/src/test/java/org/killbill/billing/payment/glue/TestPaymentModuleWithEmbeddedDB.java
+++ b/payment/src/test/java/org/killbill/billing/payment/glue/TestPaymentModuleWithEmbeddedDB.java
@@ -20,6 +20,7 @@ package org.killbill.billing.payment.glue;
import org.killbill.billing.GuicyKillbillTestWithEmbeddedDBModule;
import org.killbill.billing.account.glue.DefaultAccountModule;
+import org.killbill.billing.api.TestApiListener;
import org.killbill.billing.platform.api.KillbillConfigSource;
import org.killbill.billing.util.glue.NonEntityDaoModule;
import org.killbill.clock.Clock;
@@ -35,7 +36,7 @@ public class TestPaymentModuleWithEmbeddedDB extends TestPaymentModule {
install(new GuicyKillbillTestWithEmbeddedDBModule(configSource));
install(new NonEntityDaoModule(configSource));
install(new DefaultAccountModule(configSource));
-
+ bind(TestApiListener.class).asEagerSingleton();
super.configure();
}
}
diff --git a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
index bc68a36..cca3b59 100644
--- a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
+++ b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
@@ -23,13 +23,19 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeoutException;
import org.joda.time.LocalDate;
import org.killbill.billing.account.api.Account;
+import org.killbill.billing.api.TestApiListener;
+import org.killbill.billing.api.TestApiListener.NextEvent;
+import org.killbill.billing.callcontext.InternalCallContext;
import org.killbill.billing.catalog.api.Currency;
import org.killbill.billing.invoice.api.Invoice;
import org.killbill.billing.invoice.api.InvoiceApiException;
import org.killbill.billing.invoice.api.InvoiceItem;
+import org.killbill.billing.payment.api.DefaultPayment;
import org.killbill.billing.payment.api.Payment;
import org.killbill.billing.payment.api.PaymentApiException;
import org.killbill.billing.payment.api.PaymentOptions;
@@ -37,13 +43,21 @@ import org.killbill.billing.payment.api.PaymentTransaction;
import org.killbill.billing.payment.api.PluginProperty;
import org.killbill.billing.payment.api.TransactionStatus;
import org.killbill.billing.payment.api.TransactionType;
+import org.killbill.billing.payment.bus.PaymentBusEventHandler;
import org.killbill.billing.payment.core.janitor.Janitor;
import org.killbill.billing.payment.dao.PaymentAttemptModelDao;
import org.killbill.billing.payment.dao.PaymentTransactionModelDao;
+import org.killbill.billing.payment.glue.DefaultPaymentService;
import org.killbill.billing.payment.invoice.InvoicePaymentRoutingPluginApi;
import org.killbill.billing.payment.provider.MockPaymentProviderPlugin;
import org.killbill.billing.platform.api.KillbillConfigSource;
+import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.bus.api.PersistentBus.EventBusException;
+import org.killbill.commons.profiling.Profiling;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationEventWithMetadata;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.testng.Assert;
@@ -53,11 +67,16 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
import com.google.inject.Inject;
+import static com.jayway.awaitility.Awaitility.await;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
@@ -75,6 +94,14 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
@Inject
private Janitor janitor;
+ @Inject
+ private PaymentBusEventHandler handler;
+ @Inject
+ protected TestApiListener testListener;
+ @Inject
+ protected InternalCallContextFactory internalCallContextFactory;
+ @Inject
+ protected NotificationQueueService notificationQueueService;
private MockPaymentProviderPlugin mockPaymentProviderPlugin;
@@ -93,6 +120,7 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
protected void beforeClass() throws Exception {
super.beforeClass();
mockPaymentProviderPlugin = (MockPaymentProviderPlugin) registry.getServiceForName(MockPaymentProviderPlugin.PLUGIN_NAME);
+ janitor.initialize();
janitor.start();
}
@@ -104,12 +132,17 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
@BeforeMethod(groups = "slow")
public void beforeMethod() throws Exception {
super.beforeMethod();
+ eventBus.register(handler);
+ testListener.reset();
+ eventBus.register(testListener);
mockPaymentProviderPlugin.clear();
account = testHelper.createTestAccount("bobo@gmail.com", true);
}
@AfterMethod(groups = "slow")
public void afterMethod() throws Exception {
+ eventBus.unregister(handler);
+ eventBus.unregister(testListener);
super.afterMethod();
}
@@ -233,20 +266,23 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
final String paymentExternalKey = "qwru";
final String transactionExternalKey = "lkjdsf";
+ testListener.pushExpectedEvent(NextEvent.PAYMENT);
final Payment payment = paymentApi.createAuthorization(account, account.getPaymentMethodId(), null, requestedAmount, account.getCurrency(), paymentExternalKey,
transactionExternalKey, ImmutableList.<PluginProperty>of(), callContext);
+ testListener.assertListenerStatus();
// Artificially move the transaction status to UNKNOWN
+ final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
final String paymentStateName = paymentSMHelper.getErroredStateForTransaction(TransactionType.AUTHORIZE).toString();
+ testListener.pushExpectedEvent(NextEvent.PAYMENT_PLUGIN_ERROR);
paymentDao.updatePaymentAndTransactionOnCompletion(account.getId(), payment.getId(), TransactionType.AUTHORIZE, paymentStateName, paymentStateName,
payment.getTransactions().get(0).getId(), TransactionStatus.UNKNOWN, requestedAmount, account.getCurrency(),
"foo", "bar", internalCallContext);
- // The UnknownPaymentTransactionTask will look for UNKNOWN payment that *just happened* , and that are not too old (less than 7 days)
- clock.addDays(1);
- try {
- Thread.sleep(1500);
- } catch (InterruptedException e) {
- }
+ testListener.assertListenerStatus();
+
+ // Move clock for notification to be processed
+ clock.addDeltaFromReality(5 * 60 * 1000);
+ assertNotificationsCompleted(internalCallContext, 5);
final Payment updatedPayment = paymentApi.getPayment(payment.getId(), false, ImmutableList.<PluginProperty>of(), callContext);
assertEquals(updatedPayment.getTransactions().get(0).getTransactionStatus(), TransactionStatus.SUCCESS);
@@ -261,23 +297,26 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
// Make sure the state as seen by the plugin will be in PaymentPluginStatus.ERROR, which will be returned later to Janitor
mockPaymentProviderPlugin.makeNextPaymentFailWithError();
+ testListener.pushExpectedEvent(NextEvent.PAYMENT_ERROR);
final Payment payment = paymentApi.createAuthorization(account, account.getPaymentMethodId(), null, requestedAmount, account.getCurrency(), paymentExternalKey,
transactionExternalKey, ImmutableList.<PluginProperty>of(), callContext);
+ testListener.assertListenerStatus();
// Artificially move the transaction status to UNKNOWN
+ final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
final String paymentStateName = paymentSMHelper.getErroredStateForTransaction(TransactionType.AUTHORIZE).toString();
+ testListener.pushExpectedEvent(NextEvent.PAYMENT_PLUGIN_ERROR);
paymentDao.updatePaymentAndTransactionOnCompletion(account.getId(), payment.getId(), TransactionType.AUTHORIZE, paymentStateName, paymentStateName,
payment.getTransactions().get(0).getId(), TransactionStatus.UNKNOWN, requestedAmount, account.getCurrency(),
"foo", "bar", internalCallContext);
+ testListener.assertListenerStatus();
final List<PaymentTransactionModelDao> paymentTransactionHistoryBeforeJanitor = getPaymentTransactionHistory(transactionExternalKey);
Assert.assertEquals(paymentTransactionHistoryBeforeJanitor.size(), 3);
- clock.addDays(1);
- try {
- Thread.sleep(1500);
- } catch (InterruptedException e) {
- }
+ // Move clock for notification to be processed
+ clock.addDeltaFromReality(5 * 60 * 1000);
+ assertNotificationsCompleted(internalCallContext, 5);
// Proves the Janitor ran (and updated the transaction)
final List<PaymentTransactionModelDao> paymentTransactionHistoryAfterJanitor = getPaymentTransactionHistory(transactionExternalKey);
@@ -299,26 +338,33 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
// Make sure the state as seen by the plugin will be in PaymentPluginStatus.ERROR, which will be returned later to Janitor
mockPaymentProviderPlugin.makeNextPaymentFailWithException();
try {
+ testListener.pushExpectedEvent(NextEvent.PAYMENT_PLUGIN_ERROR);
paymentApi.createAuthorization(account, account.getPaymentMethodId(), null, requestedAmount, account.getCurrency(), paymentExternalKey,
transactionExternalKey, ImmutableList.<PluginProperty>of(), callContext);
} catch (PaymentApiException ignore) {
+ testListener.assertListenerStatus();
}
final Payment payment = paymentApi.getPaymentByExternalKey(paymentExternalKey, false, ImmutableList.<PluginProperty>of(), callContext);
// Artificially move the transaction status to UNKNOWN
+ final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
+
final String paymentStateName = paymentSMHelper.getErroredStateForTransaction(TransactionType.AUTHORIZE).toString();
+ testListener.pushExpectedEvent(NextEvent.PAYMENT_PLUGIN_ERROR);
paymentDao.updatePaymentAndTransactionOnCompletion(account.getId(), payment.getId(), TransactionType.AUTHORIZE, paymentStateName, paymentStateName,
payment.getTransactions().get(0).getId(), TransactionStatus.UNKNOWN, requestedAmount, account.getCurrency(),
"foo", "bar", internalCallContext);
+ testListener.assertListenerStatus();
+
+ // Move clock for notification to be processed
+ clock.addDeltaFromReality(5 * 60 * 1000);
+ // NO because we will keep retrying as we can't fix it...
+ //assertNotificationsCompleted(internalCallContext, 5);
final List<PaymentTransactionModelDao> paymentTransactionHistoryBeforeJanitor = getPaymentTransactionHistory(transactionExternalKey);
Assert.assertEquals(paymentTransactionHistoryBeforeJanitor.size(), 3);
- clock.addDays(1);
- try {
- Thread.sleep(1500);
- } catch (InterruptedException e) {
- }
+
// Nothing new happened
final List<PaymentTransactionModelDao> paymentTransactionHistoryAfterJanitor = getPaymentTransactionHistory(transactionExternalKey);
@@ -326,31 +372,36 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
}
@Test(groups = "slow")
- public void testPendingEntries() throws PaymentApiException, EventBusException {
+ public void testPendingEntries() throws PaymentApiException, EventBusException, NoSuchNotificationQueue {
final BigDecimal requestedAmount = BigDecimal.TEN;
final String paymentExternalKey = "jhj44";
final String transactionExternalKey = "4jhjj2";
+ testListener.pushExpectedEvent(NextEvent.PAYMENT);
final Payment payment = paymentApi.createAuthorization(account, account.getPaymentMethodId(), null, requestedAmount, account.getCurrency(), paymentExternalKey,
transactionExternalKey, ImmutableList.<PluginProperty>of(), callContext);
+ testListener.assertListenerStatus();
+
+ final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
// Artificially move the transaction status to PENDING
final String paymentStateName = paymentSMHelper.getPendingStateForTransaction(TransactionType.AUTHORIZE).toString();
+ testListener.pushExpectedEvent(NextEvent.PAYMENT);
paymentDao.updatePaymentAndTransactionOnCompletion(account.getId(), payment.getId(), TransactionType.AUTHORIZE, paymentStateName, paymentStateName,
payment.getTransactions().get(0).getId(), TransactionStatus.PENDING, requestedAmount, account.getCurrency(),
"loup", "chat", internalCallContext);
- clock.addDays(1);
- try {
- Thread.sleep(1500);
- } catch (InterruptedException e) {
- }
+ testListener.assertListenerStatus();
+
+ // Move clock for notification to be processed
+ clock.addDeltaFromReality(5 * 60 * 1000);
+ assertNotificationsCompleted(internalCallContext, 5);
final Payment updatedPayment = paymentApi.getPayment(payment.getId(), false, ImmutableList.<PluginProperty>of(), callContext);
Assert.assertEquals(updatedPayment.getTransactions().get(0).getTransactionStatus(), TransactionStatus.SUCCESS);
-
}
+
private List<PluginProperty> createPropertiesForInvoice(final Invoice invoice) {
final List<PluginProperty> result = new ArrayList<PluginProperty>();
result.add(new PluginProperty(InvoicePaymentRoutingPluginApi.PROP_IPCD_INVOICE_ID, invoice.getId().toString(), false));
@@ -385,5 +436,19 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
}
});
}
+
+ private void assertNotificationsCompleted(final InternalCallContext internalCallContext, final long timeoutSec) {
+ try {
+ await().atMost(timeoutSec, SECONDS).until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ final List<NotificationEventWithMetadata<NotificationEvent>> notifications = notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, Janitor.QUEUE_NAME).getFutureOrInProcessingNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
+ return notifications.isEmpty();
+ }
+ });
+ } catch (final Exception e) {
+ fail("Test failed ", e);
+ }
+ }
}
diff --git a/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java b/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java
index 2a1ae78..dcf9a9a 100644
--- a/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java
+++ b/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java
@@ -70,7 +70,7 @@ public class TestRetryService extends PaymentTestSuiteNoDB {
((MockPaymentDao) paymentDao).reset();
mockPaymentProviderPlugin = (MockPaymentProviderPlugin) registry.getServiceForName(MockPaymentProviderPlugin.PLUGIN_NAME);
mockPaymentProviderPlugin.clear();
- retryService.initialize(DefaultPaymentService.SERVICE_NAME);
+ retryService.initialize();
retryService.start();
}
diff --git a/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationListener.java b/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationListener.java
index dfa6f2c..d821177 100644
--- a/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationListener.java
+++ b/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationListener.java
@@ -44,6 +44,7 @@ import com.ning.http.client.Response;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
public class PushNotificationListener {
@@ -70,6 +71,7 @@ public class PushNotificationListener {
this.mapper = mapper;
}
+ @AllowConcurrentEvents
@Subscribe
public void triggerPushNotifications(final ExtBusEvent event) {
final TenantContext context = contextFactory.createTenantContext(event.getTenantId());
diff --git a/util/src/main/java/org/killbill/billing/util/config/PaymentConfig.java b/util/src/main/java/org/killbill/billing/util/config/PaymentConfig.java
index e0fb8bf..7b630c6 100644
--- a/util/src/main/java/org/killbill/billing/util/config/PaymentConfig.java
+++ b/util/src/main/java/org/killbill/billing/util/config/PaymentConfig.java
@@ -66,16 +66,10 @@ public interface PaymentConfig extends KillbillConfig {
@Description("Delay before which unresolved attempt should be retried")
public TimeSpan getIncompleteAttemptsTimeSpanDelay();
- @Config("org.killbill.payment.janitor.transactions.delay")
- @Default("3m")
+ @Config("org.killbill.payment.janitor.transactions.retries")
+ @Default("15s,1m,3m,1h,1d,1d,1d,1d,1d")
@Description("Delay before which unresolved transactions should be retried")
- public TimeSpan getIncompleteTransactionsTimeSpanDelay();
-
-
- @Config("org.killbill.payment.janitor.transactions.giveup")
- @Default("7d")
- @Description("Delay after which unresolved transactions should be abandoned")
- public TimeSpan getIncompleteTransactionsTimeSpanGiveup();
+ public List<TimeSpan> getIncompleteTransactionsRetries();
@Config("org.killbill.payment.janitor.rate")
@Default("1h")
diff --git a/util/src/main/java/org/killbill/billing/util/entity/dao/DefaultPaginationSqlDaoHelper.java b/util/src/main/java/org/killbill/billing/util/entity/dao/DefaultPaginationSqlDaoHelper.java
index 48c23f1..1f9a9fd 100644
--- a/util/src/main/java/org/killbill/billing/util/entity/dao/DefaultPaginationSqlDaoHelper.java
+++ b/util/src/main/java/org/killbill/billing/util/entity/dao/DefaultPaginationSqlDaoHelper.java
@@ -18,6 +18,8 @@ package org.killbill.billing.util.entity.dao;
import java.util.Iterator;
+import javax.annotation.Nullable;
+
import org.killbill.billing.callcontext.InternalTenantContext;
import org.killbill.billing.util.entity.DefaultPagination;
import org.killbill.billing.util.entity.Entity;
@@ -35,7 +37,7 @@ public class DefaultPaginationSqlDaoHelper {
final PaginationIteratorBuilder<M, E, S> paginationIteratorBuilder,
final Long offset,
final Long limit,
- final InternalTenantContext context) {
+ @Nullable final InternalTenantContext context) {
// Note: the connection will be busy as we stream the results out: hence we cannot use
// SQL_CALC_FOUND_ROWS / FOUND_ROWS on the actual query.
// We still need to know the actual number of results, mainly for the UI so that it knows if it needs to fetch
@@ -51,7 +53,7 @@ public class DefaultPaginationSqlDaoHelper {
// We usually always want to wrap our queries in an EntitySqlDaoTransactionWrapper... except here.
// Since we want to stream the results out, we don't want to auto-commit when this method returns.
final EntitySqlDao<M, E> sqlDao = transactionalSqlDao.onDemandForStreamingResults(sqlDaoClazz);
- final Long totalCount = sqlDao.getCount(context);
+ final Long totalCount = context != null ? sqlDao.getCount(context) : null;
final Iterator<M> results = paginationIteratorBuilder.build((S) sqlDao, limit, context);
return new DefaultPagination<M>(offset, limit, count, totalCount, results);