killbill-aplcache
Changes
entitlement/src/main/java/com/ning/billing/entitlement/api/test/DefaultEntitlementTestApi.java 26(+21 -5)
entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java 253(+0 -253)
entitlement/src/main/java/com/ning/billing/entitlement/engine/core/DefaultApiEventProcessor.java 63(+0 -63)
entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java 152(+85 -67)
entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg 79(+7 -72)
entitlement/src/test/java/com/ning/billing/entitlement/api/billing/BrainDeadMockEntitlementDao.java 17(+5 -12)
entitlement/src/test/java/com/ning/billing/entitlement/engine/core/MockApiEventProcessorMemory.java 63(+0 -63)
entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java 113(+71 -42)
entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java 21(+14 -7)
payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java 202(+162 -40)
pom.xml 1(+1 -0)
util/pom.xml 15(+14 -1)
util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java 41(+41 -0)
util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg 83(+83 -0)
Details
diff --git a/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java b/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
index 0fdb5d3..f501267 100644
--- a/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
+++ b/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
@@ -18,10 +18,11 @@ package com.ning.billing.account.dao;
import java.util.List;
import java.util.UUID;
+
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionStatus;
-import org.skife.jdbi.v2.exceptions.TransactionFailedException;
+
import com.google.inject.Inject;
import com.ning.billing.ErrorCode;
import com.ning.billing.account.api.Account;
@@ -157,7 +158,7 @@ public class DefaultAccountDao implements AccountDao {
}
}
}
-
+
@Override
public void deleteByKey(final String externalKey) throws AccountApiException {
try {
@@ -204,7 +205,7 @@ public class DefaultAccountDao implements AccountDao {
}
}
- private void saveCustomFieldsFromWithinTransaction(final Account account, final AccountSqlDao transactionalDao, final boolean isCreation) {
+ private void saveTagsFromWithinTransaction(final Account account, final AccountSqlDao transactionalDao, final boolean isCreation) {
String accountId = account.getId().toString();
String objectType = account.getObjectName();
@@ -219,7 +220,7 @@ public class DefaultAccountDao implements AccountDao {
}
}
- private void saveTagsFromWithinTransaction(final Account account, final AccountSqlDao transactionalDao, final boolean isCreation) {
+ private void saveCustomFieldsFromWithinTransaction(final Account account, final AccountSqlDao transactionalDao, final boolean isCreation) {
String accountId = account.getId().toString();
String objectType = account.getObjectName();
@@ -234,5 +235,5 @@ public class DefaultAccountDao implements AccountDao {
}
}
-
+
}
diff --git a/analytics/src/test/java/com/ning/billing/analytics/AnalyticsTestModule.java b/analytics/src/test/java/com/ning/billing/analytics/AnalyticsTestModule.java
index d4fb56a..48663fa 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/AnalyticsTestModule.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/AnalyticsTestModule.java
@@ -26,6 +26,7 @@ import com.ning.billing.dbi.MysqlTestingHelper;
import com.ning.billing.entitlement.glue.EntitlementModule;
import com.ning.billing.util.glue.ClockModule;
import com.ning.billing.util.glue.EventBusModule;
+import com.ning.billing.util.glue.NotificationQueueModule;
import com.ning.billing.util.glue.TagStoreModule;
public class AnalyticsTestModule extends AnalyticsModule
@@ -42,6 +43,7 @@ public class AnalyticsTestModule extends AnalyticsModule
install(new EntitlementModule());
install(new ClockModule());
install(new TagStoreModule());
+ install(new NotificationQueueModule());
// Install the Dao layer
final MysqlTestingHelper helper = new MysqlTestingHelper();
diff --git a/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java b/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java
index 94b28e5..d826d5c 100644
--- a/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java
+++ b/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java
@@ -33,11 +33,6 @@ public interface InvoiceUserApi {
public void notifyOfPaymentAttempt(InvoicePayment invoicePayment);
-// public void paymentAttemptFailed(UUID invoiceId, UUID paymentId, DateTime paymentAttemptDate);
-//
-// public void paymentAttemptSuccessful(UUID invoiceId, BigDecimal amount, Currency currency,
-// UUID paymentId, DateTime paymentDate);
-
public BigDecimal getAccountBalance(UUID accountId);
}
diff --git a/api/src/main/java/com/ning/billing/payment/api/CreditCardPaymentMethodInfo.java b/api/src/main/java/com/ning/billing/payment/api/CreditCardPaymentMethodInfo.java
index 75a4ab2..a9e25dc 100644
--- a/api/src/main/java/com/ning/billing/payment/api/CreditCardPaymentMethodInfo.java
+++ b/api/src/main/java/com/ning/billing/payment/api/CreditCardPaymentMethodInfo.java
@@ -23,6 +23,12 @@ public final class CreditCardPaymentMethodInfo extends PaymentMethodInfo {
private String cardType;
private String expirationDate;
private String maskNumber;
+ private String cardAddress1;
+ private String cardAddress2;
+ private String cardCity;
+ private String cardState;
+ private String cardPostalCode;
+ private String cardCountry;
public Builder() {
super(Builder.class);
@@ -30,6 +36,16 @@ public final class CreditCardPaymentMethodInfo extends PaymentMethodInfo {
public Builder(CreditCardPaymentMethodInfo src) {
super(Builder.class, src);
+ this.cardHolderName = src.cardHolderName;
+ this.cardType = src.cardType;
+ this.expirationDate = src.expirationDate;
+ this.cardAddress1 = src.cardAddress1;
+ this.cardAddress2 = src.cardAddress2;
+ this.cardCity = src.cardCity;
+ this.cardState = src.cardState;
+ this.cardPostalCode = src.cardPostalCode;
+ this.cardCountry = src.cardCountry;
+ this.maskNumber = src.maskNumber;
}
public Builder setCardHolderName(String cardHolderName) {
@@ -47,13 +63,55 @@ public final class CreditCardPaymentMethodInfo extends PaymentMethodInfo {
return this;
}
+ public Builder setCardAddress1(String creditCardAddress1) {
+ this.cardAddress1 = creditCardAddress1;
+ return this;
+ }
+
+ public Builder setCardAddress2(String creditCardAddress2) {
+ this.cardAddress2 = creditCardAddress2;
+ return this;
+ }
+
+ public Builder setCardCity(String creditCardCity) {
+ this.cardCity = creditCardCity;
+ return this;
+ }
+
+ public Builder setCardState(String creditCardState) {
+ this.cardState = creditCardState;
+ return this;
+ }
+
+ public Builder setCardPostalCode(String creditCardPostalCode) {
+ this.cardPostalCode = creditCardPostalCode;
+ return this;
+ }
+
+ public Builder setCardCountry(String creditCardCountry) {
+ this.cardCountry = creditCardCountry;
+ return this;
+ }
+
public Builder setMaskNumber(String maskNumber) {
this.maskNumber = maskNumber;
return this;
}
public CreditCardPaymentMethodInfo build() {
- return new CreditCardPaymentMethodInfo(id, accountId, defaultMethod, cardHolderName, cardType, expirationDate, maskNumber);
+ return new CreditCardPaymentMethodInfo(id,
+ accountId,
+ defaultMethod,
+ cardHolderName,
+ cardType,
+ expirationDate,
+ maskNumber,
+ cardAddress1,
+ cardAddress2,
+ cardCity,
+ cardState,
+ cardPostalCode,
+ cardCountry);
}
}
@@ -61,6 +119,12 @@ public final class CreditCardPaymentMethodInfo extends PaymentMethodInfo {
private final String cardType;
private final String expirationDate;
private final String maskNumber;
+ private final String cardAddress1;
+ private final String cardAddress2;
+ private final String cardCity;
+ private final String cardState;
+ private final String cardPostalCode;
+ private final String cardCountry;
public CreditCardPaymentMethodInfo(String id,
String accountId,
@@ -68,12 +132,25 @@ public final class CreditCardPaymentMethodInfo extends PaymentMethodInfo {
String cardHolderName,
String cardType,
String expirationDate,
- String maskNumber) {
+ String maskNumber,
+ String cardAddress1,
+ String cardAddress2,
+ String cardCity,
+ String cardState,
+ String cardPostalCode,
+ String cardCountry) {
+
super(id, accountId, defaultMethod, "CreditCard");
this.cardHolderName = cardHolderName;
this.cardType = cardType;
this.expirationDate = expirationDate;
this.maskNumber = maskNumber;
+ this.cardAddress1 = cardAddress1;
+ this.cardAddress2 = cardAddress2;
+ this.cardCity = cardCity;
+ this.cardState = cardState;
+ this.cardPostalCode = cardPostalCode;
+ this.cardCountry = cardCountry;
}
public String getCardHolderName() {
@@ -84,6 +161,30 @@ public final class CreditCardPaymentMethodInfo extends PaymentMethodInfo {
return cardType;
}
+ public String getCardAddress1() {
+ return cardAddress1;
+ }
+
+ public String getCardAddress2() {
+ return cardAddress2;
+ }
+
+ public String getCardCity() {
+ return cardCity;
+ }
+
+ public String getCardState() {
+ return cardState;
+ }
+
+ public String getCardPostalCode() {
+ return cardPostalCode;
+ }
+
+ public String getCardCountry() {
+ return cardCountry;
+ }
+
public String getExpirationDate() {
return expirationDate;
}
diff --git a/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java b/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java
index 8e665ed..189e87c 100644
--- a/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java
+++ b/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java
@@ -23,18 +23,19 @@ import javax.annotation.Nullable;
import com.ning.billing.account.api.Account;
public interface PaymentApi {
- Either<PaymentError, PaymentMethodInfo> getPaymentMethod(@Nullable String accountKey, String paymentMethodId);
- Either<PaymentError, List<PaymentMethodInfo>> getPaymentMethods(String accountKey);
+ Either<PaymentError, Void> updatePaymentGateway(String accountKey);
- Either<PaymentError, Void> deletePaymentMethod(String accountKey, String paymentMethodId);
+ Either<PaymentError, PaymentMethodInfo> getPaymentMethod(@Nullable String accountKey, String paymentMethodId);
- Either<PaymentError, Void> updatePaymentGateway(String accountKey);
+ Either<PaymentError, List<PaymentMethodInfo>> getPaymentMethods(String accountKey);
- Either<PaymentError, String> addPaypalPaymentMethod(@Nullable String accountKey, PaypalPaymentMethodInfo paypalPaymentMethod);
+ Either<PaymentError, String> addPaymentMethod(@Nullable String accountKey, PaymentMethodInfo paymentMethod);
Either<PaymentError, PaymentMethodInfo> updatePaymentMethod(String accountKey, PaymentMethodInfo paymentMethodInfo);
+ Either<PaymentError, Void> deletePaymentMethod(String accountKey, String paymentMethodId);
+
List<Either<PaymentError, PaymentInfo>> createPayment(String accountKey, List<String> invoiceIds);
List<Either<PaymentError, PaymentInfo>> createPayment(Account account, List<String> invoiceIds);
@@ -44,7 +45,7 @@ public interface PaymentApi {
Either<PaymentError, String> createPaymentProviderAccount(Account account);
- Either<PaymentError, PaymentProviderAccount> updatePaymentProviderAccount(Account account);
+ Either<PaymentError, Void> updatePaymentProviderAccountContact(Account account);
PaymentAttempt getPaymentAttemptForPaymentId(String id);
diff --git a/api/src/main/java/com/ning/billing/payment/api/PaymentProviderAccount.java b/api/src/main/java/com/ning/billing/payment/api/PaymentProviderAccount.java
index 553d6a2..47cad39 100644
--- a/api/src/main/java/com/ning/billing/payment/api/PaymentProviderAccount.java
+++ b/api/src/main/java/com/ning/billing/payment/api/PaymentProviderAccount.java
@@ -20,18 +20,18 @@ import com.google.common.base.Objects;
public class PaymentProviderAccount {
private final String id;
- private final String accountNumber;
+ private final String accountKey;
private final String accountName;
private final String phoneNumber;
private final String defaultPaymentMethodId;
public PaymentProviderAccount(String id,
- String accountNumber,
+ String accountKey,
String accountName,
String phoneNumber,
String defaultPaymentMethodId) {
this.id = id;
- this.accountNumber = accountNumber;
+ this.accountKey = accountKey;
this.accountName = accountName;
this.phoneNumber = phoneNumber;
this.defaultPaymentMethodId = defaultPaymentMethodId;
@@ -41,8 +41,8 @@ public class PaymentProviderAccount {
return id;
}
- public String getAccountNumber() {
- return accountNumber;
+ public String getAccountKey() {
+ return accountKey;
}
public String getAccountName() {
@@ -59,18 +59,27 @@ public class PaymentProviderAccount {
public static class Builder {
private String id;
- private String accountNumber;
+ private String accountKey;
private String accountName;
private String phoneNumber;
private String defaultPaymentMethodId;
+ public Builder copyFrom(PaymentProviderAccount src) {
+ this.id = src.getId();
+ this.accountKey = src.getAccountKey();
+ this.accountName = src.getAccountName();
+ this.phoneNumber = src.getPhoneNumber();
+ this.defaultPaymentMethodId = src.getDefaultPaymentMethodId();
+ return this;
+ }
+
public Builder setId(String id) {
this.id = id;
return this;
}
- public Builder setAccountNumber(String accountNumber) {
- this.accountNumber = accountNumber;
+ public Builder setAccountKey(String accountKey) {
+ this.accountKey = accountKey;
return this;
}
@@ -90,7 +99,7 @@ public class PaymentProviderAccount {
}
public PaymentProviderAccount build() {
- return new PaymentProviderAccount(id, accountNumber, accountName, phoneNumber, defaultPaymentMethodId);
+ return new PaymentProviderAccount(id, accountKey, accountName, phoneNumber, defaultPaymentMethodId);
}
}
@@ -98,7 +107,7 @@ public class PaymentProviderAccount {
@Override
public int hashCode() {
return Objects.hashCode(id,
- accountNumber,
+ accountKey,
accountName,
phoneNumber,
defaultPaymentMethodId);
@@ -113,7 +122,8 @@ public class PaymentProviderAccount {
}
else {
return Objects.equal(id, other.id) &&
- Objects.equal(accountNumber, other.accountNumber) &&
+ Objects.equal(accountKey, other.accountKey) &&
+ Objects.equal(accountName, other.accountName) &&
Objects.equal(phoneNumber, other.phoneNumber) &&
Objects.equal(defaultPaymentMethodId, other.defaultPaymentMethodId);
}
@@ -121,4 +131,9 @@ public class PaymentProviderAccount {
return false;
}
+ @Override
+ public String toString() {
+ return "PaymentProviderAccount [id=" + id + ", accountKey=" + accountKey + ", accountName=" + accountName + ", phoneNumber=" + phoneNumber + ", defaultPaymentMethodId=" + defaultPaymentMethodId + "]";
+ }
+
}
diff --git a/api/src/main/java/com/ning/billing/payment/api/PaypalPaymentMethodInfo.java b/api/src/main/java/com/ning/billing/payment/api/PaypalPaymentMethodInfo.java
index 65c36e9..0977071 100644
--- a/api/src/main/java/com/ning/billing/payment/api/PaypalPaymentMethodInfo.java
+++ b/api/src/main/java/com/ning/billing/payment/api/PaypalPaymentMethodInfo.java
@@ -20,6 +20,8 @@ import com.google.common.base.Strings;
public final class PaypalPaymentMethodInfo extends PaymentMethodInfo {
+ public static final String TYPE = "PayPal";
+
public static final class Builder extends BuilderBase<PaypalPaymentMethodInfo, Builder> {
private String baid;
private String email;
@@ -30,6 +32,8 @@ public final class PaypalPaymentMethodInfo extends PaymentMethodInfo {
public Builder(PaypalPaymentMethodInfo src) {
super(Builder.class, src);
+ this.baid = src.baid;
+ this.email = src.email;
}
public Builder setBaid(String baid) {
@@ -55,10 +59,10 @@ public final class PaypalPaymentMethodInfo extends PaymentMethodInfo {
Boolean defaultMethod,
String baid,
String email) {
- super(id, accountId, defaultMethod, "PayPal");
+ super(id, accountId, defaultMethod, TYPE);
- if (Strings.isNullOrEmpty(accountId) || Strings.isNullOrEmpty(baid) || Strings.isNullOrEmpty(email)) {
- throw new IllegalArgumentException("accountId, baid and email should be present");
+ if (Strings.isNullOrEmpty(baid) || Strings.isNullOrEmpty(email)) {
+ throw new IllegalArgumentException("baid and email should be present");
}
this.baid = baid;
@@ -72,4 +76,10 @@ public final class PaypalPaymentMethodInfo extends PaymentMethodInfo {
public String getEmail() {
return email;
}
+
+ @Override
+ public String toString() {
+ return "PaypalPaymentMethodInfo [baid=" + baid + ", email=" + email + "]";
+ }
+
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/test/DefaultEntitlementTestApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/test/DefaultEntitlementTestApi.java
index c2f5878..19b851d 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/test/DefaultEntitlementTestApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/test/DefaultEntitlementTestApi.java
@@ -18,7 +18,11 @@ package com.ning.billing.entitlement.api.test;
import com.google.inject.Inject;
import com.ning.billing.config.EntitlementConfig;
-import com.ning.billing.entitlement.engine.core.EventNotifier;
+import com.ning.billing.entitlement.engine.core.Engine;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,20 +32,32 @@ public class DefaultEntitlementTestApi implements EntitlementTestApi {
private final static Logger log = LoggerFactory.getLogger(DefaultEntitlementTestApi.class);
- private final EventNotifier apiEventProcessor;
private final EntitlementConfig config;
+ private final NotificationQueueService notificationQueueService;
@Inject
- public DefaultEntitlementTestApi(EventNotifier apiEventProcessor, EntitlementConfig config) {
- this.apiEventProcessor = apiEventProcessor;
+ public DefaultEntitlementTestApi(NotificationQueueService notificationQueueService, EntitlementConfig config) {
this.config = config;
+ this.notificationQueueService = notificationQueueService;
}
@Override
public void doProcessReadyEvents(UUID [] subscriptionsIds, Boolean recursive, Boolean oneEventOnly) {
if (config.isEventProcessingOff()) {
log.warn("Running event processing loop");
- apiEventProcessor.processAllReadyEvents(subscriptionsIds, recursive, oneEventOnly);
+ NotificationQueue queue = getNotificationQueue();
+ queue.processReadyNotification();
+
+ }
+ }
+
+ private NotificationQueue getNotificationQueue() {
+ try {
+ NotificationQueue subscritionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
+ Engine.NOTIFICATION_QUEUE_NAME);
+ return subscritionEventQueue;
+ } catch (NoSuchNotificationQueue e) {
+ throw new RuntimeException(e);
}
}
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index 3eb5dd3..7bd1124 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -16,6 +16,9 @@
package com.ning.billing.entitlement.engine.core;
+import java.util.UUID;
+
+
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,10 +48,16 @@ import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.eventbus.EventBus;
import com.ning.billing.util.eventbus.EventBus.EventBusException;
+import com.ning.billing.util.notificationq.NotificationConfig;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotficationQueueAlreadyExists;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
public class Engine implements EventListener, EntitlementService {
- private static final String ENTITLEMENT_SERVICE_NAME = "entitlement-service";
+ public static final String NOTIFICATION_QUEUE_NAME = "subscription-events";
+ public static final String ENTITLEMENT_SERVICE_NAME = "entitlement-service";
private final long MAX_NOTIFICATION_THREAD_WAIT_MS = 10000; // 10 secs
private final long NOTIFICATION_THREAD_WAIT_INCREMENT_MS = 1000; // 1 sec
@@ -58,33 +67,36 @@ public class Engine implements EventListener, EntitlementService {
private final Clock clock;
private final EntitlementDao dao;
- private final EventNotifier apiEventProcessor;
private final PlanAligner planAligner;
private final EntitlementUserApi userApi;
private final EntitlementBillingApi billingApi;
private final EntitlementTestApi testApi;
private final EntitlementMigrationApi migrationApi;
private final EventBus eventBus;
+ private final EntitlementConfig config;
+ private final NotificationQueueService notificationQueueService;
private boolean startedNotificationThread;
+ private boolean stoppedNotificationThread;
+ private NotificationQueue subscritionEventQueue;
@Inject
- public Engine(Clock clock, EntitlementDao dao, EventNotifier apiEventProcessor,
- PlanAligner planAligner, EntitlementConfig config, DefaultEntitlementUserApi userApi,
+ public Engine(Clock clock, EntitlementDao dao, PlanAligner planAligner,
+ EntitlementConfig config, DefaultEntitlementUserApi userApi,
DefaultEntitlementBillingApi billingApi, DefaultEntitlementTestApi testApi,
- DefaultEntitlementMigrationApi migrationApi, EventBus eventBus) {
+ DefaultEntitlementMigrationApi migrationApi, EventBus eventBus,
+ NotificationQueueService notificationQueueService) {
super();
this.clock = clock;
this.dao = dao;
- this.apiEventProcessor = apiEventProcessor;
this.planAligner = planAligner;
this.userApi = userApi;
this.testApi = testApi;
this.billingApi = billingApi;
this.migrationApi = migrationApi;
+ this.config = config;
this.eventBus = eventBus;
-
- this.startedNotificationThread = false;
+ this.notificationQueueService = notificationQueueService;
}
@Override
@@ -92,20 +104,75 @@ public class Engine implements EventListener, EntitlementService {
return ENTITLEMENT_SERVICE_NAME;
}
-
@LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
public void initialize() {
+
+ try {
+ this.stoppedNotificationThread = false;
+ this.startedNotificationThread = false;
+ subscritionEventQueue = notificationQueueService.createNotificationQueue(ENTITLEMENT_SERVICE_NAME,
+ NOTIFICATION_QUEUE_NAME,
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(String notificationKey) {
+ EntitlementEvent event = dao.getEventById(UUID.fromString(notificationKey));
+ if (event == null) {
+ log.warn("Failed to extract event for notification key {}", notificationKey);
+ } else {
+ processEventReady(event);
+ }
+ }
+
+ @Override
+ public void completedQueueStop() {
+ synchronized (this) {
+ stoppedNotificationThread = true;
+ this.notifyAll();
+ }
+ }
+ @Override
+ public void completedQueueStart() {
+ synchronized (this) {
+ startedNotificationThread = true;
+ this.notifyAll();
+ }
+ }
+ },
+ new NotificationConfig() {
+ @Override
+ public boolean isNotificationProcessingOff() {
+ return config.isEventProcessingOff();
+ }
+ @Override
+ public long getNotificationSleepTimeMs() {
+ return config.getNotificationSleepTimeMs();
+ }
+ @Override
+ public int getDaoMaxReadyEvents() {
+ return config.getDaoMaxReadyEvents();
+ }
+ @Override
+ public long getDaoClaimTimeMs() {
+ return config.getDaoMaxReadyEvents();
+ }
+ });
+ } catch (NotficationQueueAlreadyExists e) {
+ throw new RuntimeException(e);
+ }
}
@LifecycleHandlerType(LifecycleLevel.START_SERVICE)
public void start() {
- apiEventProcessor.startNotifications(this);
+ subscritionEventQueue.startQueue();
waitForNotificationStartCompletion();
}
@LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
public void stop() {
- apiEventProcessor.stopNotifications();
+ if (subscritionEventQueue != null) {
+ subscritionEventQueue.stopQueue();
+ waitForNotificationStopCompletion();
+ }
startedNotificationThread = false;
}
@@ -133,6 +200,9 @@ public class Engine implements EventListener, EntitlementService {
@Override
public void processEventReady(EntitlementEvent event) {
+ if (!event.isActive()) {
+ return;
+ }
SubscriptionData subscription = (SubscriptionData) dao.getSubscriptionFromId(event.getSubscriptionId());
if (subscription == null) {
log.warn("Failed to retrieve subscription for id %s", event.getSubscriptionId());
@@ -148,23 +218,20 @@ public class Engine implements EventListener, EntitlementService {
}
}
- //
- // We want to ensure the notification thread is indeed started when we return from start()
- //
- @Override
- public void completedNotificationStart() {
- synchronized (this) {
- startedNotificationThread = true;
- this.notifyAll();
- }
+ private void waitForNotificationStartCompletion() {
+ waitForNotificationEventCompletion(true);
}
- private void waitForNotificationStartCompletion() {
+ private void waitForNotificationStopCompletion() {
+ waitForNotificationEventCompletion(false);
+ }
+
+ private void waitForNotificationEventCompletion(boolean startEvent) {
long ini = System.nanoTime();
synchronized(this) {
do {
- if (startedNotificationThread) {
+ if ((startEvent ? startedNotificationThread : stoppedNotificationThread)) {
break;
}
try {
@@ -173,14 +240,18 @@ public class Engine implements EventListener, EntitlementService {
Thread.currentThread().interrupt();
throw new EntitlementError(e);
}
- } while (!startedNotificationThread &&
+ } while (!(startEvent ? startedNotificationThread : stoppedNotificationThread) &&
(System.nanoTime() - ini) / NANO_TO_MS < MAX_NOTIFICATION_THREAD_WAIT_MS);
- if (!startedNotificationThread) {
- log.error("Could not start notification thread in %d msec !!!", MAX_NOTIFICATION_THREAD_WAIT_MS);
+ if (!(startEvent ? startedNotificationThread : stoppedNotificationThread)) {
+ log.error("Could not {} notification thread in {} msec !!!",
+ (startEvent ? "start" : "stop"),
+ MAX_NOTIFICATION_THREAD_WAIT_MS);
throw new EntitlementError("Failed to start service!!");
}
- log.info("Notification thread has been started in {} ms", (System.nanoTime() - ini) / NANO_TO_MS);
+ log.info("Notification thread has been {} in {} ms",
+ (startEvent ? "started" : "stopped"),
+ (System.nanoTime() - ini) / NANO_TO_MS);
}
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventListener.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventListener.java
index 7a84c51..e9962d8 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventListener.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventListener.java
@@ -24,5 +24,4 @@ public interface EventListener {
public void processEventReady(EntitlementEvent event);
- public void completedNotificationStart();
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
index b118110..ea62b84 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
@@ -54,14 +54,12 @@ public interface EntitlementDao {
// Event apis
public void createNextPhaseEvent(UUID subscriptionId, EntitlementEvent nextPhase);
+ public EntitlementEvent getEventById(UUID eventId);
+
public List<EntitlementEvent> getEventsForSubscription(UUID subscriptionId);
public List<EntitlementEvent> getPendingEventsForSubscription(UUID subscriptionId);
- public List<EntitlementEvent> getEventsReady(UUID ownerId, int sequenceId);
-
- public void clearEventsReady(UUID ownerId, Collection<EntitlementEvent> cleared);
-
// Subscription creation, cancellation, changePlan apis
public void createSubscription(SubscriptionData subscription, List<EntitlementEvent> initialEvents);
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
index 016f005..72a71d8 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
@@ -16,9 +16,14 @@
package com.ning.billing.entitlement.engine.dao;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
import com.google.inject.Inject;
import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.config.EntitlementConfig;
import com.ning.billing.entitlement.api.migration.AccountMigrationData;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.SubscriptionMigrationData;
@@ -28,20 +33,26 @@ import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
import com.ning.billing.entitlement.api.user.SubscriptionData;
import com.ning.billing.entitlement.api.user.SubscriptionFactory;
import com.ning.billing.entitlement.api.user.SubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.engine.core.Engine;
import com.ning.billing.entitlement.events.EntitlementEvent;
import com.ning.billing.entitlement.events.EntitlementEvent.EventType;
import com.ning.billing.entitlement.events.user.ApiEvent;
import com.ning.billing.entitlement.events.user.ApiEventType;
import com.ning.billing.entitlement.exceptions.EntitlementError;
-import com.ning.billing.util.Hostname;
import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationKey;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+
+import org.joda.time.DateTime;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
public class EntitlementSqlDao implements EntitlementDao {
@@ -51,19 +62,17 @@ public class EntitlementSqlDao implements EntitlementDao {
private final SubscriptionSqlDao subscriptionsDao;
private final BundleSqlDao bundlesDao;
private final EventSqlDao eventsDao;
- private final EntitlementConfig config;
- private final String hostname;
private final SubscriptionFactory factory;
+ private final NotificationQueueService notificationQueueService;
@Inject
- public EntitlementSqlDao(DBI dbi, Clock clock, EntitlementConfig config, SubscriptionFactory factory) {
+ public EntitlementSqlDao(DBI dbi, Clock clock, SubscriptionFactory factory, NotificationQueueService notificationQueueService) {
this.clock = clock;
- this.config = config;
this.factory = factory;
this.subscriptionsDao = dbi.onDemand(SubscriptionSqlDao.class);
this.eventsDao = dbi.onDemand(EventSqlDao.class);
this.bundlesDao = dbi.onDemand(BundleSqlDao.class);
- this.hostname = Hostname.get();
+ this.notificationQueueService = notificationQueueService;
}
@Override
@@ -140,11 +149,24 @@ public class EntitlementSqlDao implements EntitlementDao {
TransactionStatus status) throws Exception {
cancelNextPhaseEventFromTransaction(subscriptionId, dao);
dao.insertEvent(nextPhase);
+ recordFutureNotificationFromTransaction(dao,
+ nextPhase.getEffectiveDate(),
+ new NotificationKey() {
+ @Override
+ public String toString() {
+ return nextPhase.getId().toString();
+ }
+ });
return null;
}
});
}
+ @Override
+ public EntitlementEvent getEventById(UUID eventId) {
+ return eventsDao.getEventById(eventId.toString());
+ }
+
@Override
public List<EntitlementEvent> getEventsForSubscription(UUID subscriptionId) {
@@ -159,61 +181,6 @@ public class EntitlementSqlDao implements EntitlementDao {
return results;
}
- @Override
- public List<EntitlementEvent> getEventsReady(final UUID ownerId, final int sequenceId) {
-
- final Date now = clock.getUTCNow().toDate();
- final Date nextAvailable = clock.getUTCNow().plus(config.getDaoClaimTimeMs()).toDate();
-
- log.debug(String.format("EntitlementDao getEventsReady START effectiveNow = %s", now));
-
- List<EntitlementEvent> events = eventsDao.inTransaction(new Transaction<List<EntitlementEvent>, EventSqlDao>() {
-
- @Override
- public List<EntitlementEvent> inTransaction(EventSqlDao dao,
- TransactionStatus status) throws Exception {
-
- List<EntitlementEvent> claimedEvents = new ArrayList<EntitlementEvent>();
- List<EntitlementEvent> input = dao.getReadyEvents(now, config.getDaoMaxReadyEvents());
- for (EntitlementEvent cur : input) {
- final boolean claimed = (dao.claimEvent(ownerId.toString(), nextAvailable, cur.getId().toString(), now) == 1);
- if (claimed) {
- claimedEvents.add(cur);
- dao.insertClaimedHistory(sequenceId, ownerId.toString(), hostname, now, cur.getId().toString());
- }
- }
- return claimedEvents;
- }
- });
-
- for (EntitlementEvent cur : events) {
- log.debug(String.format("EntitlementDao %s [host %s] claimed events %s", ownerId, hostname, cur.getId()));
- if (cur.getOwner() != null && !cur.getOwner().equals(ownerId)) {
- log.warn(String.format("EventProcessor %s stealing event %s from %s", ownerId, cur, cur.getOwner()));
- }
- }
- return events;
- }
-
- @Override
- public void clearEventsReady(final UUID ownerId, final Collection<EntitlementEvent> cleared) {
-
- log.debug(String.format("EntitlementDao clearEventsReady START cleared size = %d", cleared.size()));
-
- eventsDao.inTransaction(new Transaction<Void, EventSqlDao>() {
-
- @Override
- public Void inTransaction(EventSqlDao dao,
- TransactionStatus status) throws Exception {
- // STEPH Same here batch would nice
- for (EntitlementEvent cur : cleared) {
- dao.clearEvent(cur.getId().toString(), ownerId.toString());
- log.debug(String.format("EntitlementDao %s [host %s] cleared events %s", ownerId, hostname, cur.getId()));
- }
- return null;
- }
- });
- }
@Override
public void createSubscription(final SubscriptionData subscription,
@@ -228,8 +195,16 @@ public class EntitlementSqlDao implements EntitlementDao {
dao.insertSubscription(subscription);
// STEPH batch as well
EventSqlDao eventsDaoFromSameTranscation = dao.become(EventSqlDao.class);
- for (EntitlementEvent cur : initialEvents) {
+ for (final EntitlementEvent cur : initialEvents) {
eventsDaoFromSameTranscation.insertEvent(cur);
+ recordFutureNotificationFromTransaction(dao,
+ cur.getEffectiveDate(),
+ new NotificationKey() {
+ @Override
+ public String toString() {
+ return cur.getId().toString();
+ }
+ });
}
return null;
}
@@ -246,6 +221,14 @@ public class EntitlementSqlDao implements EntitlementDao {
cancelNextChangeEventFromTransaction(subscriptionId, dao);
cancelNextPhaseEventFromTransaction(subscriptionId, dao);
dao.insertEvent(cancelEvent);
+ recordFutureNotificationFromTransaction(dao,
+ cancelEvent.getEffectiveDate(),
+ new NotificationKey() {
+ @Override
+ public String toString() {
+ return cancelEvent.getId().toString();
+ }
+ });
return null;
}
});
@@ -275,8 +258,16 @@ public class EntitlementSqlDao implements EntitlementDao {
if (existingCancelId != null) {
dao.unactiveEvent(existingCancelId.toString(), now);
- for (EntitlementEvent cur : uncancelEvents) {
+ for (final EntitlementEvent cur : uncancelEvents) {
dao.insertEvent(cur);
+ recordFutureNotificationFromTransaction(dao,
+ cur.getEffectiveDate(),
+ new NotificationKey() {
+ @Override
+ public String toString() {
+ return cur.getId().toString();
+ }
+ });
}
}
return null;
@@ -292,8 +283,16 @@ public class EntitlementSqlDao implements EntitlementDao {
TransactionStatus status) throws Exception {
cancelNextChangeEventFromTransaction(subscriptionId, dao);
cancelNextPhaseEventFromTransaction(subscriptionId, dao);
- for (EntitlementEvent cur : changeEvents) {
+ for (final EntitlementEvent cur : changeEvents) {
dao.insertEvent(cur);
+ recordFutureNotificationFromTransaction(dao,
+ cur.getEffectiveDate(),
+ new NotificationKey() {
+ @Override
+ public String toString() {
+ return cur.getId().toString();
+ }
+ });
}
return null;
}
@@ -366,8 +365,16 @@ public class EntitlementSqlDao implements EntitlementDao {
SubscriptionBundleData bundleData = curBundle.getData();
for (SubscriptionMigrationData curSubscription : curBundle.getSubscriptions()) {
SubscriptionData subData = curSubscription.getData();
- for (EntitlementEvent curEvent : curSubscription.getInitialEvents()) {
+ for (final EntitlementEvent curEvent : curSubscription.getInitialEvents()) {
transEventDao.insertEvent(curEvent);
+ recordFutureNotificationFromTransaction(transEventDao,
+ curEvent.getEffectiveDate(),
+ new NotificationKey() {
+ @Override
+ public String toString() {
+ return curEvent.getId().toString();
+ }
+ });
}
transSubDao.insertSubscription(subData);
}
@@ -378,6 +385,7 @@ public class EntitlementSqlDao implements EntitlementDao {
});
}
+
@Override
public void undoMigration(final UUID accountId) {
@@ -406,4 +414,14 @@ public class EntitlementSqlDao implements EntitlementDao {
transBundleDao.removeBundle(curBundle.getId().toString());
}
}
+
+ private void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao, final DateTime effectiveDate, final NotificationKey notificationKey) {
+ try {
+ NotificationQueue subscritionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
+ Engine.NOTIFICATION_QUEUE_NAME);
+ subscritionEventQueue.recordFutureNotificationFromTransaction(transactionalDao, effectiveDate, notificationKey);
+ } catch (NoSuchNotificationQueue e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EventSqlDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EventSqlDao.java
index de61217..5f485e5 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EventSqlDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EventSqlDao.java
@@ -19,7 +19,6 @@ package com.ning.billing.entitlement.engine.dao;
import com.ning.billing.entitlement.events.EntitlementEvent;
import com.ning.billing.entitlement.events.EntitlementEvent.EventType;
import com.ning.billing.entitlement.events.EventBaseBuilder;
-import com.ning.billing.entitlement.events.EventLifecycle.EventLifecycleState;
import com.ning.billing.entitlement.events.phase.PhaseEvent;
import com.ning.billing.entitlement.events.phase.PhaseEventBuilder;
import com.ning.billing.entitlement.events.phase.PhaseEventData;
@@ -49,43 +48,31 @@ import java.util.UUID;
@ExternalizedSqlViaStringTemplate3()
public interface EventSqlDao extends Transactional<EventSqlDao>, CloseMe, Transmogrifier {
- //
- // APIs for event notifications
- //
@SqlQuery
- @Mapper(IEventSqlMapper.class)
- public List<EntitlementEvent> getReadyEvents(@Bind("now") Date now, @Bind("max") int max);
+ @Mapper(EventSqlMapper.class)
+ public EntitlementEvent getEventById(@Bind("event_id") String eventId);
@SqlUpdate
- public int claimEvent(@Bind("owner") String owner, @Bind("next_available") Date nextAvailable, @Bind("event_id") String eventId, @Bind("now") Date now);
+ public void insertEvent(@Bind(binder = EventSqlDaoBinder.class) EntitlementEvent evt);
@SqlUpdate
public void removeEvents(@Bind("subscription_id") String subscriptionId);
@SqlUpdate
- public void clearEvent(@Bind("event_id") String eventId, @Bind("owner") String owner);
-
- @SqlUpdate
- public void insertEvent(@Bind(binder = IEventSqlDaoBinder.class) EntitlementEvent evt);
-
- @SqlUpdate
- public void insertClaimedHistory(@Bind("sequence_id") int sequenceId, @Bind("owner_id") String ownerId, @Bind("hostname") String hostname, @Bind("claimed_dt") Date clainedDate, @Bind("event_id") String eventId);
-
- @SqlUpdate
public void unactiveEvent(@Bind("event_id")String eventId, @Bind("now") Date now);
@SqlUpdate
public void reactiveEvent(@Bind("event_id")String eventId, @Bind("now") Date now);
@SqlQuery
- @Mapper(IEventSqlMapper.class)
+ @Mapper(EventSqlMapper.class)
public List<EntitlementEvent> getFutureActiveEventForSubscription(@Bind("subscription_id") String subscriptionId, @Bind("now") Date now);
@SqlQuery
- @Mapper(IEventSqlMapper.class)
+ @Mapper(EventSqlMapper.class)
public List<EntitlementEvent> getEventsForSubscription(@Bind("subscription_id") String subscriptionId);
- public static class IEventSqlDaoBinder implements Binder<Bind, EntitlementEvent> {
+ public static class EventSqlDaoBinder implements Binder<Bind, EntitlementEvent> {
private Date getDate(DateTime dateTime) {
return dateTime == null ? null : dateTime.toDate();
@@ -106,13 +93,10 @@ public interface EventSqlDao extends Transactional<EventSqlDao>, CloseMe, Transm
stmt.bind("plist_name", (evt.getType() == EventType.API_USER) ? ((ApiEvent) evt).getPriceList() : null);
stmt.bind("current_version", evt.getActiveVersion());
stmt.bind("is_active", evt.isActive());
- stmt.bind("processing_available_dt", getDate(evt.getNextAvailableDate()));
- stmt.bind("processing_owner", (String) null);
- stmt.bind("processing_state", EventLifecycleState.AVAILABLE.toString());
}
}
- public static class IEventSqlMapper implements ResultSetMapper<EntitlementEvent> {
+ public static class EventSqlMapper implements ResultSetMapper<EntitlementEvent> {
private DateTime getDate(ResultSet r, String fieldName) throws SQLException {
final Timestamp resultStamp = r.getTimestamp(fieldName);
@@ -135,9 +119,6 @@ public interface EventSqlDao extends Transactional<EventSqlDao>, CloseMe, Transm
String priceListName = r.getString("plist_name");
long currentVersion = r.getLong("current_version");
boolean isActive = r.getBoolean("is_active");
- DateTime nextAvailableDate = getDate(r, "processing_available_dt");
- UUID processingOwner = (r.getString("processing_owner") != null) ? UUID.fromString(r.getString("processing_owner")) : null;
- EventLifecycleState processingState = EventLifecycleState.valueOf(r.getString("processing_state"));
EventBaseBuilder<?> base = ((eventType == EventType.PHASE) ?
new PhaseEventBuilder() :
@@ -148,11 +129,7 @@ public interface EventSqlDao extends Transactional<EventSqlDao>, CloseMe, Transm
.setEffectiveDate(effectiveDate)
.setProcessedDate(createdDate)
.setActiveVersion(currentVersion)
- .setActive(isActive)
- .setProcessingOwner(processingOwner)
- .setNextAvailableProcessingTime(nextAvailableDate)
- .setProcessingState(processingState);
-
+ .setActive(isActive);
EntitlementEvent result = null;
if (eventType == EventType.PHASE) {
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/events/EntitlementEvent.java b/entitlement/src/main/java/com/ning/billing/entitlement/events/EntitlementEvent.java
index af74752..b7bfece 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/events/EntitlementEvent.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/events/EntitlementEvent.java
@@ -21,7 +21,7 @@ import org.joda.time.DateTime;
import java.util.UUID;
-public interface EntitlementEvent extends EventLifecycle, Comparable<EntitlementEvent> {
+public interface EntitlementEvent extends Comparable<EntitlementEvent> {
public enum EventType {
API_USER,
@@ -32,6 +32,16 @@ public interface EntitlementEvent extends EventLifecycle, Comparable<Entitlement
public UUID getId();
+ public long getActiveVersion();
+
+ public void setActiveVersion(long activeVersion);
+
+ public boolean isActive();
+
+ public void deactivate();
+
+ public void reactivate();
+
public DateTime getProcessedDate();
public DateTime getRequestedDate();
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBase.java b/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBase.java
index 93dc9e1..9420fbf 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBase.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBase.java
@@ -30,12 +30,8 @@ public abstract class EventBase implements EntitlementEvent {
private final DateTime effectiveDate;
private final DateTime processedDate;
- // Lifecyle of the event
private long activeVersion;
private boolean isActive;
- private UUID processingOwner;
- private DateTime nextAvailableProcessingTime;
- private EventLifecycleState processingState;
public EventBase(EventBaseBuilder<?> builder) {
this.uuid = builder.getUuid();
@@ -46,31 +42,17 @@ public abstract class EventBase implements EntitlementEvent {
this.activeVersion = builder.getActiveVersion();
this.isActive = builder.isActive();
- this.processingOwner = builder.getProcessingOwner();
- this.nextAvailableProcessingTime = builder.getNextAvailableProcessingTime();
- this.processingState = builder.getProcessingState();
}
public EventBase(UUID subscriptionId, DateTime requestedDate,
DateTime effectiveDate, DateTime processedDate,
long activeVersion, boolean isActive) {
- this(subscriptionId, requestedDate, effectiveDate, processedDate, activeVersion, isActive, null, null, EventLifecycleState.AVAILABLE);
- }
-
- private EventBase(UUID subscriptionId, DateTime requestedDate,
- DateTime effectiveDate, DateTime processedDate,
- long activeVersion, boolean isActive,
- UUID processingOwner, DateTime nextAvailableProcessingTime,
- EventLifecycleState processingState) {
- this(UUID.randomUUID(), subscriptionId, requestedDate, effectiveDate, processedDate, activeVersion, isActive,
- processingOwner, nextAvailableProcessingTime, processingState);
+ this(UUID.randomUUID(), subscriptionId, requestedDate, effectiveDate, processedDate, activeVersion, isActive);
}
public EventBase(UUID id, UUID subscriptionId, DateTime requestedDate,
DateTime effectiveDate, DateTime processedDate,
- long activeVersion, boolean isActive,
- UUID processingOwner, DateTime nextAvailableProcessingTime,
- EventLifecycleState processingState) {
+ long activeVersion, boolean isActive) {
this.uuid = id;
this.subscriptionId = subscriptionId;
this.requestedDate = requestedDate;
@@ -79,10 +61,6 @@ public abstract class EventBase implements EntitlementEvent {
this.activeVersion = activeVersion;
this.isActive = isActive;
- this.processingOwner = processingOwner;
- this.nextAvailableProcessingTime = nextAvailableProcessingTime;
- this.processingState = processingState;
-
}
@@ -138,64 +116,9 @@ public abstract class EventBase implements EntitlementEvent {
}
- @Override
- public UUID getOwner() {
- return processingOwner;
- }
-
- @Override
- public void setOwner(UUID owner) {
- this.processingOwner = owner;
- }
-
- @Override
- public DateTime getNextAvailableDate() {
- return nextAvailableProcessingTime;
- }
-
- @Override
- public void setNextAvailableDate(DateTime dateTime) {
- this.nextAvailableProcessingTime = dateTime;
- }
-
-
- @Override
- public EventLifecycleState getProcessingState() {
- return processingState;
- }
-
- @Override
- public void setProcessingState(EventLifecycleState processingState) {
- this.processingState = processingState;
- }
-
- @Override
- public boolean isAvailableForProcessing(DateTime now) {
-
- // Event got deactivated, will never be processed
- if (!isActive) {
- return false;
- }
-
- switch(processingState) {
- case AVAILABLE:
- break;
- case IN_PROCESSING:
- // Somebody already got the event, not available yet
- if (nextAvailableProcessingTime.isAfter(now)) {
- return false;
- }
- break;
- case PROCESSED:
- return false;
- default:
- throw new EntitlementError(String.format("Unkwnon IEvent processing state %s", processingState));
- }
- return effectiveDate.isBefore(now);
- }
//
- // Really used for unit tesrs only as the sql implementation relies on date first and then event insertion
+ // Really used for unit tests only as the sql implementation relies on date first and then event insertion
//
// Order first by:
// - effectiveDate, followed by processedDate, requestedDate
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBaseBuilder.java b/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBaseBuilder.java
index 104fbef..17f5e15 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBaseBuilder.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBaseBuilder.java
@@ -16,7 +16,6 @@
package com.ning.billing.entitlement.events;
-import com.ning.billing.entitlement.events.EventLifecycle.EventLifecycleState;
import org.joda.time.DateTime;
import java.util.UUID;
@@ -32,15 +31,11 @@ public class EventBaseBuilder<T extends EventBaseBuilder<T>> {
private long activeVersion;
private boolean isActive;
- private UUID processingOwner;
- private DateTime nextAvailableProcessingTime;
- private EventLifecycleState processingState;
public EventBaseBuilder() {
this.uuid = UUID.randomUUID();
this.isActive = true;
- this.processingState = EventLifecycleState.AVAILABLE;
}
public EventBaseBuilder(EventBaseBuilder<?> copy) {
@@ -52,9 +47,6 @@ public class EventBaseBuilder<T extends EventBaseBuilder<T>> {
this.activeVersion = copy.activeVersion;
this.isActive = copy.isActive;
- this.processingOwner = copy.processingOwner;
- this.nextAvailableProcessingTime = copy.nextAvailableProcessingTime;
- this.processingState = copy.processingState;
}
public T setUuid(UUID uuid) {
@@ -92,21 +84,6 @@ public class EventBaseBuilder<T extends EventBaseBuilder<T>> {
return (T) this;
}
- public T setProcessingOwner(UUID processingOwner) {
- this.processingOwner = processingOwner;
- return (T) this;
- }
-
- public T setNextAvailableProcessingTime(DateTime nextAvailableProcessingTime) {
- this.nextAvailableProcessingTime = nextAvailableProcessingTime;
- return (T) this;
- }
-
- public T setProcessingState(EventLifecycleState processingState) {
- this.processingState = processingState;
- return (T) this;
- }
-
public UUID getUuid() {
return uuid;
}
@@ -134,16 +111,4 @@ public class EventBaseBuilder<T extends EventBaseBuilder<T>> {
public boolean isActive() {
return isActive;
}
-
- public UUID getProcessingOwner() {
- return processingOwner;
- }
-
- public DateTime getNextAvailableProcessingTime() {
- return nextAvailableProcessingTime;
- }
-
- public EventLifecycleState getProcessingState() {
- return processingState;
- }
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/events/user/ApiEventBase.java b/entitlement/src/main/java/com/ning/billing/entitlement/events/user/ApiEventBase.java
index 3f033a4..2438ddf 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/events/user/ApiEventBase.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/events/user/ApiEventBase.java
@@ -58,17 +58,6 @@ public class ApiEventBase extends EventBase implements ApiEvent {
}
- public ApiEventBase(UUID id, UUID subscriptionId, DateTime processed, String eventPlan, String eventPhase,
- String priceList, DateTime requestedDate, ApiEventType eventType, DateTime effectiveDate, long activeVersion,
- boolean isActive, UUID processingOwner, DateTime nextAvailableProcessingTime,EventLifecycleState processingState) {
- super(id, subscriptionId, requestedDate, effectiveDate, processed, activeVersion, isActive, processingOwner, nextAvailableProcessingTime, processingState);
- this.eventType = eventType;
- this.eventPlan = eventPlan;
- this.eventPlanPhase = eventPhase;
- this.eventPriceList = priceList;
- }
-
-
@Override
public ApiEventType getEventType() {
return eventType;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/glue/EntitlementModule.java b/entitlement/src/main/java/com/ning/billing/entitlement/glue/EntitlementModule.java
index c1268ca..d2776ad 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/glue/EntitlementModule.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/glue/EntitlementModule.java
@@ -32,12 +32,9 @@ import com.ning.billing.entitlement.api.test.EntitlementTestApi;
import com.ning.billing.entitlement.api.user.DefaultEntitlementUserApi;
import com.ning.billing.entitlement.api.user.EntitlementUserApi;
import com.ning.billing.entitlement.api.user.SubscriptionApiService;
-import com.ning.billing.entitlement.engine.core.DefaultApiEventProcessor;
import com.ning.billing.entitlement.engine.core.Engine;
-import com.ning.billing.entitlement.engine.core.EventNotifier;
import com.ning.billing.entitlement.engine.dao.EntitlementDao;
import com.ning.billing.entitlement.engine.dao.EntitlementSqlDao;
-import com.ning.billing.util.glue.ClockModule;
@@ -47,10 +44,7 @@ public class EntitlementModule extends AbstractModule {
final EntitlementConfig config = new ConfigurationObjectFactory(System.getProperties()).build(EntitlementConfig.class);
bind(EntitlementConfig.class).toInstance(config);
}
-
- protected void installApiEventProcessor() {
- bind(EventNotifier.class).to(DefaultApiEventProcessor.class).asEagerSingleton();
- }
+
protected void installEntitlementDao() {
bind(EntitlementDao.class).to(EntitlementSqlDao.class).asEagerSingleton();
@@ -71,7 +65,6 @@ public class EntitlementModule extends AbstractModule {
@Override
protected void configure() {
installConfig();
- installApiEventProcessor();
installEntitlementDao();
installEntitlementCore();
}
diff --git a/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql b/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
index 4540ad8..55ad7f4 100644
--- a/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
+++ b/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
@@ -14,24 +14,9 @@ CREATE TABLE events (
plist_name varchar(64) DEFAULT NULL,
current_version int(11) DEFAULT 1,
is_active bool DEFAULT 1,
- processing_owner char(36) DEFAULT NULL,
- processing_available_dt datetime DEFAULT NULL,
- processing_state varchar(14) DEFAULT 'AVAILABLE',
PRIMARY KEY(id)
) ENGINE=innodb;
-DROP TABLE IF EXISTS claimed_events;
-CREATE TABLE claimed_events (
- id int(11) unsigned NOT NULL AUTO_INCREMENT,
- sequence_id int(11) unsigned NOT NULL,
- owner_id char(36) NOT NULL,
- hostname varchar(64) NOT NULL,
- claimed_dt datetime NOT NULL,
- event_id char(36) NOT NULL,
- PRIMARY KEY(id)
-) ENGINE=innodb;
-
-
DROP TABLE IF EXISTS subscriptions;
CREATE TABLE subscriptions (
id char(36) NOT NULL,
diff --git a/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg b/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg
index 31e9eaf..704e2c7 100644
--- a/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg
+++ b/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg
@@ -1,8 +1,8 @@
group EventSqlDao;
-getReadyEvents(now, max) ::= <<
- select
- event_id
+getEventById(event_id) ::= <<
+ select
+ event_id
, event_type
, user_type
, created_dt
@@ -14,48 +14,11 @@ getReadyEvents(now, max) ::= <<
, phase_name
, plist_name
, current_version
- , is_active
- , processing_owner
- , processing_available_dt
- , processing_state
- from events
- where
- effective_dt \<= :now
- and is_active = 1
- and processing_state != 'PROCESSED'
- and (processing_owner IS NULL OR processing_available_dt \<= :now)
- order by
- effective_dt asc
- , created_dt asc
- , requested_dt asc
- , id asc
- limit :max
- ;
->>
-
-claimEvent(owner, next_available, event_id, now) ::= <<
- update events
- set
- processing_owner = :owner
- , processing_available_dt = :next_available
- , processing_state = 'IN_PROCESSING'
- where
- event_id = :event_id
- and is_active = 1
- and processing_state != 'PROCESSED'
- and (processing_owner IS NULL OR processing_available_dt \<= :now)
- ;
->>
-
-clearEvent(event_id, owner) ::= <<
- update events
- set
- processing_owner = NULL
- , processing_state = 'PROCESSED'
- where
+ , is_active
+ from events
+ where
event_id = :event_id
- and processing_owner = :owner
- ;
+ ;
>>
insertEvent() ::= <<
@@ -73,9 +36,6 @@ insertEvent() ::= <<
, plist_name
, current_version
, is_active
- , processing_owner
- , processing_available_dt
- , processing_state
) values (
:event_id
, :event_type
@@ -90,9 +50,6 @@ insertEvent() ::= <<
, :plist_name
, :current_version
, :is_active
- , :processing_owner
- , :processing_available_dt
- , :processing_state
);
>>
@@ -103,22 +60,6 @@ removeEvents(subscription_id) ::= <<
;
>>
-insertClaimedHistory(sequence_id, owner_id, hostname, claimed_dt, event_id) ::= <<
- insert into claimed_events (
- sequence_id
- , owner_id
- , hostname
- , claimed_dt
- , event_id
- ) values (
- :sequence_id
- , :owner_id
- , :hostname
- , :claimed_dt
- , :event_id
- );
->>
-
unactiveEvent(event_id, now) ::= <<
update events
set
@@ -154,9 +95,6 @@ getFutureActiveEventForSubscription(subscription_id, now) ::= <<
, plist_name
, current_version
, is_active
- , processing_owner
- , processing_available_dt
- , processing_state
from events
where
subscription_id = :subscription_id
@@ -185,9 +123,6 @@ getEventsForSubscription(subscription_id) ::= <<
, plist_name
, current_version
, is_active
- , processing_owner
- , processing_available_dt
- , processing_state
from events
where
subscription_id = :subscription_id
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/billing/BrainDeadMockEntitlementDao.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/billing/BrainDeadMockEntitlementDao.java
index f7d8186..68cae5a 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/billing/BrainDeadMockEntitlementDao.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/billing/BrainDeadMockEntitlementDao.java
@@ -99,18 +99,6 @@ class BrainDeadMockEntitlementDao implements EntitlementDao {
}
@Override
- public List<EntitlementEvent> getEventsReady(UUID ownerId,
- int sequenceId) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void clearEventsReady(UUID ownerId,
- Collection<EntitlementEvent> cleared) {
- throw new UnsupportedOperationException();
- }
-
- @Override
public void createSubscription(SubscriptionData subscription,
List<EntitlementEvent> initialEvents) {
throw new UnsupportedOperationException();
@@ -144,5 +132,10 @@ class BrainDeadMockEntitlementDao implements EntitlementDao {
public void undoMigration(UUID accountId) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public EntitlementEvent getEventById(UUID eventId) {
+ throw new UnsupportedOperationException();
+ }
}
\ No newline at end of file
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
index 81696b0..b0483bb 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
@@ -94,7 +94,7 @@ public abstract class TestApiBase {
protected ApiTestListener testListener;
protected SubscriptionBundle bundle;
- public static void loadSystemPropertiesFromClasspath( final String resource )
+ public static void loadSystemPropertiesFromClasspath(final String resource)
{
final URL url = TestApiBase.class.getResource(resource);
assertNotNull(url);
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancelSql.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancelSql.java
index 187c080..87491c7 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancelSql.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancelSql.java
@@ -32,7 +32,7 @@ public class TestUserApiCancelSql extends TestUserApiCancel {
return Guice.createInjector(Stage.DEVELOPMENT, new MockEngineModuleSql());
}
- @Test(enabled= true, groups={"stress"})
+ @Test(enabled= false, groups={"stress"})
public void stressTest() {
for (int i = 0; i < MAX_STRESS_ITERATIONS; i++) {
cleanupTest();
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java
index dc6567f..51c3d8b 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java
@@ -198,6 +198,15 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
testListener.pushExpectedEvent(NextEvent.PHASE);
clock.addDeltaFromReality(currentPhase.getDuration());
DateTime futureNow = clock.getUTCNow();
+
+ /*
+ try {
+ Thread.sleep(1000 * 3000);
+ } catch (Exception e) {
+
+ }
+ */
+
assertTrue(futureNow.isAfter(nextExpectedPhaseChange));
assertTrue(testListener.isCompleted(3000));
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDao.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDao.java
index 74ac1e7..3623da2 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDao.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDao.java
@@ -17,5 +17,6 @@
package com.ning.billing.entitlement.engine.dao;
public interface MockEntitlementDao {
+
public void reset();
}
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
index 7fc8342..86e458f 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
@@ -31,16 +31,26 @@ import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
import com.ning.billing.entitlement.api.user.SubscriptionFactory;
import com.ning.billing.entitlement.api.user.SubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.engine.core.Engine;
import com.ning.billing.entitlement.events.EntitlementEvent;
import com.ning.billing.entitlement.events.EntitlementEvent.EventType;
-import com.ning.billing.entitlement.events.EventLifecycle.EventLifecycleState;
import com.ning.billing.entitlement.events.user.ApiEvent;
import com.ning.billing.entitlement.events.user.ApiEventType;
import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationKey;
+import com.ning.billing.util.notificationq.NotificationLifecycle;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlementDao {
@@ -52,15 +62,15 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
private final Clock clock;
private final EntitlementConfig config;
private final SubscriptionFactory factory;
-
-
+ private final NotificationQueueService notificationQueueService;
@Inject
- public MockEntitlementDaoMemory(Clock clock, EntitlementConfig config, SubscriptionFactory factory) {
+ public MockEntitlementDaoMemory(Clock clock, EntitlementConfig config, SubscriptionFactory factory, NotificationQueueService notificationQueueService) {
super();
this.clock = clock;
this.config = config;
this.factory = factory;
+ this.notificationQueueService = notificationQueueService;
this.bundles = new ArrayList<SubscriptionBundle>();
this.subscriptions = new ArrayList<Subscription>();
this.events = new TreeSet<EntitlementEvent>();
@@ -138,6 +148,14 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
synchronized(events) {
events.addAll(initalEvents);
+ for (final EntitlementEvent cur : initalEvents) {
+ recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new NotificationKey() {
+ @Override
+ public String toString() {
+ return cur.getId().toString();
+ }
+ });
+ }
}
Subscription updatedSubscription = buildSubscription(subscription);
subscriptions.add(updatedSubscription);
@@ -174,7 +192,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
List<EntitlementEvent> results = new LinkedList<EntitlementEvent>();
for (EntitlementEvent cur : events) {
if (cur.isActive() &&
- cur.getProcessingState() == EventLifecycleState.AVAILABLE &&
+ cur.getEffectiveDate().isAfter(clock.getUTCNow()) &&
cur.getSubscriptionId().equals(subscriptionId)) {
results.add(cur);
}
@@ -202,39 +220,6 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
}
- @Override
- public List<EntitlementEvent> getEventsReady(UUID ownerId, int sequenceId) {
- synchronized(events) {
- List<EntitlementEvent> readyList = new LinkedList<EntitlementEvent>();
- for (EntitlementEvent cur : events) {
- if (cur.isAvailableForProcessing(clock.getUTCNow())) {
-
- if (cur.getOwner() != null) {
- log.warn(String.format("EventProcessor %s stealing event %s from %s", ownerId, cur, cur.getOwner()));
- }
- cur.setOwner(ownerId);
- cur.setNextAvailableDate(clock.getUTCNow().plus(config.getDaoClaimTimeMs()));
- cur.setProcessingState(EventLifecycleState.IN_PROCESSING);
- readyList.add(cur);
- }
- }
- Collections.sort(readyList);
- return readyList;
- }
- }
-
- @Override
- public void clearEventsReady(UUID ownerId, Collection<EntitlementEvent> cleared) {
- synchronized(events) {
- for (EntitlementEvent cur : cleared) {
- if (cur.getOwner().equals(ownerId)) {
- cur.setProcessingState(EventLifecycleState.PROCESSED);
- } else {
- log.warn(String.format("EventProcessor %s trying to clear event %s that it does not own", ownerId, cur));
- }
- }
- }
- }
private Subscription buildSubscription(SubscriptionData in) {
return factory.createSubscription(new SubscriptionBuilder(in), getEventsForSubscription(in.getId()));
@@ -272,12 +257,26 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
cancelNextChangeEvent(subscriptionId);
cancelNextPhaseEvent(subscriptionId);
events.addAll(changeEvents);
+ for (final EntitlementEvent cur : changeEvents) {
+ recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new NotificationKey() {
+ @Override
+ public String toString() {
+ return cur.getId().toString();
+ }
+ });
+ }
}
}
- private void insertEvent(EntitlementEvent event) {
+ private void insertEvent(final EntitlementEvent event) {
synchronized(events) {
events.add(event);
+ recordFutureNotificationFromTransaction(null, event.getEffectiveDate(), new NotificationKey() {
+ @Override
+ public String toString() {
+ return event.getId().toString();
+ }
+ });
}
}
@@ -298,10 +297,11 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
continue;
}
if (cur.getType() == EventType.PHASE &&
- cur.getProcessingState() == EventLifecycleState.AVAILABLE) {
+ cur.getEffectiveDate().isAfter(clock.getUTCNow())) {
cur.deactivate();
break;
}
+
}
}
}
@@ -319,7 +319,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
}
if (cur.getType() == EventType.API_USER &&
ApiEventType.CHANGE == ((ApiEvent) cur).getEventType() &&
- cur.getProcessingState() == EventLifecycleState.AVAILABLE) {
+ cur.getEffectiveDate().isAfter(clock.getUTCNow())) {
cur.deactivate();
break;
}
@@ -364,8 +364,15 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
SubscriptionBundleData bundleData = curBundle.getData();
for (SubscriptionMigrationData curSubscription : curBundle.getSubscriptions()) {
SubscriptionData subData = curSubscription.getData();
- for (EntitlementEvent curEvent : curSubscription.getInitialEvents()) {
+ for (final EntitlementEvent curEvent : curSubscription.getInitialEvents()) {
events.add(curEvent);
+ recordFutureNotificationFromTransaction(null, curEvent.getEffectiveDate(), new NotificationKey() {
+ @Override
+ public String toString() {
+ return curEvent.getId().toString();
+ }
+ });
+
}
subscriptions.add(subData);
}
@@ -393,4 +400,26 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
}
}
+
+ @Override
+ public EntitlementEvent getEventById(UUID eventId) {
+ synchronized(events) {
+ for (EntitlementEvent cur : events) {
+ if (cur.getId().equals(eventId)) {
+ return cur;
+ }
+ }
+ }
+ return null;
+ }
+
+ private void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao, final DateTime effectiveDate, final NotificationKey notificationKey) {
+ try {
+ NotificationQueue subscritionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
+ Engine.NOTIFICATION_QUEUE_NAME);
+ subscritionEventQueue.recordFutureNotificationFromTransaction(transactionalDao, effectiveDate, notificationKey);
+ } catch (NoSuchNotificationQueue e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
index 2ebdea9..503fd1a 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
@@ -20,6 +20,8 @@ import com.google.inject.Inject;
import com.ning.billing.config.EntitlementConfig;
import com.ning.billing.entitlement.api.user.SubscriptionFactory;
import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionStatus;
@@ -32,11 +34,12 @@ public class MockEntitlementDaoSql extends EntitlementSqlDao implements MockEnti
private final ResetSqlDao resetDao;
@Inject
- public MockEntitlementDaoSql(DBI dbi, Clock clock, EntitlementConfig config, SubscriptionFactory factory) {
- super(dbi, clock, config, factory);
+ public MockEntitlementDaoSql(DBI dbi, Clock clock, SubscriptionFactory factory, NotificationQueueService notificationQueueService) {
+ super(dbi, clock, factory, notificationQueueService);
this.resetDao = dbi.onDemand(ResetSqlDao.class);
}
+
@Override
public void reset() {
resetDao.inTransaction(new Transaction<Void, ResetSqlDao>() {
@@ -45,9 +48,10 @@ public class MockEntitlementDaoSql extends EntitlementSqlDao implements MockEnti
public Void inTransaction(ResetSqlDao dao, TransactionStatus status)
throws Exception {
resetDao.resetEvents();
- resetDao.resetClaimedEvents();
resetDao.resetSubscriptions();
resetDao.resetBundles();
+ resetDao.resetClaimedNotifications();
+ resetDao.resetNotifications();
return null;
}
});
@@ -58,14 +62,17 @@ public class MockEntitlementDaoSql extends EntitlementSqlDao implements MockEnti
@SqlUpdate("truncate table events")
public void resetEvents();
- @SqlUpdate("truncate table claimed_events")
- public void resetClaimedEvents();
-
@SqlUpdate("truncate table subscriptions")
public void resetSubscriptions();
@SqlUpdate("truncate table bundles")
public void resetBundles();
- }
+ @SqlUpdate("truncate table notifications")
+ public void resetNotifications();
+
+ @SqlUpdate("truncate table claimed_notifications")
+ public void resetClaimedNotifications();
+
+ }
}
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModule.java b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModule.java
index e9033b1..0f6fca3 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModule.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModule.java
@@ -20,7 +20,6 @@ import com.ning.billing.account.glue.AccountModuleWithMocks;
import com.ning.billing.catalog.glue.CatalogModule;
import com.ning.billing.util.clock.MockClockModule;
import com.ning.billing.util.glue.EventBusModule;
-import com.ning.billing.entitlement.glue.EntitlementModule;
public class MockEngineModule extends EntitlementModule {
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
index c6ce269..786f1e3 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
@@ -17,16 +17,12 @@
package com.ning.billing.entitlement.glue;
-import com.ning.billing.entitlement.engine.core.EventNotifier;
-import com.ning.billing.entitlement.engine.core.MockApiEventProcessorMemory;
import com.ning.billing.entitlement.engine.dao.EntitlementDao;
import com.ning.billing.entitlement.engine.dao.MockEntitlementDaoMemory;
+import com.ning.billing.util.notificationq.MockNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService;
public class MockEngineModuleMemory extends MockEngineModule {
- @Override
- protected void installApiEventProcessor() {
- bind(EventNotifier.class).to(MockApiEventProcessorMemory.class).asEagerSingleton();
- }
@Override
protected void installEntitlementDao() {
@@ -34,8 +30,12 @@ public class MockEngineModuleMemory extends MockEngineModule {
}
+ private void installNotificationQueue() {
+ bind(NotificationQueueService.class).to(MockNotificationQueueService.class).asEagerSingleton();
+ }
@Override
protected void configure() {
super.configure();
+ installNotificationQueue();
}
}
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleSql.java b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleSql.java
index fb5c890..2532c6f 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleSql.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleSql.java
@@ -22,6 +22,8 @@ import com.ning.billing.entitlement.engine.dao.EntitlementDao;
import com.ning.billing.entitlement.engine.dao.MockEntitlementDaoSql;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.glue.NotificationQueueModule;
+
import org.skife.config.ConfigurationObjectFactory;
import org.skife.jdbi.v2.DBI;
@@ -42,6 +44,7 @@ public class MockEngineModuleSql extends MockEngineModule {
@Override
protected void configure() {
installDBI();
+ install(new NotificationQueueModule());
super.configure();
}
}
diff --git a/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java b/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
index adc4669..e89e799 100644
--- a/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
+++ b/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
@@ -68,9 +68,6 @@ public class DefaultPaymentApi implements PaymentApi {
if (account != null) {
return getPaymentProviderPlugin(account);
}
- else {
- throw new IllegalArgumentException("Did not find account with accountKey " + accountKey);
- }
}
return pluginRegistry.getPlugin(paymentProviderName);
@@ -105,21 +102,21 @@ public class DefaultPaymentApi implements PaymentApi {
}
@Override
- public Either<PaymentError, String> addPaypalPaymentMethod(@Nullable String accountKey, PaypalPaymentMethodInfo paypalPaymentMethod) {
+ public Either<PaymentError, String> addPaymentMethod(@Nullable String accountKey, PaymentMethodInfo paymentMethod) {
final PaymentProviderPlugin plugin = getPaymentProviderPlugin(accountKey);
- return plugin.addPaypalPaymentMethod(accountKey, paypalPaymentMethod);
+ return plugin.addPaymentMethod(accountKey, paymentMethod);
}
@Override
public Either<PaymentError, Void> deletePaymentMethod(String accountKey, String paymentMethodId) {
final PaymentProviderPlugin plugin = getPaymentProviderPlugin(accountKey);
- return plugin.deletePaypalPaymentMethod(accountKey, paymentMethodId);
+ return plugin.deletePaymentMethod(accountKey, paymentMethodId);
}
@Override
public Either<PaymentError, PaymentMethodInfo> updatePaymentMethod(String accountKey, PaymentMethodInfo paymentMethodInfo) {
final PaymentProviderPlugin plugin = getPaymentProviderPlugin(accountKey);
- return plugin.updatePaypalPaymentMethod(accountKey, paymentMethodInfo);
+ return plugin.updatePaymentMethod(accountKey, paymentMethodInfo);
}
@Override
@@ -182,9 +179,9 @@ public class DefaultPaymentApi implements PaymentApi {
}
@Override
- public Either<PaymentError, PaymentProviderAccount> updatePaymentProviderAccount(Account account) {
+ public Either<PaymentError, Void> updatePaymentProviderAccountContact(Account account) {
final PaymentProviderPlugin plugin = getPaymentProviderPlugin(account);
- return plugin.updatePaymentProviderAccount(account);
+ return plugin.updatePaymentProviderAccountExistingContact(account);
}
@Override
diff --git a/payment/src/main/java/com/ning/billing/payment/provider/PaymentProviderPlugin.java b/payment/src/main/java/com/ning/billing/payment/provider/PaymentProviderPlugin.java
index 460951d..7a9ae71 100644
--- a/payment/src/main/java/com/ning/billing/payment/provider/PaymentProviderPlugin.java
+++ b/payment/src/main/java/com/ning/billing/payment/provider/PaymentProviderPlugin.java
@@ -25,21 +25,22 @@ import com.ning.billing.payment.api.PaymentError;
import com.ning.billing.payment.api.PaymentInfo;
import com.ning.billing.payment.api.PaymentMethodInfo;
import com.ning.billing.payment.api.PaymentProviderAccount;
-import com.ning.billing.payment.api.PaypalPaymentMethodInfo;
public interface PaymentProviderPlugin {
Either<PaymentError, PaymentInfo> processInvoice(Account account, Invoice invoice);
Either<PaymentError, String> createPaymentProviderAccount(Account account);
- Either<PaymentError, String> addPaypalPaymentMethod(String accountId, PaypalPaymentMethodInfo paypalPaymentMethod);
- Either<PaymentError, PaymentProviderAccount> updatePaymentProviderAccount(Account account);
Either<PaymentError, PaymentInfo> getPaymentInfo(String paymentId);
- Either<PaymentError, PaymentMethodInfo> getPaymentMethodInfo(String paymentMethodId);
Either<PaymentError, PaymentProviderAccount> getPaymentProviderAccount(String accountKey);
+ Either<PaymentError, Void> updatePaymentGateway(String accountKey);
+
+ Either<PaymentError, PaymentMethodInfo> getPaymentMethodInfo(String paymentMethodId);
Either<PaymentError, List<PaymentMethodInfo>> getPaymentMethods(String accountKey);
+ Either<PaymentError, String> addPaymentMethod(String accountKey, PaymentMethodInfo paymentMethod);
+ Either<PaymentError, PaymentMethodInfo> updatePaymentMethod(String accountKey, PaymentMethodInfo paymentMethodInfo);
+ Either<PaymentError, Void> deletePaymentMethod(String accountKey, String paymentMethodId);
- Either<PaymentError, Void> updatePaymentGateway(String accountKey);
- Either<PaymentError, Void> deletePaypalPaymentMethod(String accountKey, String paymentMethodId);
- Either<PaymentError, PaymentMethodInfo> updatePaypalPaymentMethod(String accountKey, PaymentMethodInfo paymentMethodInfo);
+ Either<PaymentError, Void> updatePaymentProviderAccountExistingContact(Account account);
+ Either<PaymentError, Void> updatePaymentProviderAccountWithNewContact(Account account);
}
diff --git a/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java b/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java
index a1b02b3..c756963 100644
--- a/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java
+++ b/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java
@@ -25,7 +25,9 @@ import java.util.Arrays;
import java.util.List;
import java.util.UUID;
+import org.apache.commons.lang.RandomStringUtils;
import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -33,6 +35,7 @@ import org.testng.annotations.Test;
import com.google.inject.Inject;
import com.ning.billing.account.api.Account;
import com.ning.billing.account.api.AccountApiException;
+import com.ning.billing.account.api.user.AccountBuilder;
import com.ning.billing.catalog.api.Currency;
import com.ning.billing.invoice.api.Invoice;
import com.ning.billing.invoice.model.DefaultInvoiceItem;
@@ -58,10 +61,9 @@ public abstract class TestPaymentApi {
eventBus.stop();
}
-// @Test(groups = "fast")
@Test
public void testCreatePayment() throws AccountApiException {
- final DateTime now = new DateTime();
+ final DateTime now = new DateTime(DateTimeZone.UTC);
final Account account = testHelper.createTestAccount();
final Invoice invoice = testHelper.createTestInvoice(account, now, Currency.USD);
final BigDecimal amount = new BigDecimal("10.00");
@@ -84,6 +86,113 @@ public abstract class TestPaymentApi {
PaymentInfo paymentInfo = results.get(0).getRight();
assertNotNull(paymentInfo.getPaymentId());
- assertEquals(paymentInfo.getAmount().doubleValue(), amount.doubleValue());
+ assertTrue(paymentInfo.getAmount().compareTo(amount) == 0);
+ assertNotNull(paymentInfo.getPaymentNumber());
+
+ PaymentAttempt paymentAttempt = paymentApi.getPaymentAttemptForPaymentId(paymentInfo.getPaymentId());
+ assertNotNull(paymentAttempt);
+ assertNotNull(paymentAttempt.getPaymentAttemptId());
+ assertEquals(paymentAttempt.getInvoiceId(), invoice.getId());
+ assertTrue(paymentAttempt.getAmount().compareTo(amount) == 0);
+ assertEquals(paymentAttempt.getCurrency(), Currency.USD);
+ assertEquals(paymentAttempt.getPaymentId(), paymentInfo.getPaymentId());
+ assertEquals(paymentAttempt.getPaymentAttemptDate().withMillisOfSecond(0), now.withMillisOfSecond(0));
+
+ }
+
+ private PaymentProviderAccount setupAccountWithPaymentMethod() throws AccountApiException {
+ final Account account = testHelper.createTestAccount();
+ paymentApi.createPaymentProviderAccount(account);
+
+ String accountKey = account.getExternalKey();
+
+ PaypalPaymentMethodInfo paymentMethod = new PaypalPaymentMethodInfo.Builder()
+ .setBaid("12345")
+ .setEmail(account.getEmail())
+ .setDefaultMethod(true)
+ .build();
+ Either<PaymentError, String> paymentMethodIdOrError = paymentApi.addPaymentMethod(accountKey, paymentMethod);
+
+ assertTrue(paymentMethodIdOrError.isRight());
+ assertNotNull(paymentMethodIdOrError.getRight());
+
+ Either<PaymentError, PaymentMethodInfo> paymentMethodInfoOrError = paymentApi.getPaymentMethod(accountKey, paymentMethodIdOrError.getRight());
+
+ assertTrue(paymentMethodInfoOrError.isRight());
+ assertNotNull(paymentMethodInfoOrError.getRight());
+
+ Either<PaymentError, PaymentProviderAccount> accountOrError = paymentApi.getPaymentProviderAccount(accountKey);
+
+ assertTrue(accountOrError.isRight());
+
+ return accountOrError.getRight();
+ }
+
+ @Test
+ public void testCreatePaymentMethod() throws AccountApiException {
+ PaymentProviderAccount account = setupAccountWithPaymentMethod();
+ assertNotNull(account);
+ }
+
+ @Test
+ public void testUpdatePaymentProviderAccountContact() throws AccountApiException {
+ final Account account = testHelper.createTestAccount();
+ paymentApi.createPaymentProviderAccount(account);
+
+ String newName = "Tester " + RandomStringUtils.randomAlphanumeric(10);
+ String newNumber = "888-888-" + RandomStringUtils.randomNumeric(4);
+
+ final Account accountToUpdate = new AccountBuilder(account.getId())
+ .name(newName)
+ .firstNameLength(newName.length())
+ .externalKey(account.getExternalKey())
+ .phone(newNumber)
+ .email(account.getEmail())
+ .currency(account.getCurrency())
+ .billingCycleDay(account.getBillCycleDay())
+ .build();
+
+ Either<PaymentError, Void> voidOrError = paymentApi.updatePaymentProviderAccountContact(accountToUpdate);
+ assertTrue(voidOrError.isRight());
+ }
+
+ @Test
+ public void testCannotDeleteDefaultPaymentMethod() throws AccountApiException {
+ PaymentProviderAccount account = setupAccountWithPaymentMethod();
+
+ Either<PaymentError, Void> errorOrVoid = paymentApi.deletePaymentMethod(account.getAccountKey(), account.getDefaultPaymentMethodId());
+
+ assertTrue(errorOrVoid.isLeft());
+ }
+
+ @Test
+ public void testDeleteNonDefaultPaymentMethod() throws AccountApiException {
+ final Account account = testHelper.createTestAccount();
+ paymentApi.createPaymentProviderAccount(account);
+
+ String accountKey = account.getExternalKey();
+
+ PaypalPaymentMethodInfo paymentMethod1 = new PaypalPaymentMethodInfo.Builder().setDefaultMethod(false).setBaid("12345").setEmail(account.getEmail()).build();
+ Either<PaymentError, String> paymentMethodIdOrError1 = paymentApi.addPaymentMethod(accountKey, paymentMethod1);
+
+ assertTrue(paymentMethodIdOrError1.isRight());
+ assertNotNull(paymentMethodIdOrError1.getRight());
+
+ PaypalPaymentMethodInfo paymentMethod2 = new PaypalPaymentMethodInfo.Builder().setDefaultMethod(true).setBaid("12345").setEmail(account.getEmail()).build();
+
+ Either<PaymentError, String> paymentMethodIdOrError2 = paymentApi.addPaymentMethod(accountKey, paymentMethod2);
+
+ assertTrue(paymentMethodIdOrError2.isRight());
+ assertNotNull(paymentMethodIdOrError2.getRight());
+
+ Either<PaymentError, List<PaymentMethodInfo>> paymentMethodsOrError = paymentApi.getPaymentMethods(accountKey);
+
+ assertTrue(paymentMethodsOrError.isRight());
+
+ Either<PaymentError, Void> errorOrVoid1 = paymentApi.deletePaymentMethod(accountKey, paymentMethodIdOrError1.getRight());
+ Either<PaymentError, Void> errorOrVoid2 = paymentApi.deletePaymentMethod(accountKey, paymentMethodIdOrError2.getRight());
+
+ assertTrue(errorOrVoid1.isRight());
+ assertTrue(errorOrVoid2.isLeft());
}
}
diff --git a/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java b/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java
index 32d0d71..97e6136 100644
--- a/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java
+++ b/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java
@@ -16,16 +16,21 @@
package com.ning.billing.payment.provider;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.commons.lang.math.RandomUtils;
+import org.apache.commons.lang.RandomStringUtils;
import org.joda.time.DateTime;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
import com.ning.billing.account.api.Account;
import com.ning.billing.invoice.api.Invoice;
+import com.ning.billing.payment.api.CreditCardPaymentMethodInfo;
import com.ning.billing.payment.api.Either;
import com.ning.billing.payment.api.PaymentError;
import com.ning.billing.payment.api.PaymentInfo;
@@ -36,19 +41,20 @@ import com.ning.billing.payment.api.PaypalPaymentMethodInfo;
public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
private final Map<String, PaymentInfo> payments = new ConcurrentHashMap<String, PaymentInfo>();
private final Map<String, PaymentProviderAccount> accounts = new ConcurrentHashMap<String, PaymentProviderAccount>();
+ private final Map<String, PaymentMethodInfo> paymentMethods = new ConcurrentHashMap<String, PaymentMethodInfo>();
@Override
public Either<PaymentError, PaymentInfo> processInvoice(Account account, Invoice invoice) {
PaymentInfo payment = new PaymentInfo.Builder().setPaymentId(UUID.randomUUID().toString())
- .setAmount(invoice.getAmountOutstanding())
- .setStatus("Processed")
- .setBankIdentificationNumber("1234")
- .setCreatedDate(new DateTime())
- .setEffectiveDate(new DateTime())
- .setPaymentNumber("12345")
- .setReferenceId("12345")
- .setType("Electronic")
- .build();
+ .setAmount(invoice.getAmountOutstanding())
+ .setStatus("Processed")
+ .setBankIdentificationNumber("1234")
+ .setCreatedDate(new DateTime())
+ .setEffectiveDate(new DateTime())
+ .setPaymentNumber("12345")
+ .setReferenceId("12345")
+ .setType("Electronic")
+ .build();
payments.put(payment.getPaymentId(), payment);
return Either.right(payment);
@@ -69,13 +75,13 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
@Override
public Either<PaymentError, String> createPaymentProviderAccount(Account account) {
if (account != null) {
- PaymentProviderAccount paymentProviderAccount = accounts.put(account.getExternalKey(),
- new PaymentProviderAccount.Builder().setAccountNumber(String.valueOf(RandomUtils.nextInt(10)))
- .setDefaultPaymentMethod(String.valueOf(RandomUtils.nextInt(10)))
- .setId(String.valueOf(RandomUtils.nextInt(10)))
- .build());
+ String id = String.valueOf(RandomStringUtils.randomAlphanumeric(10));
+ accounts.put(account.getExternalKey(),
+ new PaymentProviderAccount.Builder().setAccountKey(account.getExternalKey())
+ .setId(id)
+ .build());
- return Either.right(paymentProviderAccount.getId());
+ return Either.right(id);
}
else {
return Either.left(new PaymentError("unknown", "Did not get account to create payment provider account"));
@@ -83,51 +89,167 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
}
@Override
- public Either<PaymentError, PaymentProviderAccount> updatePaymentProviderAccount(Account account) {
- // TODO Auto-generated method stub
- return null;
+ public Either<PaymentError, PaymentProviderAccount> getPaymentProviderAccount(String accountKey) {
+ if (accountKey != null) {
+ return Either.right(accounts.get(accountKey));
+ }
+ else {
+ return Either.left(new PaymentError("unknown", "Did not get account for accountKey " + accountKey));
+ }
}
@Override
- public Either<PaymentError, PaymentMethodInfo> getPaymentMethodInfo(String paymentMethodId) {
- // TODO
- return Either.left(new PaymentError("unknown", "Not implemented"));
+ public Either<PaymentError, String> addPaymentMethod(String accountKey, PaymentMethodInfo paymentMethod) {
+ if (paymentMethod != null) {
+ PaymentProviderAccount account = accounts.get(accountKey);
+
+ if (account != null && account.getId() != null) {
+ String existingDefaultMethod = account.getDefaultPaymentMethodId();
+
+ String paymentMethodId = RandomStringUtils.randomAlphanumeric(10);
+ boolean shouldBeDefault = Boolean.TRUE.equals(paymentMethod.getDefaultMethod()) || existingDefaultMethod == null;
+ PaymentMethodInfo realPaymentMethod = null;
+
+ if (paymentMethod instanceof PaypalPaymentMethodInfo) {
+ PaypalPaymentMethodInfo paypalPaymentMethod = (PaypalPaymentMethodInfo)paymentMethod;
+
+ realPaymentMethod = new PaypalPaymentMethodInfo.Builder(paypalPaymentMethod)
+ .setId(paymentMethodId)
+ .setAccountId(accountKey)
+ .setDefaultMethod(shouldBeDefault)
+ .setBaid(paypalPaymentMethod.getBaid())
+ .setEmail(paypalPaymentMethod.getEmail())
+ .build();
+ }
+ else if (paymentMethod instanceof CreditCardPaymentMethodInfo) {
+ CreditCardPaymentMethodInfo ccPaymentMethod = (CreditCardPaymentMethodInfo)paymentMethod;
+ realPaymentMethod = new CreditCardPaymentMethodInfo.Builder(ccPaymentMethod).setId(paymentMethodId).build();
+ }
+ if (realPaymentMethod == null) {
+ return Either.left(new PaymentError("unsupported", "Payment method " + paymentMethod.getType() + " not supported by the plugin"));
+ }
+ else {
+ if (shouldBeDefault) {
+ setDefaultPaymentMethodOnAccount(account, paymentMethodId);
+ }
+ paymentMethods.put(paymentMethodId, realPaymentMethod);
+ return Either.right(paymentMethodId);
+ }
+ }
+ else {
+ return Either.left(new PaymentError("noaccount", "Could not retrieve account for accountKey " + accountKey));
+ }
+ }
+ else {
+ return Either.left(new PaymentError("unknown", "Could not create add payment method " + paymentMethod + " for " + accountKey));
+ }
+ }
+
+ public void setDefaultPaymentMethodOnAccount(PaymentProviderAccount account, String paymentMethodId) {
+ if (paymentMethodId != null && account != null) {
+ accounts.put(account.getAccountKey(),
+ new PaymentProviderAccount.Builder()
+ .copyFrom(account)
+ .setDefaultPaymentMethod(paymentMethodId)
+ .build());
+ List<PaymentMethodInfo> paymentMethodsToUpdate = new ArrayList<PaymentMethodInfo>();
+ for (PaymentMethodInfo paymentMethod : paymentMethods.values()) {
+ if (account.getAccountKey().equals(paymentMethod.getAccountId()) && !paymentMethodId.equals(paymentMethod.getId())) {
+ if (paymentMethod instanceof PaypalPaymentMethodInfo) {
+ PaypalPaymentMethodInfo paypalPaymentMethod = (PaypalPaymentMethodInfo)paymentMethod;
+ paymentMethodsToUpdate.add(new PaypalPaymentMethodInfo.Builder(paypalPaymentMethod).setDefaultMethod(false).build());
+ }
+ else if (paymentMethod instanceof CreditCardPaymentMethodInfo) {
+ CreditCardPaymentMethodInfo ccPaymentMethod = (CreditCardPaymentMethodInfo)paymentMethod;
+ paymentMethodsToUpdate.add(new CreditCardPaymentMethodInfo.Builder(ccPaymentMethod).setDefaultMethod(false).build());
+ }
+ }
+ }
+ for (PaymentMethodInfo paymentMethod : paymentMethodsToUpdate) {
+ paymentMethods.put(paymentMethod.getId(), paymentMethod);
+ }
+ }
+ }
+
+ @Override
+ public Either<PaymentError, PaymentMethodInfo> updatePaymentMethod(String accountKey, PaymentMethodInfo paymentMethod) {
+ if (paymentMethod != null) {
+ PaymentMethodInfo realPaymentMethod = null;
+
+ if (paymentMethod instanceof PaypalPaymentMethodInfo) {
+ PaypalPaymentMethodInfo paypalPaymentMethod = (PaypalPaymentMethodInfo)paymentMethod;
+ realPaymentMethod = new PaypalPaymentMethodInfo.Builder(paypalPaymentMethod).build();
+ }
+ else if (paymentMethod instanceof CreditCardPaymentMethodInfo) {
+ CreditCardPaymentMethodInfo ccPaymentMethod = (CreditCardPaymentMethodInfo)paymentMethod;
+ realPaymentMethod = new CreditCardPaymentMethodInfo.Builder(ccPaymentMethod).build();
+ }
+ if (realPaymentMethod == null) {
+ return Either.left(new PaymentError("unsupported", "Payment method " + paymentMethod.getType() + " not supported by the plugin"));
+ }
+ else {
+ paymentMethods.put(paymentMethod.getId(), paymentMethod);
+ return Either.right(realPaymentMethod);
+ }
+ }
+ else {
+ return Either.left(new PaymentError("unknown", "Could not create add payment method " + paymentMethod + " for " + accountKey));
+ }
}
@Override
- public Either<PaymentError, List<PaymentMethodInfo>> getPaymentMethods(String accountId) {
- // TODO
- return Either.left(new PaymentError("unknown", "Not implemented"));
+ public Either<PaymentError, Void> deletePaymentMethod(String accountKey, String paymentMethodId) {
+ PaymentMethodInfo paymentMethodInfo = paymentMethods.get(paymentMethodId);
+ if (paymentMethodInfo != null) {
+ if (Boolean.FALSE.equals(paymentMethodInfo.getDefaultMethod()) || paymentMethodInfo.getDefaultMethod() == null) {
+ if (paymentMethods.remove(paymentMethodId) == null) {
+ return Either.left(new PaymentError("unknown", "Did not get any result back"));
+ }
+ }
+ else {
+ return Either.left(new PaymentError("error", "Cannot delete default payment method"));
+ }
+ }
+ return Either.right(null);
}
@Override
- public Either<PaymentError, Void> updatePaymentGateway(String accountKey) {
- // TODO
- return Either.left(new PaymentError("unknown", "Not implemented"));
+ public Either<PaymentError, PaymentMethodInfo> getPaymentMethodInfo(String paymentMethodId) {
+ if (paymentMethodId == null) {
+ return Either.left(new PaymentError("unknown", "Could not retrieve payment method for paymentMethodId " + paymentMethodId));
+ }
+
+ return Either.right(paymentMethods.get(paymentMethodId));
}
@Override
- public Either<PaymentError, PaymentProviderAccount> getPaymentProviderAccount(String accountKey) {
- // TODO
- return Either.left(new PaymentError("unknown", "Not implemented"));
+ public Either<PaymentError, List<PaymentMethodInfo>> getPaymentMethods(final String accountKey) {
+
+ Collection<PaymentMethodInfo> filteredPaymentMethods = Collections2.filter(paymentMethods.values(), new Predicate<PaymentMethodInfo>() {
+ @Override
+ public boolean apply(PaymentMethodInfo input) {
+ return accountKey.equals(input.getAccountId());
+ }
+ });
+ List<PaymentMethodInfo> result = new ArrayList<PaymentMethodInfo>(filteredPaymentMethods);
+ return Either.right(result);
}
@Override
- public Either<PaymentError, String> addPaypalPaymentMethod(String accountId, PaypalPaymentMethodInfo paypalPaymentMethod) {
- // TODO
- return Either.left(new PaymentError("unknown", "Not implemented"));
+ public Either<PaymentError, Void> updatePaymentGateway(String accountKey) {
+ return Either.right(null);
}
@Override
- public Either<PaymentError, Void> deletePaypalPaymentMethod(String accountKey, String paymentMethodId) {
- // TODO
- return Either.left(new PaymentError("unknown", "Not implemented"));
+ public Either<PaymentError, Void> updatePaymentProviderAccountExistingContact(Account account) {
+ // nothing to do here
+ return Either.right(null);
}
@Override
- public Either<PaymentError, PaymentMethodInfo> updatePaypalPaymentMethod(String accountKey, PaymentMethodInfo paymentMethodInfo) {
- // TODO
- return Either.left(new PaymentError("unknown", "Not implemented"));
+ public Either<PaymentError, Void> updatePaymentProviderAccountWithNewContact(Account account) {
+ // nothing to do here
+ return Either.right(null);
}
}
diff --git a/payment/src/test/java/com/ning/billing/payment/TestHelper.java b/payment/src/test/java/com/ning/billing/payment/TestHelper.java
index 862d98c..2a1ce91 100644
--- a/payment/src/test/java/com/ning/billing/payment/TestHelper.java
+++ b/payment/src/test/java/com/ning/billing/payment/TestHelper.java
@@ -46,10 +46,11 @@ public class TestHelper {
}
public Account createTestAccount() throws AccountApiException {
- final String name = "First" + RandomStringUtils.random(5) + " " + "Last" + RandomStringUtils.random(5);
+ final String name = "First" + RandomStringUtils.randomAlphanumeric(5) + " " + "Last" + RandomStringUtils.randomAlphanumeric(5);
+ final String externalKey = RandomStringUtils.randomAlphanumeric(10);
final Account account = new AccountBuilder(UUID.randomUUID()).name(name)
.firstNameLength(name.length())
- .externalKey("12345")
+ .externalKey(externalKey)
.phone("123-456-7890")
.email("user@example.com")
.currency(Currency.USD)
pom.xml 1(+1 -0)
diff --git a/pom.xml b/pom.xml
index 7b8944a..eb22569 100644
--- a/pom.xml
+++ b/pom.xml
@@ -370,6 +370,7 @@
<exclude>**/.project</exclude>
<exclude>.git/**</exclude>
<exclude>.gitignore</exclude>
+ <exclude>ignore/**</exclude>
<exclude>API.txt</exclude>
<exclude>RELEASE.sh</exclude>
<exclude>deploy.sh</exclude>
util/pom.xml 15(+14 -1)
diff --git a/util/pom.xml b/util/pom.xml
index c2894a7..2b23ebb 100644
--- a/util/pom.xml
+++ b/util/pom.xml
@@ -33,6 +33,10 @@
<artifactId>jdbi</artifactId>
</dependency>
<dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<version>3.0</version>
@@ -83,7 +87,16 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>org.antlr</groupId>
+ <artifactId>stringtemplate</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.mysql</groupId>
+ <artifactId>management-dbfiles</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/util/src/main/java/com/ning/billing/util/glue/ClockModule.java b/util/src/main/java/com/ning/billing/util/glue/ClockModule.java
index f090bba..e7c7c3f 100644
--- a/util/src/main/java/com/ning/billing/util/glue/ClockModule.java
+++ b/util/src/main/java/com/ning/billing/util/glue/ClockModule.java
@@ -26,5 +26,4 @@ public class ClockModule extends AbstractModule {
protected void configure() {
bind(Clock.class).to(DefaultClock.class).asEagerSingleton();
}
-
}
diff --git a/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java b/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java
new file mode 100644
index 0000000..f0babf9
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.glue;
+
+import com.google.inject.AbstractModule;
+import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+
+public class NotificationQueueModule extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ bind(NotificationQueueService.class).to(DefaultNotificationQueueService.class).asEagerSingleton();
+ }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
new file mode 100644
index 0000000..818d831
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq.dao;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.skife.jdbi.v2.SQLStatement;
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.sqlobject.Bind;
+import org.skife.jdbi.v2.sqlobject.Binder;
+import org.skife.jdbi.v2.sqlobject.SqlQuery;
+import org.skife.jdbi.v2.sqlobject.SqlUpdate;
+import org.skife.jdbi.v2.sqlobject.customizers.Mapper;
+import org.skife.jdbi.v2.sqlobject.mixins.CloseMe;
+import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
+import org.skife.jdbi.v2.sqlobject.stringtemplate.ExternalizedSqlViaStringTemplate3;
+import org.skife.jdbi.v2.tweak.ResultSetMapper;
+
+import com.ning.billing.util.notificationq.DefaultNotification;
+import com.ning.billing.util.notificationq.Notification;
+import com.ning.billing.util.notificationq.NotificationLifecycle.NotificationLifecycleState;
+
+@ExternalizedSqlViaStringTemplate3()
+public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, CloseMe {
+
+ //
+ // APIs for event notifications
+ //
+ @SqlQuery
+ @Mapper(NotificationSqlMapper.class)
+ public List<Notification> getReadyNotifications(@Bind("now") Date now, @Bind("max") int max);
+
+ @SqlUpdate
+ public int claimNotification(@Bind("owner") String owner, @Bind("next_available") Date nextAvailable, @Bind("notification_id") String eventId, @Bind("now") Date now);
+
+ @SqlUpdate
+ public void clearNotification(@Bind("notification_id") String eventId, @Bind("owner") String owner);
+
+ @SqlUpdate
+ public void insertNotification(@Bind(binder = NotificationSqlDaoBinder.class) Notification evt);
+
+ @SqlUpdate
+ public void insertClaimedHistory(@Bind("sequence_id") int sequenceId, @Bind("owner") String owner, @Bind("claimed_dt") Date clainedDate, @Bind("notification_id") String notificationId);
+
+ public static class NotificationSqlDaoBinder implements Binder<Bind, Notification> {
+
+ private Date getDate(DateTime dateTime) {
+ return dateTime == null ? null : dateTime.toDate();
+ }
+
+ @Override
+ public void bind(@SuppressWarnings("rawtypes") SQLStatement stmt, Bind bind, Notification evt) {
+ stmt.bind("notification_id", evt.getId().toString());
+ stmt.bind("created_dt", getDate(new DateTime()));
+ stmt.bind("notification_key", evt.getNotificationKey());
+ stmt.bind("effective_dt", getDate(evt.getEffectiveDate()));
+ stmt.bind("processing_available_dt", getDate(evt.getNextAvailableDate()));
+ stmt.bind("processing_owner", (String) null);
+ stmt.bind("processing_state", NotificationLifecycleState.AVAILABLE.toString());
+ }
+ }
+
+
+ public static class NotificationSqlMapper implements ResultSetMapper<Notification> {
+
+ private DateTime getDate(ResultSet r, String fieldName) throws SQLException {
+ final Timestamp resultStamp = r.getTimestamp(fieldName);
+ return r.wasNull() ? null : new DateTime(resultStamp).toDateTime(DateTimeZone.UTC);
+ }
+
+ @Override
+ public Notification map(int index, ResultSet r, StatementContext ctx)
+ throws SQLException {
+
+ final UUID id = UUID.fromString(r.getString("notification_id"));
+ final String notificationKey = r.getString("notification_key");
+ final DateTime effectiveDate = getDate(r, "effective_dt");
+ final DateTime nextAvailableDate = getDate(r, "processing_available_dt");
+ final String processingOwner = r.getString("processing_owner");
+ final NotificationLifecycleState processingState = NotificationLifecycleState.valueOf(r.getString("processing_state"));
+
+ return new DefaultNotification(id, processingOwner, nextAvailableDate,
+ processingState, notificationKey, effectiveDate);
+
+ }
+ }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
new file mode 100644
index 0000000..2946e13
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+
+public class DefaultNotification implements Notification {
+
+ private final UUID id;
+ private final String owner;
+ private final DateTime nextAvailableDate;
+ private final NotificationLifecycleState lifecycleState;
+ private final String notificationKey;
+ private final DateTime effectiveDate;
+
+
+ public DefaultNotification(UUID id, String owner, DateTime nextAvailableDate,
+ NotificationLifecycleState lifecycleState,
+ String notificationKey, DateTime effectiveDate) {
+ super();
+ this.id = id;
+ this.owner = owner;
+ this.nextAvailableDate = nextAvailableDate;
+ this.lifecycleState = lifecycleState;
+ this.notificationKey = notificationKey;
+ this.effectiveDate = effectiveDate;
+ }
+
+ public DefaultNotification(String notificationKey, DateTime effectiveDate) {
+ this(UUID.randomUUID(), null, null, NotificationLifecycleState.AVAILABLE, notificationKey, effectiveDate);
+ }
+
+ @Override
+ public UUID getId() {
+ return id;
+ }
+
+ @Override
+ public String getOwner() {
+ return owner;
+ }
+
+ @Override
+ public DateTime getNextAvailableDate() {
+ return nextAvailableDate;
+ }
+
+ @Override
+ public NotificationLifecycleState getProcessingState() {
+ return lifecycleState;
+ }
+
+ @Override
+ public boolean isAvailableForProcessing(DateTime now) {
+ switch(lifecycleState) {
+ case AVAILABLE:
+ break;
+ case IN_PROCESSING:
+ // Somebody already got the event, not available yet
+ if (nextAvailableDate.isAfter(now)) {
+ return false;
+ }
+ break;
+ case PROCESSED:
+ return false;
+ default:
+ throw new RuntimeException(String.format("Unkwnon IEvent processing state %s", lifecycleState));
+ }
+ return effectiveDate.isBefore(now);
+ }
+
+ @Override
+ public String getNotificationKey() {
+ return notificationKey;
+ }
+
+ @Override
+ public DateTime getEffectiveDate() {
+ return effectiveDate;
+ }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
new file mode 100644
index 0000000..16d28b2
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.IDBI;
+import org.skife.jdbi.v2.Transaction;
+import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
+
+public class DefaultNotificationQueue extends NotificationQueueBase {
+
+ protected final NotificationSqlDao dao;
+
+ public DefaultNotificationQueue(final IDBI dbi, final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
+ super(clock, svcName, queueName, handler, config);
+ this.dao = dbi.onDemand(NotificationSqlDao.class);
+ }
+
+ @Override
+ protected void doProcessEvents(int sequenceId) {
+ List<Notification> notifications = getReadyNotifications(sequenceId);
+ for (Notification cur : notifications) {
+ nbProcessedEvents.incrementAndGet();
+ handler.handleReadyNotification(cur.getNotificationKey());
+ }
+ // If anything happens before we get to clear those notifications, somebody else will pick them up
+ clearNotifications(notifications);
+ }
+
+ @Override
+ public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
+ final DateTime futureNotificationTime, final NotificationKey notificationKey) {
+ NotificationSqlDao transactionalNotificationDao = transactionalDao.become(NotificationSqlDao.class);
+ Notification notification = new DefaultNotification(notificationKey.toString(), futureNotificationTime);
+ transactionalNotificationDao.insertNotification(notification);
+ }
+
+
+ private void clearNotifications(final Collection<Notification> cleared) {
+
+ log.debug(String.format("NotificationQueue %s clearEventsReady START cleared size = %d",
+ getFullQName(),
+ cleared.size()));
+
+ dao.inTransaction(new Transaction<Void, NotificationSqlDao>() {
+
+ @Override
+ public Void inTransaction(NotificationSqlDao transactional,
+ TransactionStatus status) throws Exception {
+ for (Notification cur : cleared) {
+ transactional.clearNotification(cur.getId().toString(), hostname);
+ log.debug(String.format("NotificationQueue %s cleared events %s", getFullQName(), cur.getId()));
+ }
+ return null;
+ }
+ });
+ }
+
+ private List<Notification> getReadyNotifications(final int seqId) {
+
+ final Date now = clock.getUTCNow().toDate();
+ final Date nextAvailable = clock.getUTCNow().plus(config.getDaoClaimTimeMs()).toDate();
+
+ log.debug(String.format("NotificationQueue %s getEventsReady START effectiveNow = %s", getFullQName(), now));
+
+ List<Notification> result = dao.inTransaction(new Transaction<List<Notification>, NotificationSqlDao>() {
+
+ @Override
+ public List<Notification> inTransaction(NotificationSqlDao transactionalDao,
+ TransactionStatus status) throws Exception {
+
+ List<Notification> claimedNotifications = new ArrayList<Notification>();
+ List<Notification> input = transactionalDao.getReadyNotifications(now, config.getDaoMaxReadyEvents());
+ for (Notification cur : input) {
+ final boolean claimed = (transactionalDao.claimNotification(hostname, nextAvailable, cur.getId().toString(), now) == 1);
+ if (claimed) {
+ claimedNotifications.add(cur);
+ transactionalDao.insertClaimedHistory(seqId, hostname, now, cur.getId().toString());
+ }
+ }
+ return claimedNotifications;
+ }
+ });
+
+ for (Notification cur : result) {
+ log.debug(String.format("NotificationQueue %sclaimed events %s",
+ getFullQName(), cur.getId()));
+ if (cur.getOwner() != null && !cur.getOwner().equals(hostname)) {
+ log.warn(String.format("NotificationQueue %s stealing notification %s from %s",
+ getFullQName(), cur, cur.getOwner()));
+ }
+ }
+ return result;
+ }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
new file mode 100644
index 0000000..fe18ead
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import org.skife.jdbi.v2.IDBI;
+
+import com.google.inject.Inject;
+import com.ning.billing.util.clock.Clock;
+
+public class DefaultNotificationQueueService extends NotificationQueueServiceBase {
+
+ private final IDBI dbi;
+
+ @Inject
+ public DefaultNotificationQueueService(final IDBI dbi, final Clock clock) {
+ super(clock);
+ this.dbi = dbi;
+ }
+
+ @Override
+ protected NotificationQueue createNotificationQueueInternal(String svcName,
+ String queueName, NotificationQueueHandler handler,
+ NotificationConfig config) {
+ return new DefaultNotificationQueue(dbi, clock, svcName, queueName, handler, config);
+ }
+
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
new file mode 100644
index 0000000..23f0de0
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+
+public interface NotificationQueue {
+
+ /**
+ *
+ * Record from within a transaction the need to be called back when the notification is ready
+ *
+ * @param transactionalDao the transactionalDao
+ * @param futureNotificationTime the time at which the notification is ready
+ * @param notificationKey the key for that notification
+ */
+ public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
+ final DateTime futureNotificationTime, final NotificationKey notificationKey);
+
+ /**
+ * This is only valid when the queue has been configured with isNotificationProcessingOff is true
+ * In which case, it will callback users for all the ready notifications.
+ *
+ */
+ public void processReadyNotification();
+
+ /**
+ * Stops the queue.
+ *
+ * @see NotificationQueueHandler.completedQueueStop to be notified when the notification thread exited
+ */
+ public void stopQueue();
+
+ /**
+ * Starts the queue.
+ *
+ * @see NotificationQueueHandler.completedQueueStart to be notified when the notification thread started
+ */
+ public void startQueue();
+
+
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
new file mode 100644
index 0000000..6eaf33f
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
@@ -0,0 +1,193 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.Transaction;
+import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.util.Hostname;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
+
+
+public abstract class NotificationQueueBase implements NotificationQueue {
+
+ protected final static Logger log = LoggerFactory.getLogger(NotificationQueueBase.class);
+
+ protected static final String NOTIFICATION_THREAD_PREFIX = "Notification-";
+ protected final long STOP_WAIT_TIMEOUT_MS = 60000;
+
+ protected final String svcName;
+ protected final String queueName;
+ protected final NotificationQueueHandler handler;
+ protected final NotificationConfig config;
+
+ protected final Executor executor;
+ protected final Clock clock;
+ protected final String hostname;
+
+ protected static final AtomicInteger sequenceId = new AtomicInteger();
+
+ protected AtomicLong nbProcessedEvents;
+
+ // Use this object's monitor for synchronization (no need for volatile)
+ protected boolean isProcessingEvents;
+
+ // Package visibility on purpose
+ NotificationQueueBase(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
+ this.clock = clock;
+ this.svcName = svcName;
+ this.queueName = queueName;
+ this.handler = handler;
+ this.config = config;
+ this.hostname = Hostname.get();
+
+ this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread th = new Thread(r);
+ th.setName(NOTIFICATION_THREAD_PREFIX + svcName + "-" + queueName);
+ th.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ log.error("Uncaught exception for thread " + t.getName(), e);
+ }
+ });
+ return th;
+ }
+ });
+ }
+
+
+ @Override
+ public void processReadyNotification() {
+ // STEPH to be implemented
+ }
+
+
+ @Override
+ public void stopQueue() {
+ if (config.isNotificationProcessingOff()) {
+ handler.completedQueueStop();
+ return;
+ }
+
+ synchronized(this) {
+ isProcessingEvents = false;
+ try {
+ log.info("NotificationQueue requested to stop");
+ wait(STOP_WAIT_TIMEOUT_MS);
+ log.info("NotificationQueue requested should have exited");
+ } catch (InterruptedException e) {
+ log.warn("NotificationQueue got interrupted exception when stopping notifications", e);
+ }
+ }
+
+ }
+
+ @Override
+ public void startQueue() {
+
+ this.isProcessingEvents = true;
+ this.nbProcessedEvents = new AtomicLong();
+
+
+ if (config.isNotificationProcessingOff()) {
+ log.warn(String.format("KILLBILL NOTIFICATION PROCESSING FOR SVC %s IS OFF !!!", getFullQName()));
+ handler.completedQueueStart();
+ return;
+ }
+ final NotificationQueueBase notificationQueue = this;
+
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+
+ log.info(String.format("NotificationQueue thread %s [%d] started",
+ Thread.currentThread().getName(),
+ Thread.currentThread().getId()));
+
+ // Thread is now started, notify the listener
+ handler.completedQueueStart();
+
+ try {
+ while (true) {
+
+ synchronized (notificationQueue) {
+ if (!isProcessingEvents) {
+ log.info(String.format("NotificationQueue has been requested to stop, thread %s [%d] stopping...",
+ Thread.currentThread().getName(),
+ Thread.currentThread().getId()));
+ notificationQueue.notify();
+ break;
+ }
+ }
+
+ // Callback may trigger exceptions in user code so catch anything here and live with it.
+ try {
+ doProcessEvents(sequenceId.getAndIncrement());
+ } catch (Exception e) {
+ log.error(String.format("NotificationQueue thread %s [%d] got an exception..",
+ Thread.currentThread().getName(),
+ Thread.currentThread().getId()), e);
+ }
+ sleepALittle();
+ }
+ } catch (InterruptedException e) {
+ log.warn(Thread.currentThread().getName() + " got interrupted ", e);
+ } catch (Throwable e) {
+ log.error(Thread.currentThread().getName() + " got an exception exiting...", e);
+ // Just to make it really obvious in the log
+ e.printStackTrace();
+ } finally {
+ handler.completedQueueStop();
+ log.info(String.format("NotificationQueue thread %s [%d] exited...",
+ Thread.currentThread().getName(),
+ Thread.currentThread().getId()));
+ }
+ }
+
+ private void sleepALittle() throws InterruptedException {
+ Thread.sleep(config.getNotificationSleepTimeMs());
+ }
+ });
+ }
+
+
+ protected String getFullQName() {
+ return svcName + ":" + queueName;
+ }
+
+ protected abstract void doProcessEvents(int sequenceId);
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
new file mode 100644
index 0000000..a18906b
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.NoSuchElementException;
+
+
+public interface NotificationQueueService {
+
+ public interface NotificationQueueHandler {
+ /**
+ * Called when the Notification thread has been started
+ */
+ public void completedQueueStart();
+
+ /**
+ * Called for each notification ready
+ *
+ * @param key the notification key associated to that notification entry
+ */
+ public void handleReadyNotification(String notificationKey);
+ /**
+ * Called right before the Notification thread is about to exit
+ */
+ public void completedQueueStop();
+ }
+
+ public static final class NotficationQueueAlreadyExists extends Exception {
+ private static final long serialVersionUID = 1541281L;
+
+ public NotficationQueueAlreadyExists(String msg) {
+ super(msg);
+ }
+ }
+
+ public static final class NoSuchNotificationQueue extends Exception {
+ private static final long serialVersionUID = 1541281L;
+
+ public NoSuchNotificationQueue(String msg) {
+ super(msg);
+ }
+ }
+
+ /**
+ * Creates a new NotificationQueue for a given associated with the given service and queueName
+ *
+ * @param svcName the name of the service using that queue
+ * @param queueName a name for that queue (unique per service)
+ * @param handler the handler required for notifying the caller of state change
+ * @param config the notification queue configuration
+ *
+ * @return a new NotificationQueue
+ *
+ * @throws NotficationQueueAlreadyExists is the queue associated with that service and name already exits
+ *
+ */
+ NotificationQueue createNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config)
+ throws NotficationQueueAlreadyExists;
+
+ /**
+ * Retrieves an already created NotificationQueue by service and name if it exists
+ *
+ * @param svcName
+ * @param queueName
+ * @return
+ *
+ * @throws NoSuchNotificationQueue if queue does not exist
+ */
+ NotificationQueue getNotificationQueue(final String svcName, final String queueName)
+ throws NoSuchNotificationQueue;
+
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
new file mode 100644
index 0000000..a4dc64e
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.ning.billing.util.clock.Clock;
+
+public abstract class NotificationQueueServiceBase implements NotificationQueueService {
+
+ protected final Logger log = LoggerFactory.getLogger(DefaultNotificationQueueService.class);
+
+ protected final Clock clock;
+
+ private final Map<String, NotificationQueue> queues;
+
+ @Inject
+ public NotificationQueueServiceBase(final Clock clock) {
+
+ this.clock = clock;
+ this.queues = new TreeMap<String, NotificationQueue>();
+ }
+
+ @Override
+ public NotificationQueue createNotificationQueue(String svcName,
+ String queueName, NotificationQueueHandler handler,
+ NotificationConfig config) throws NotficationQueueAlreadyExists {
+ if (svcName == null || queueName == null || handler == null || config == null) {
+ throw new RuntimeException("Need to specify all parameters");
+ }
+
+ String compositeName = getCompositeName(svcName, queueName);
+ NotificationQueue result = null;
+ synchronized(queues) {
+ result = queues.get(compositeName);
+ if (result != null) {
+ throw new NotficationQueueAlreadyExists(String.format("Queue for svc %s and name %s already exist",
+ svcName, queueName));
+ }
+ result = createNotificationQueueInternal(svcName, queueName, handler, config);
+ queues.put(compositeName, result);
+ }
+ return result;
+ }
+
+ @Override
+ public NotificationQueue getNotificationQueue(String svcName,
+ String queueName) throws NoSuchNotificationQueue {
+
+ NotificationQueue result = null;
+ String compositeName = getCompositeName(svcName, queueName);
+ synchronized(queues) {
+ result = queues.get(compositeName);
+ if (result == null) {
+ throw new NoSuchNotificationQueue(String.format("Queue for svc %s and name %s does not exist",
+ svcName, queueName));
+ }
+ }
+ return result;
+ }
+
+
+ protected abstract NotificationQueue createNotificationQueueInternal(String svcName,
+ String queueName, NotificationQueueHandler handler,
+ NotificationConfig config);
+
+
+ private String getCompositeName(String svcName, String queueName) {
+ return svcName + ":" + queueName;
+ }
+}
diff --git a/util/src/main/resources/com/ning/billing/util/ddl.sql b/util/src/main/resources/com/ning/billing/util/ddl.sql
index c0bff95..a65e0cd 100644
--- a/util/src/main/resources/com/ning/billing/util/ddl.sql
+++ b/util/src/main/resources/com/ning/billing/util/ddl.sql
@@ -32,4 +32,31 @@ CREATE TABLE tags (
PRIMARY KEY(id)
) ENGINE = innodb;
CREATE INDEX tags_by_object ON tags(object_id);
-CREATE UNIQUE INDEX tags_unique ON tags(tag_definition_name, object_id);
\ No newline at end of file
+CREATE UNIQUE INDEX tags_unique ON tags(tag_definition_name, object_id);
+
+DROP TABLE IF EXISTS notifications;
+CREATE TABLE notifications (
+ id int(11) unsigned NOT NULL AUTO_INCREMENT,
+ notification_id char(36) NOT NULL,
+ created_dt datetime NOT NULL,
+ notification_key varchar(256) NOT NULL,
+ effective_dt datetime NOT NULL,
+ processing_owner char(36) DEFAULT NULL,
+ processing_available_dt datetime DEFAULT NULL,
+ processing_state varchar(14) DEFAULT 'AVAILABLE',
+ PRIMARY KEY(id)
+) ENGINE=innodb;
+CREATE INDEX `idx_comp_where` ON notifications (`effective_dt`,`processing_state`,`processing_owner`,`processing_available_dt`);
+CREATE INDEX `idx_update` ON notifications (`notification_id`,`processing_state`,`processing_owner`,`processing_available_dt`);
+CREATE INDEX `idx_update1` ON notifications (`notification_id`,`processing_owner`);
+CREATE INDEX `idx_get_ready` ON notifications (`effective_dt`,`created_dt`,`id`);
+
+DROP TABLE IF EXISTS claimed_notifications;
+CREATE TABLE claimed_notifications (
+ id int(11) unsigned NOT NULL AUTO_INCREMENT,
+ sequence_id int(11) unsigned NOT NULL,
+ owner_id varchar(64) NOT NULL,
+ claimed_dt datetime NOT NULL,
+ notification_id char(36) NOT NULL,
+ PRIMARY KEY(id)
+) ENGINE=innodb;
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
new file mode 100644
index 0000000..5a44431
--- /dev/null
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -0,0 +1,83 @@
+group NotificationSqlDao;
+
+getReadyNotifications(now, max) ::= <<
+ select
+ notification_id
+ , notification_key
+ , created_dt
+ , effective_dt
+ , processing_owner
+ , processing_available_dt
+ , processing_state
+ from notifications
+ where
+ effective_dt \<= :now
+ and processing_state != 'PROCESSED'
+ and (processing_owner IS NULL OR processing_available_dt \<= :now)
+ order by
+ effective_dt asc
+ , created_dt asc
+ , id asc
+ limit :max
+ ;
+>>
+
+
+claimNotification(owner, next_available, notification_id, now) ::= <<
+ update notifications
+ set
+ processing_owner = :owner
+ , processing_available_dt = :next_available
+ , processing_state = 'IN_PROCESSING'
+ where
+ notification_id = :notification_id
+ and processing_state != 'PROCESSED'
+ and (processing_owner IS NULL OR processing_available_dt \<= :now)
+ ;
+>>
+
+clearNotification(notification_id, owner) ::= <<
+ update notifications
+ set
+ processing_owner = NULL
+ , processing_state = 'PROCESSED'
+ where
+ notification_id = :notification_id
+ and processing_owner = :owner
+ ;
+>>
+
+insertNotification() ::= <<
+ insert into notifications (
+ notification_id
+ , notification_key
+ , created_dt
+ , effective_dt
+ , processing_owner
+ , processing_available_dt
+ , processing_state
+ ) values (
+ :notification_id
+ , :notification_key
+ , :created_dt
+ , :effective_dt
+ , :processing_owner
+ , :processing_available_dt
+ , :processing_state
+ );
+>>
+
+
+insertClaimedHistory(sequence_id, owner, hostname, claimed_dt, notification_id) ::= <<
+ insert into claimed_notifications (
+ sequence_id
+ , owner_id
+ , claimed_dt
+ , notification_id
+ ) values (
+ :sequence_id
+ , :owner
+ , :claimed_dt
+ , :notification_id
+ );
+>>
diff --git a/util/src/test/java/com/ning/billing/util/clock/ClockMock.java b/util/src/test/java/com/ning/billing/util/clock/ClockMock.java
index 0fb473d..7698697 100644
--- a/util/src/test/java/com/ning/billing/util/clock/ClockMock.java
+++ b/util/src/test/java/com/ning/billing/util/clock/ClockMock.java
@@ -76,6 +76,15 @@ public class ClockMock extends DefaultClock {
deltaFromRealityMs = delta;
}
+ public synchronized void addDeltaFromReality(long delta) {
+ if (deltaType != DeltaType.DELTA_ABS) {
+ throw new RuntimeException("ClockMock should be set with type DELTA_ABS");
+ }
+ deltaFromRealityDuration = null;
+ deltaFromRealitDurationEpsilon = 0;
+ deltaFromRealityMs += delta;
+ }
+
public synchronized void resetDeltaFromReality() {
deltaType = DeltaType.DELTA_NONE;
deltaFromRealityDuration = null;
diff --git a/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java b/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
index 1419b8f..ba9f76a 100644
--- a/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
+++ b/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
@@ -81,7 +81,7 @@ public class TestEventBus {
@Subscribe
public synchronized void processEvent(MyEvent event) {
gotEvents++;
- log.info("Got event {} {}", event.name, event.value);
+ //log.debug("Got event {} {}", event.name, event.value);
}
public synchronized boolean waitForCompletion(long timeoutMs) {
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
new file mode 100644
index 0000000..89dfd76
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq.dao;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.io.IOUtils;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.skife.config.ConfigurationObjectFactory;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.ning.billing.dbi.DBIProvider;
+import com.ning.billing.dbi.DbiConfig;
+import com.ning.billing.dbi.MysqlTestingHelper;
+import com.ning.billing.util.notificationq.DefaultNotification;
+import com.ning.billing.util.notificationq.Notification;
+import com.ning.billing.util.notificationq.NotificationLifecycle.NotificationLifecycleState;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao.NotificationSqlMapper;
+
+@Guice(modules = TestNotificationSqlDao.TestNotificationSqlDaoModule.class)
+public class TestNotificationSqlDao {
+
+ private static AtomicInteger sequenceId = new AtomicInteger();
+
+ @Inject
+ private DBI dbi;
+
+ @Inject
+ MysqlTestingHelper helper;
+
+ private NotificationSqlDao dao;
+
+ private void startMysql() throws IOException, ClassNotFoundException, SQLException {
+
+
+ final String ddl = IOUtils.toString(NotificationSqlDao.class.getResourceAsStream("/com/ning/billing/util/ddl.sql"));
+ helper.startMysql();
+ helper.initDb(ddl);
+ }
+
+ @BeforeSuite(alwaysRun = true)
+ public void setup() {
+ try {
+ startMysql();
+ dao = dbi.onDemand(NotificationSqlDao.class);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @AfterSuite(alwaysRun = true)
+ public void stopMysql()
+ {
+ helper.stopMysql();
+ }
+
+
+ @BeforeTest
+ public void cleanupDb() {
+ dbi.withHandle(new HandleCallback<Void>() {
+
+ @Override
+ public Void withHandle(Handle handle) throws Exception {
+ handle.execute("delete from notifications");
+ handle.execute("delete from claimed_notifications");
+ return null;
+ }
+ });
+ }
+
+ @Test
+ public void testBasic() throws InterruptedException {
+
+ final String ownerId = UUID.randomUUID().toString();
+
+ String notificationKey = UUID.randomUUID().toString();
+ DateTime effDt = new DateTime();
+ Notification notif = new DefaultNotification(notificationKey, effDt);
+ dao.insertNotification(notif);
+
+ Thread.sleep(1000);
+ DateTime now = new DateTime();
+ List<Notification> notifications = dao.getReadyNotifications(now.toDate(), 3);
+ assertNotNull(notifications);
+ assertEquals(notifications.size(), 1);
+
+ Notification notification = notifications.get(0);
+ assertEquals(notification.getNotificationKey(), notificationKey);
+ validateDate(notification.getEffectiveDate(), effDt);
+ assertEquals(notification.getOwner(), null);
+ assertEquals(notification.getProcessingState(), NotificationLifecycleState.AVAILABLE);
+ assertEquals(notification.getNextAvailableDate(), null);
+
+ DateTime nextAvailable = now.plusMinutes(5);
+ int res = dao.claimNotification(ownerId, nextAvailable.toDate(), notification.getId().toString(), now.toDate());
+ assertEquals(res, 1);
+ dao.insertClaimedHistory(sequenceId.incrementAndGet(), ownerId, now.toDate(), notification.getId().toString());
+
+ notification = fetchNotification(notification.getId().toString());
+ assertEquals(notification.getNotificationKey(), notificationKey);
+ validateDate(notification.getEffectiveDate(), effDt);
+ assertEquals(notification.getOwner().toString(), ownerId);
+ assertEquals(notification.getProcessingState(), NotificationLifecycleState.IN_PROCESSING);
+ validateDate(notification.getNextAvailableDate(), nextAvailable);
+
+ dao.clearNotification(notification.getId().toString(), ownerId);
+
+ notification = fetchNotification(notification.getId().toString());
+ assertEquals(notification.getNotificationKey(), notificationKey);
+ validateDate(notification.getEffectiveDate(), effDt);
+ assertEquals(notification.getOwner(), null);
+ assertEquals(notification.getProcessingState(), NotificationLifecycleState.PROCESSED);
+ validateDate(notification.getNextAvailableDate(), nextAvailable);
+
+ }
+
+ private Notification fetchNotification(final String notificationId) {
+ Notification res = dbi.withHandle(new HandleCallback<Notification>() {
+
+ @Override
+ public Notification withHandle(Handle handle) throws Exception {
+ Notification res = handle.createQuery(" select" +
+ " notification_id" +
+ ", notification_key" +
+ ", created_dt" +
+ ", effective_dt" +
+ ", processing_owner" +
+ ", processing_available_dt" +
+ ", processing_state" +
+ " from notifications " +
+ " where " +
+ " notification_id = '" + notificationId + "';")
+ .map(new NotificationSqlMapper())
+ .first();
+ return res;
+ }
+ });
+ return res;
+ }
+
+ private void validateDate(DateTime input, DateTime expected) {
+ if (input == null && expected != null) {
+ Assert.fail("Got input date null");
+ }
+ if (input != null && expected == null) {
+ Assert.fail("Was expecting null date");
+ }
+ expected = truncateAndUTC(expected);
+ input = truncateAndUTC(input);
+ Assert.assertEquals(input, expected);
+ }
+
+ private DateTime truncateAndUTC(DateTime input) {
+ if (input == null) {
+ return null;
+ }
+ DateTime result = input.minus(input.getMillisOfSecond());
+ return result.toDateTime(DateTimeZone.UTC);
+ }
+
+ public static class TestNotificationSqlDaoModule extends AbstractModule {
+ @Override
+ protected void configure() {
+
+ final MysqlTestingHelper helper = new MysqlTestingHelper();
+ bind(MysqlTestingHelper.class).toInstance(helper);
+ DBI dbi = helper.getDBI();
+ bind(DBI.class).toInstance(dbi);
+
+ /*
+ bind(DBI.class).toProvider(DBIProvider.class).asEagerSingleton();
+ final DbiConfig config = new ConfigurationObjectFactory(System.getProperties()).build(DbiConfig.class);
+ bind(DbiConfig.class).toInstance(config);
+ */
+ }
+ }
+}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/DummyObject.java b/util/src/test/java/com/ning/billing/util/notificationq/DummyObject.java
new file mode 100644
index 0000000..9495001
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/notificationq/DummyObject.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.UUID;
+
+public class DummyObject {
+ private final String value;
+ private final UUID key;
+
+ public DummyObject(String value, UUID key) {
+ super();
+ this.value = value;
+ this.key = key;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public UUID getKey() {
+ return key;
+ }
+}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/DummySqlTest.java b/util/src/test/java/com/ning/billing/util/notificationq/DummySqlTest.java
new file mode 100644
index 0000000..83b1b20
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/notificationq/DummySqlTest.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.UUID;
+
+import org.skife.jdbi.v2.SQLStatement;
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.sqlobject.Bind;
+import org.skife.jdbi.v2.sqlobject.Binder;
+import org.skife.jdbi.v2.sqlobject.SqlQuery;
+import org.skife.jdbi.v2.sqlobject.SqlUpdate;
+import org.skife.jdbi.v2.sqlobject.customizers.Mapper;
+import org.skife.jdbi.v2.sqlobject.mixins.CloseMe;
+import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.skife.jdbi.v2.sqlobject.stringtemplate.ExternalizedSqlViaStringTemplate3;
+import org.skife.jdbi.v2.tweak.ResultSetMapper;
+
+
+@ExternalizedSqlViaStringTemplate3()
+public interface DummySqlTest extends Transactional<DummySqlTest>, Transmogrifier, CloseMe {
+
+ @SqlUpdate
+ public void insertDummy(@Bind(binder = DummySqlTestBinder.class) DummyObject dummy);
+
+ @SqlQuery
+ @Mapper(DummySqlTestMapper.class)
+ public DummyObject getDummyFromId(@Bind("dummy_id") String dummyId);
+
+ public static class DummySqlTestBinder implements Binder<Bind, DummyObject> {
+ @Override
+ public void bind(@SuppressWarnings("rawtypes") SQLStatement stmt, Bind bind, DummyObject dummy) {
+ stmt.bind("dummy_id", dummy.getKey().toString());
+ stmt.bind("value", dummy.getValue());
+ }
+ }
+
+ public static class DummySqlTestMapper implements ResultSetMapper<DummyObject> {
+ @Override
+ public DummyObject map(int index, ResultSet r, StatementContext ctx)
+ throws SQLException {
+ final UUID key = UUID.fromString(r.getString("dummy_id"));
+ final String value = r.getString("value");
+ return new DummyObject(value, key);
+ }
+ }
+}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
new file mode 100644
index 0000000..b141310
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationLifecycle.NotificationLifecycleState;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+
+public class MockNotificationQueue extends NotificationQueueBase implements NotificationQueue {
+
+
+ private TreeSet<Notification> notifications;
+
+ public MockNotificationQueue(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
+ super(clock, svcName, queueName, handler, config);
+ notifications = new TreeSet<Notification>(new Comparator<Notification>() {
+ @Override
+ public int compare(Notification o1, Notification o2) {
+ if (o1.getEffectiveDate().equals(o2.getEffectiveDate())) {
+ return o1.getNotificationKey().compareTo(o2.getNotificationKey());
+ } else {
+ return o1.getEffectiveDate().compareTo(o2.getEffectiveDate());
+ }
+ }
+ });
+ }
+
+ @Override
+ public void recordFutureNotificationFromTransaction(
+ Transmogrifier transactionalDao, DateTime futureNotificationTime,
+ NotificationKey notificationKey) {
+ Notification notification = new DefaultNotification(notificationKey.toString(), futureNotificationTime);
+ synchronized(notifications) {
+ notifications.add(notification);
+ }
+ }
+
+ @Override
+ protected void doProcessEvents(int sequenceId) {
+
+ List<Notification> processedNotifications = new ArrayList<Notification>();
+ List<Notification> oldNotifications = new ArrayList<Notification>();
+
+ List<Notification> readyNotifications = new ArrayList<Notification>();
+ synchronized(notifications) {
+ Iterator<Notification> it = notifications.iterator();
+ while (it.hasNext()) {
+ Notification cur = it.next();
+ if (cur.isAvailableForProcessing(clock.getUTCNow())) {
+ readyNotifications.add(cur);
+ }
+ }
+ for (Notification cur : readyNotifications) {
+ handler.handleReadyNotification(cur.getNotificationKey());
+ DefaultNotification processedNotification = new DefaultNotification(cur.getId(), hostname, clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
+ oldNotifications.add(cur);
+ processedNotifications.add(processedNotification);
+
+ }
+ if (oldNotifications.size() > 0) {
+ notifications.removeAll(oldNotifications);
+ }
+ if (processedNotifications.size() > 0) {
+ notifications.addAll(processedNotifications);
+ }
+ }
+
+ }
+}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
new file mode 100644
index 0000000..2e3bb3c
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -0,0 +1,428 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
+import org.joda.time.DateTime;
+import org.skife.config.ConfigurationObjectFactory;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.Transaction;
+import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.ning.billing.dbi.DBIProvider;
+import com.ning.billing.dbi.DbiConfig;
+import com.ning.billing.dbi.MysqlTestingHelper;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
+
+@Guice(modules = TestNotificationQueue.TestNotificationQueueModule.class)
+public class TestNotificationQueue {
+
+ private final static Logger log = LoggerFactory.getLogger(TestNotificationQueue.class);
+
+ @Inject
+ private DBI dbi;
+
+ @Inject
+ MysqlTestingHelper helper;
+
+ @Inject
+ private Clock clock;
+
+ private DummySqlTest dao;
+
+ // private NotificationQueue queue;
+
+ private void startMysql() throws IOException, ClassNotFoundException, SQLException {
+ final String ddl = IOUtils.toString(NotificationSqlDao.class.getResourceAsStream("/com/ning/billing/util/ddl.sql"));
+ final String testDdl = IOUtils.toString(NotificationSqlDao.class.getResourceAsStream("/com/ning/billing/util/ddl_test.sql"));
+ helper.startMysql();
+ helper.initDb(ddl);
+ helper.initDb(testDdl);
+ }
+
+ @BeforeSuite(alwaysRun = true)
+ public void setup() throws Exception {
+ startMysql();
+ dao = dbi.onDemand(DummySqlTest.class);
+ }
+
+ @BeforeTest
+ public void beforeTest() {
+ dbi.withHandle(new HandleCallback<Void>() {
+
+ @Override
+ public Void withHandle(Handle handle) throws Exception {
+ handle.execute("delete from notifications");
+ handle.execute("delete from claimed_notifications");
+ handle.execute("delete from dummy");
+ return null;
+ }
+ });
+ // Reset time to real value
+ ((ClockMock) clock).resetDeltaFromReality();
+ }
+
+
+
+ /**
+ * Verify that we can call start/stop on a disabled queue and that both start/stop callbacks are called
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void testSimpleQueueDisabled() throws InterruptedException {
+
+ final TestStartStop testStartStop = new TestStartStop(false, false);
+ DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "dead",
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(String notificationKey) {
+ }
+ @Override
+ public void completedQueueStop() {
+ testStartStop.stopped();
+ }
+ @Override
+ public void completedQueueStart() {
+ testStartStop.started();
+ }
+ },
+ getNotificationConfig(true, 100, 1, 10000));
+
+ executeTest(testStartStop, queue, new WithTest() {
+ @Override
+ public void test(final DefaultNotificationQueue readyQueue) throws InterruptedException {
+ // Do nothing
+ }
+ });
+ assertTrue(true);
+ }
+
+ /**
+ * Test that we can post a notification in the future from a transaction and get the notification
+ * callback with the correct key when the time is ready
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void testSimpleNotification() throws InterruptedException {
+
+ final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
+
+ final TestStartStop testStartStop = new TestStartStop(false, false);
+ DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "foo",
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(String notificationKey) {
+ synchronized (expectedNotifications) {
+ expectedNotifications.put(notificationKey, Boolean.TRUE);
+ expectedNotifications.notify();
+ }
+ }
+ @Override
+ public void completedQueueStop() {
+ testStartStop.stopped();
+ }
+ @Override
+ public void completedQueueStart() {
+ testStartStop.started();
+ }
+ },
+ getNotificationConfig(false, 100, 1, 10000));
+
+
+ executeTest(testStartStop, queue, new WithTest() {
+ @Override
+ public void test(final DefaultNotificationQueue readyQueue) throws InterruptedException {
+
+ final UUID key = UUID.randomUUID();
+ final DummyObject obj = new DummyObject("foo", key);
+ final DateTime now = new DateTime();
+ final DateTime readyTime = now.plusMillis(2000);
+ final NotificationKey notificationKey = new NotificationKey() {
+ @Override
+ public String toString() {
+ return key.toString();
+ }
+ };
+ expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
+
+
+ // Insert dummy to be processed in 2 sec'
+ dao.inTransaction(new Transaction<Void, DummySqlTest>() {
+ @Override
+ public Void inTransaction(DummySqlTest transactional,
+ TransactionStatus status) throws Exception {
+
+ transactional.insertDummy(obj);
+ readyQueue.recordFutureNotificationFromTransaction(transactional,
+ readyTime, notificationKey);
+ return null;
+ }
+ });
+
+ // Move time in the future after the notification effectiveDate
+ ((ClockMock) clock).setDeltaFromReality(3000);
+
+ // Notification should have kicked but give it at least a sec' for thread scheduling
+ int nbTry = 1;
+ boolean success = false;
+ do {
+ synchronized(expectedNotifications) {
+ if (expectedNotifications.get(notificationKey.toString())) {
+ success = true;
+ break;
+ }
+ expectedNotifications.wait(1000);
+ }
+ } while (nbTry-- > 0);
+ assertEquals(success, true);
+ }
+ });
+ }
+
+ @Test
+ public void testManyNotifications() throws InterruptedException {
+ final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
+
+ final TestStartStop testStartStop = new TestStartStop(false, false);
+ DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(String notificationKey) {
+ synchronized (expectedNotifications) {
+ expectedNotifications.put(notificationKey, Boolean.TRUE);
+ expectedNotifications.notify();
+ }
+ }
+ @Override
+ public void completedQueueStop() {
+ testStartStop.stopped();
+ }
+ @Override
+ public void completedQueueStart() {
+ testStartStop.started();
+ }
+ },
+ getNotificationConfig(false, 100, 10, 10000));
+
+
+ executeTest(testStartStop, queue, new WithTest() {
+ @Override
+ public void test(final DefaultNotificationQueue readyQueue) throws InterruptedException {
+
+ final DateTime now = clock.getUTCNow();
+ final int MAX_NOTIFICATIONS = 100;
+ for (int i = 0; i < MAX_NOTIFICATIONS; i++) {
+
+ final int nextReadyTimeIncrementMs = 1000;
+
+ final UUID key = UUID.randomUUID();
+ final DummyObject obj = new DummyObject("foo", key);
+ final int currentIteration = i;
+
+ final NotificationKey notificationKey = new NotificationKey() {
+ @Override
+ public String toString() {
+ return key.toString();
+ }
+ };
+ expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
+
+ dao.inTransaction(new Transaction<Void, DummySqlTest>() {
+ @Override
+ public Void inTransaction(DummySqlTest transactional,
+ TransactionStatus status) throws Exception {
+
+ transactional.insertDummy(obj);
+ readyQueue.recordFutureNotificationFromTransaction(transactional,
+ now.plus((currentIteration + 1) * nextReadyTimeIncrementMs), notificationKey);
+ return null;
+ }
+ });
+
+ // Move time in the future after the notification effectiveDate
+ if (i == 0) {
+ ((ClockMock) clock).setDeltaFromReality(nextReadyTimeIncrementMs);
+ } else {
+ ((ClockMock) clock).addDeltaFromReality(nextReadyTimeIncrementMs);
+ }
+ }
+
+ // Wait a little longer since there are a lot of callback that need to happen
+ int nbTry = MAX_NOTIFICATIONS + 1;
+ boolean success = false;
+ do {
+ synchronized(expectedNotifications) {
+
+ Collection<Boolean> completed = Collections2.filter(expectedNotifications.values(), new Predicate<Boolean>() {
+ @Override
+ public boolean apply(Boolean input) {
+ return input;
+ }
+ });
+
+ if (completed.size() == MAX_NOTIFICATIONS) {
+ success = true;
+ break;
+ }
+ //log.debug(String.format("BEFORE WAIT : Got %d notifications at time %s (real time %s)", completed.size(), clock.getUTCNow(), new DateTime()));
+ expectedNotifications.wait(1000);
+ }
+ } while (nbTry-- > 0);
+ assertEquals(success, true);
+ }
+ });
+ }
+
+
+ NotificationConfig getNotificationConfig(final boolean off,
+ final long sleepTime, final int maxReadyEvents, final long claimTimeMs) {
+ return new NotificationConfig() {
+ @Override
+ public boolean isNotificationProcessingOff() {
+ return off;
+ }
+ @Override
+ public long getNotificationSleepTimeMs() {
+ return sleepTime;
+ }
+ @Override
+ public int getDaoMaxReadyEvents() {
+ return maxReadyEvents;
+ }
+ @Override
+ public long getDaoClaimTimeMs() {
+ return claimTimeMs;
+ }
+ };
+ }
+
+ private static class TestStartStop {
+ private boolean started;
+ private boolean stopped;
+
+ public TestStartStop(boolean started, boolean stopped) {
+ super();
+ this.started = started;
+ this.stopped = stopped;
+ }
+
+ public void started() {
+ synchronized(this) {
+ started = true;
+ notify();
+ }
+ }
+
+ public void stopped() {
+ synchronized(this) {
+ stopped = true;
+ notify();
+ }
+ }
+
+ public boolean waitForStartComplete(int timeoutMs) throws InterruptedException {
+ return waitForEventCompletion(timeoutMs, true);
+ }
+
+ public boolean waitForStopComplete(int timeoutMs) throws InterruptedException {
+ return waitForEventCompletion(timeoutMs, false);
+ }
+
+ private boolean waitForEventCompletion(int timeoutMs, boolean start) throws InterruptedException {
+ DateTime init = new DateTime();
+ synchronized(this) {
+ while (! ((start ? started : stopped))) {
+ wait(timeoutMs);
+ if (init.plusMillis(timeoutMs).isAfterNow()) {
+ break;
+ }
+ }
+ }
+ return (start ? started : stopped);
+ }
+ }
+
+ private interface WithTest {
+ public void test(DefaultNotificationQueue readyQueue) throws InterruptedException;
+ }
+
+ private void executeTest(final TestStartStop testStartStop,
+ DefaultNotificationQueue queue, WithTest test) throws InterruptedException{
+
+ queue.startQueue();
+ boolean started = testStartStop.waitForStartComplete(3000);
+ assertEquals(started, true);
+
+ test.test(queue);
+
+ queue.stopQueue();
+ boolean stopped = testStartStop.waitForStopComplete(3000);
+ assertEquals(stopped, true);
+ }
+
+
+ public static class TestNotificationQueueModule extends AbstractModule {
+ @Override
+ protected void configure() {
+
+ bind(Clock.class).to(ClockMock.class);
+
+ final MysqlTestingHelper helper = new MysqlTestingHelper();
+ bind(MysqlTestingHelper.class).toInstance(helper);
+ DBI dbi = helper.getDBI();
+ bind(DBI.class).toInstance(dbi);
+ /*
+ bind(DBI.class).toProvider(DBIProvider.class).asEagerSingleton();
+ final DbiConfig config = new ConfigurationObjectFactory(System.getProperties()).build(DbiConfig.class);
+ bind(DbiConfig.class).toInstance(config);
+ */
+ }
+ }
+
+
+}
diff --git a/util/src/test/resources/com/ning/billing/util/ddl_test.sql b/util/src/test/resources/com/ning/billing/util/ddl_test.sql
new file mode 100644
index 0000000..50de498
--- /dev/null
+++ b/util/src/test/resources/com/ning/billing/util/ddl_test.sql
@@ -0,0 +1,6 @@
+DROP TABLE IF EXISTS dummy;
+CREATE TABLE dummy (
+ dummy_id char(36) NOT NULL,
+ value varchar(256) NOT NULL,
+ PRIMARY KEY(dummy_id)
+) ENGINE = innodb;
diff --git a/util/src/test/resources/com/ning/billing/util/notificationq/DummySqlTest.sql.stg b/util/src/test/resources/com/ning/billing/util/notificationq/DummySqlTest.sql.stg
new file mode 100644
index 0000000..9a2e6e2
--- /dev/null
+++ b/util/src/test/resources/com/ning/billing/util/notificationq/DummySqlTest.sql.stg
@@ -0,0 +1,21 @@
+group DummySqlTest;
+
+insertDummy() ::= <<
+ insert into dummy (
+ dummy_id
+ , value
+ ) values (
+ :dummy_id
+ , :value
+ );
+>>
+
+getDummyFromId(dummy_id) ::= <<
+ select
+ dummy_id
+ , value
+ from dummy
+ where
+ dummy_id = :dummy_id
+ ;
+>>
\ No newline at end of file